This file is indexed.

/usr/share/pyshared/celery/backends/redis.py is in python-celery 2.5.3-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from ..exceptions import ImproperlyConfigured
from ..utils import cached_property

from .base import KeyValueStoreBackend

try:
    import redis
    from redis.exceptions import ConnectionError
except ImportError:
    redis = None            # noqa
    ConnectionError = None  # noqa


class RedisBackend(KeyValueStoreBackend):
    """Redis task result store.

    Values are read and written through a redis-py client built from the
    connection settings resolved in :meth:`__init__`.  Every value written
    with :meth:`set` is additionally published on a channel named after
    its key.
    """

    #: redis-py client module (:const:`None` when the import failed).
    redis = redis

    #: default Redis server hostname (`localhost`).
    host = "localhost"

    #: default Redis server port (6379)
    port = 6379

    #: default Redis db number (0)
    db = 0

    #: default Redis password (:const:`None`)
    password = None

    #: Maximum number of connections in the pool.
    max_connections = None

    supports_native_join = True

    def __init__(self, host=None, port=None, db=None, password=None,
            expires=None, max_connections=None, **kwargs):
        """Resolve the connection settings for this backend.

        Each setting is taken from the explicit argument first, then from
        the ``CELERY_REDIS_*`` / ``REDIS_*`` configuration keys, and
        finally from the class-level default.

        :keyword host: Redis server hostname.
        :keyword port: Redis server port (coerced with :func:`int`).
        :keyword db: Redis database number.
        :keyword password: Redis password.
        :keyword expires: Passed to :meth:`prepare_expires` with
            ``type=int``; the result is used as the expiry given to
            ``SETEX`` in :meth:`set`.
        :keyword max_connections: Connection pool size limit.
        :keyword \*\*kwargs: Forwarded to the base class constructor.

        :raises ImproperlyConfigured: if the redis library is missing.

        """
        super(RedisBackend, self).__init__(**kwargs)
        conf = self.app.conf
        if self.redis is None:
            raise ImproperlyConfigured(
                    "You need to install the redis library in order to use "
                  + "Redis result store backend.")

        # For compatibility with the old REDIS_* configuration keys.
        # Returns None (implicitly) when neither key is present.
        def _get(key):
            for prefix in "CELERY_REDIS_%s", "REDIS_%s":
                try:
                    return conf[prefix % key]
                except KeyError:
                    pass

        self.host = host or _get("HOST") or self.host
        self.port = int(port or _get("PORT") or self.port)

        # Database ``0`` is a valid choice, so "unset" must be detected
        # with ``is None``: a truthiness test would let an explicit or
        # configured ``0`` be overridden by a later non-zero fallback.
        if db is None:
            db = _get("DB")
        self.db = self.db if db is None else db

        self.password = password or _get("PASSWORD") or self.password
        self.expires = self.prepare_expires(expires, type=int)
        self.max_connections = (max_connections
                                or _get("MAX_CONNECTIONS")
                                or self.max_connections)

    def get(self, key):
        """Return the value stored at `key`."""
        return self.client.get(key)

    def mget(self, keys):
        """Return the values stored at `keys` using a single ``MGET``."""
        return self.client.mget(keys)

    def set(self, key, value):
        """Store `value` at `key` and publish it on the channel `key`.

        When :attr:`expires` is not :const:`None` the key is written with
        ``SETEX`` so that it expires; otherwise a plain ``SET`` is used.

        """
        client = self.client
        if self.expires is not None:
            # NOTE(review): argument order (name, value, time) is that of
            # the legacy ``redis.Redis`` client -- confirm against the
            # installed redis-py version.
            client.setex(key, value, self.expires)
        else:
            client.set(key, value)
        client.publish(key, value)

    def delete(self, key):
        """Delete `key`."""
        self.client.delete(key)

    def on_chord_apply(self, setid, body, result=None, **kwargs):
        """Save a ``TaskSetResult`` for `setid` built from `result`.

        `body` and any extra keyword arguments are accepted but unused.

        """
        self.app.TaskSetResult(setid, result).save()

    def on_chord_part_return(self, task, propagate=False):
        """Count a finished chord part and fire the chord body when done.

        Does nothing when `task` has no taskset id on its request.
        Otherwise a per-chord counter is incremented; once it reaches the
        total of the restored taskset result, the chord subtask is applied
        with the joined results and both the saved taskset result and the
        counter key are deleted.

        :param task: The task that just returned.
        :keyword propagate: Passed on to ``deps.join``.

        """
        from ..task.sets import subtask
        from ..result import TaskSetResult
        setid = task.request.taskset
        if not setid:
            return
        key = self.get_key_for_chord(setid)
        deps = TaskSetResult.restore(setid, backend=task.backend)
        if self.client.incr(key) >= deps.total:
            subtask(task.request.chord).delay(deps.join(propagate=propagate))
            deps.delete()
            self.client.delete(key)
        else:
            # Not complete yet: (re)set a one day (86400s) expiry on the
            # counter key.
            self.client.expire(key, 86400)

    @cached_property
    def client(self):
        """redis-py client backed by a connection pool built from the
        resolved host, port, db, password and max_connections."""
        pool = self.redis.ConnectionPool(host=self.host, port=self.port,
                                         db=self.db, password=self.password,
                                         max_connections=self.max_connections)
        return self.redis.Redis(connection_pool=pool)

    def __reduce__(self, args=(), kwargs=None):
        """Pickle support: add the connection settings to `kwargs` and
        delegate to the base class ``__reduce__``.

        `kwargs` defaults to :const:`None` instead of a literal ``{}``:
        a mutable default would be one dict shared by (and mutated in)
        every call.  A dict supplied by the caller is still updated in
        place, as before.

        """
        if kwargs is None:
            kwargs = {}
        kwargs.update(
            dict(host=self.host,
                 port=self.port,
                 db=self.db,
                 password=self.password,
                 expires=self.expires,
                 max_connections=self.max_connections))
        return super(RedisBackend, self).__reduce__(args, kwargs)