/usr/share/pyshared/celery/task/base.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.task.base
~~~~~~~~~~~~~~~~
The task implementation has been moved to :mod:`celery.app.task`.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from .. import current_app
from ..app.task import Context, TaskType, BaseTask # noqa
from ..schedules import maybe_schedule
from ..utils import deprecated, timeutils
# ``Task`` is looked up on the current app when this module is imported;
# the task implementation itself lives in :mod:`celery.app.task` (see the
# module docstring).  ``PeriodicTask`` below subclasses this name.
Task = current_app.Task
@deprecated("Importing TaskSet from celery.task.base",
            alternative="Use celery.task.TaskSet instead.",
            removal="2.4")
def TaskSet(*args, **kwargs):
    """Deprecated shim that forwards to :class:`celery.task.sets.TaskSet`.

    Every positional and keyword argument is passed through unchanged and
    the object built by the real ``TaskSet`` is returned.
    """
    # The import happens at call time rather than module import time
    # (presumably to avoid a circular import -- confirm before hoisting).
    from celery.task.sets import TaskSet as _TaskSet
    task_set = _TaskSet(*args, **kwargs)
    return task_set
@deprecated("Importing subtask from celery.task.base",
            alternative="Use celery.task.subtask instead.",
            removal="2.4")
def subtask(*args, **kwargs):
    """Deprecated shim that forwards to :class:`celery.task.sets.subtask`.

    Every positional and keyword argument is passed through unchanged and
    the object built by the real ``subtask`` is returned.
    """
    # The import happens at call time rather than module import time
    # (presumably to avoid a circular import -- confirm before hoisting).
    from celery.task.sets import subtask as _subtask
    sub = _subtask(*args, **kwargs)
    return sub
class PeriodicTask(Task):
    """A periodic task is a task that behaves like a :manpage:`cron` job.

    Results of periodic tasks are not stored by default.

    .. attribute:: run_every

        *REQUIRED* Defines how often the task is run (its interval),
        it can be a :class:`~datetime.timedelta` object, a
        :class:`~celery.schedules.crontab` object or an integer
        specifying the time in seconds.

    .. attribute:: relative

        If set to :const:`True`, run times are relative to the time when the
        server was started. This was the previous behaviour, periodic tasks
        are now scheduled by the clock.

    .. attribute:: options

        Optional value used as the ``"options"`` entry of the schedule
        entry registered for this task.  An empty dict is used when this
        is left as :const:`None` (or is otherwise falsy).

    :raises NotImplementedError: if the :attr:`run_every` attribute is
        not defined.

    Example

        >>> from celery.task import tasks, PeriodicTask
        >>> from datetime import timedelta
        >>> class EveryThirtySecondsTask(PeriodicTask):
        ...     run_every = timedelta(seconds=30)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Execute every 30 seconds")

        >>> from celery.task import PeriodicTask
        >>> from celery.schedules import crontab

        >>> class EveryMondayMorningTask(PeriodicTask):
        ...     run_every = crontab(hour=7, minute=30, day_of_week=1)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Execute every Monday at 7:30AM.")

        >>> class EveryMorningTask(PeriodicTask):
        ...     run_every = crontab(hour=7, minute=30)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Execute every day at 7:30AM.")

        >>> class EveryQuarterPastTheHourTask(PeriodicTask):
        ...     run_every = crontab(minute=15)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Execute every 0:15 past the hour every day.")

    """
    abstract = True
    ignore_result = True
    type = "periodic"
    relative = False
    options = None

    def __init__(self):
        """Validate :attr:`run_every` and register the task in the schedule.

        The :attr:`run_every` value is passed through
        :func:`~celery.schedules.maybe_schedule` together with
        :attr:`relative`, and an entry keyed by the task name is written
        to ``CELERYBEAT_SCHEDULE`` in the current app's configuration.

        :raises NotImplementedError: if no :attr:`run_every` attribute
            is defined.

        """
        app = current_app
        if not hasattr(self, "run_every"):
            raise NotImplementedError(
                    "Periodic tasks must have a run_every attribute")
        self.run_every = maybe_schedule(self.run_every, self.relative)

        # For backward compatibility, add the periodic task to the
        # configuration schedule instead.
        app.conf.CELERYBEAT_SCHEDULE[self.name] = {
                "task": self.name,
                "schedule": self.run_every,
                "args": (),
                "kwargs": {},
                "options": self.options or {},
                "relative": self.relative,
        }

        super(PeriodicTask, self).__init__()

    def timedelta_seconds(self, delta):
        """Convert :class:`~datetime.timedelta` to seconds.

        Doesn't account for negative timedeltas.

        """
        return timeutils.timedelta_seconds(delta)

    def is_due(self, last_run_at):
        """Returns tuple of two items `(is_due, next_time_to_run)`,
        where next time to run is in seconds.

        Delegates to the :attr:`run_every` schedule.
        See :meth:`celery.schedules.schedule.is_due` for more information.

        """
        return self.run_every.is_due(last_run_at)

    def remaining_estimate(self, last_run_at):
        """Returns when the periodic task should run next as a timedelta.

        Delegates to the :attr:`run_every` schedule.

        """
        return self.run_every.remaining_estimate(last_run_at)
|