This file is indexed.

/usr/lib/python3/dist-packages/pyutilib/workflow/functor.py is in python3-pyutilib 5.3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
"""
Defining a Functor-specific task class
"""

__all__ = ['functor_api', 'IFunctorTask', 'FunctorAPIFactory', 'FunctorAPIData']

import sys
import inspect
import logging

from pyutilib.workflow.tasks import TaskPlugin
import pyutilib.component.core
import pyutilib.misc

logger = logging.getLogger('pyutilib.workflow')

class FunctorAPIData(dict):
    """
    A generalization of pyutilib.misc.Bunch.  This class counts access to attributes, and
    it generates errors for undefined attributes.
    """

    def __init__(self, **kw):
        dict.__init__(self,kw)
        self.__dict__.update(kw)
        self._declared_ = set()
        self.clean()

    def clean(self):
        self._dirty_ = set()

    def unused(self):
        for k, v in self.__dict__.items():
            if not k in self._dirty_:
                yield k

    def declare(self, args):
        if type(args) in (list, tuple, set):
            self._declared_.update(args)
        else:
            self._declared_.add(args)

    def update(self, d):
        self.__dict__.update(d)

    def __setitem__(self, name, val):
        self.__setattr__(name,val)

    def __getitem__(self, name):
        return self.__getattr__(name)

    def __setattr__(self, name, val):
        if name[0] != '_':
            if len(self._declared_) > 0 and not name in self._declared_:
                raise AttributeError("Undeclared attribute '%s'" % name)
            self._dirty_.add(name)
            dict.__setitem__(self, name, val)
        self.__dict__[name] = val

    def __getattr__(self, name):
        if len(self._declared_) > 0 and not name in self._declared_:
            raise AttributeError("Undeclared attribute '%s'" % name)
        try:
            self._dirty_.add(name)
            return dict.__getitem__(self, name)
        except:
            if name[0] == '_':
                raise AttributeError("Unknown attribute %s" % name)
        return None

    def __repr__(self):
        return dict.__repr__(self)

    def __str__(self, nesting = 0, indent=''):
        attrs = []
        indentation = indent+"    " * nesting
        for k, v in self.__dict__.items():
            if not k.startswith("_"):
                text = [indentation, k, ":"]
                if isinstance(v, FunctorAPIData):
                    text.append('\n')
                    text.append(v.__str__(nesting + 1))
                else:
                    text.append(' '+str(v))
                attrs.append("".join(text))
        attrs.sort()
        return "\n".join(attrs)

    

class IFunctorTask(pyutilib.component.core.Interface):
    """Interface for Functor tasks"""

FunctorAPIFactory = pyutilib.component.core.CreatePluginFactory(IFunctorTask)


class FunctorTask(TaskPlugin):

    def __init__(self, *args, **kwargs):
        self._fn = kwargs.pop('fn', None)
        #
        TaskPlugin.__init__(self, *args, **kwargs)

    def execute(self, debug=False):
        if self._fn is None:            #pragma:nocover
            raise RuntimeError("This is a bad definition of a FunctorTask.  The '_fn' method is not defined")
        #
        # Process data
        #
        data = self._kwds.get('data', None)
        if not data is None and type(data) is dict:
            _data = FunctorAPIData()
            _data.update(data)
            self._kwds['data'] = _data
        data = self._kwds.get('data', None)
        #
        # Test nested data
        #
        def nested_lookup(kwds, lookup):
            lookups = lookup.split('.')
            obj = kwds[lookups[0]]
            if not data is None and lookups[0] == 'data':
                data.declare(lookups[1])
            for key in lookups[1:]:
                #print key, obj
                obj = obj[key]
            if obj is None:
                raise ValueError
        for name in self._nested_requirements:
            try:
                nested_lookup(self._kwds, name)
            except ValueError:
                raise RuntimeError("None value found for nested attribute '%s'" % name)
            except:
                raise RuntimeError("Failed to verify existence of nested attribute '%s'" % name)
        #
        # Call _fn
        #
        if 'data' in self._kwargs:
            data = self._kwds.get('data', None)
            retval = self._fn(**self._kwds)
        else:
            data = self._kwds.pop('data')
            retval = self._fn(data, **self._kwds)
        #
        # Process retval
        #
        if retval is None or id(data) == id(retval):
            self._retval = FunctorAPIData(data=data)
        elif isinstance(retval, FunctorAPIData):
            if not id(data) == id(retval):
                retval.data = data
            self._retval = retval
        elif isinstance(retval, dict):
            self._retval = FunctorAPIData()
            self._retval.update(retval)
        else:
            raise RuntimeError("A Functor task function must return either None, a FunctorAPIData object, or an instance of dict.")

    def _call_start(self):
        self.reset()

    def _call_init(self, *options, **kwds):
        if not 'data' in kwds:
            if len(options) > 0:
                if len(options) > 1:
                    raise RuntimeError("A FunctorTask instance can only be executed with a single non-keyword argument")
                kwds['data'] = options[0]
                #options = options[1:]
            elif not self.inputs.data.optional:
                raise RuntimeError("A FunctorTask instance must be executed with at 'data' argument")
        self._kwds = kwds
        return TaskPlugin._call_init(self, **kwds)

    def _call_fini(self, *options, **kwds):
        for key in self._retval:
            if not key in self.outputs:
                raise RuntimeError("Cannot return value '%s' that is not a predefined output of a Functor task" % key)
            setattr(self, key, self._retval[key])
        TaskPlugin._call_fini(self, *options, **kwds)
        return self._retval


#
# Decorate functions that are Functor tasks
#
def functor_api(fn=None, implements=None, outputs=None, namespace=None):

    def my_decorator(fn):
        if fn is None:                                  #pragma:nocover
            logger.error("Error applying decorator.  No function value!")
            return

        if namespace is None:
            _alias =  fn.__name__
        else:
            _alias =  namespace+'.'+fn.__name__
        _name = _alias.replace('_', '.')

        argspec = inspect.getargspec(fn)
        if sys.version_info < (2,6):
            argspec = pyutilib.misc.Bunch(args=argspec[0], varargs=argspec[1], keywords=argspec[2], defaults=argspec[3])
        if not argspec.varargs is None:
            logger.error("Attempting to declare Functor task with function '%s' that contains variable arguments" % _alias)
            return                                      #pragma:nocover
        if not argspec.keywords is None:
            logger.error("Attempting to declare Functor task with function '%s' that contains variable keyword arguments" % _alias)
            return                                      #pragma:nocover

        if _alias in FunctorAPIFactory.services():
            logger.error("Cannot define API %s, since this API name is already defined" % _alias)
            return                                      #pragma:nocover

        class TaskMeta(pyutilib.component.core.PluginMeta):
            def __new__(cls, name, bases, d):
                return pyutilib.component.core.PluginMeta.__new__(cls, "FunctorTask_"+str(_name), bases, d)


        class FunctorTask_tmp(FunctorTask):

            __metaclass__ = TaskMeta

            pyutilib.component.core.alias(_alias)

            pyutilib.component.core.implements(IFunctorTask)

            def __init__(self, *args, **kwargs):
                kwargs['fn'] = fn
                FunctorTask.__init__(self, *args, **kwargs)
                if not fn is None:
                    if len(argspec.args) is 0:
                        nargs = 0
                    elif argspec.defaults is None:
                        nargs = len(argspec.args)
                    else:
                        nargs = len(argspec.args) - len(argspec.defaults)
                    self._kwargs = argspec.args[nargs:]
                    if nargs != 1 and 'data' not in self._kwargs:
                        logger.error("Functor '%s' must have a 'data argument" % _alias)
                    if argspec.defaults is None:
                        _defaults = {}
                    else:
                        _defaults = dict(zip(argspec.args[nargs:], argspec.defaults))
                    #
                    docinfo = parse_docstring(fn)
                    #
                    if 'data' in docinfo['optional']:
                        self.inputs.declare('data', doc='A container of labeled data.', optional=True)
                    else:
                        self.inputs.declare('data', doc='A container of labeled data.')
                    for name in argspec.args[nargs:]:
                        if name in docinfo['optional']:
                            self.inputs.declare(name, optional=True, default=_defaults[name], doc=docinfo['optional'][name])
                        elif name in docinfo['required']:
                            self.inputs.declare(name, doc=docinfo['required'][name])
                        elif name != 'data':
                            logger.error("Argument '%s' is not specified in the docstring!" % name)
                    #
                    self.outputs.declare('data', doc='A container of labeled data.')
                    if outputs is None:
                        _outputs = list(docinfo['return'].keys())
                    else:
                        _outputs = outputs
                    for name in _outputs:
                        if name in docinfo['return']:
                            self.outputs.declare(name, doc=docinfo['return'][name])
                        else:
                            logger.error("Return value '%s' is not specified in the docstring!" % name)
                    #
                    self._nested_requirements = []
                    for name in docinfo['required']:
                        if '.' in name:
                            self._nested_requirements.append(name)
                    #
                    #  Error check keys for docinfo
                    #
                    for name in docinfo['required']:
                        if '.' in name:
                            continue
                        if not name in self.inputs:
                            logger.error("Unexpected name '%s' in list of required inputs for functor '%s'" % (name,_alias))
                    for name in docinfo['optional']:
                        if not name in self.inputs:
                            logger.error("Unexpected name '%s' in list of optional inputs for functor '%s'" % (name,_alias))
                    for name in docinfo['return']:
                        if not name in self.outputs:
                            logger.error("Unexpected name '%s' in list of outputs for functor '%s'" % (name,_alias))
                    #
                    self.__help__ = fn.__doc__
                    self.__doc__ = fn.__doc__
                    self.__short_doc__ = docinfo['short_doc'].strip()
                    self.__long_doc__ = docinfo['long_doc'].strip()
                    self.__namespace__ = namespace

        return FunctorTask_tmp()

    if fn is None:
        return my_decorator
    return my_decorator(fn)


def parse_docstring(fn):
    """Parse a function docstring for information about the function arguments and return values"""
    retval = {}
    retval['short_doc'] = ""
    retval['long_doc'] = None
    retval['required'] = {}
    retval['optional'] = {}
    retval['return'] = {}
    curr = None
    doc = inspect.getdoc(fn)
    if doc is None:
        retval['long_doc'] = ''
        return retval
    for line in doc.split('\n'):
        line = line.strip()
        if line == 'Required:' or line == 'Required Arguments:':
            curr = 'required'
        elif line == 'Optional:' or line == 'Optimal Arguments:':
            curr = 'optional'
        elif line == 'Return Values:' or line == 'Return:' or line == 'Returned:':
            curr = 'return'
        elif curr is None:
            if retval['long_doc'] is None:
                if line == '':
                    retval['long_doc'] = ''
                else:
                    retval['short_doc'] += line
                    retval['short_doc'] += '\n'
            else:
                retval['long_doc'] += line
                retval['long_doc'] += '\n'
        elif ':' in line:
            name, desc = line.split(':', 1)
            retval[curr][name.strip()] = desc.strip() + '\n'
        else:
            retval[curr][name.strip()] += line + '\n'
    if retval['long_doc'] is None:
        retval['long_doc'] = ''
    return retval