This file is indexed.

/usr/lib/python3/dist-packages/pyutilib/workflow/workflow.py is in python3-pyutilib 5.3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#  _________________________________________________________________________
#
#  PyUtilib: A Python utility library.
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  _________________________________________________________________________

# Q: When passing options, these values do not initialize the startup task.
#    Is this a bug?
# TODO: only set option values for variables that show up in a workflow's inputs
# TODO: add graceful management of exceptions
#       show the task tree, etc...

__all__ = ['Workflow']

import argparse
from collections import deque

from pyutilib.workflow.task import Task, EmptyTask, NoTask
from pyutilib.misc import Options

try:
    from collections import OrderedDict
except:
    from ordereddict import OrderedDict

def _collect_parser_groups(t):
    for key in t._parser_group:
        #
        # NOTE: we are changing the properties of the group
        # instances here.  This is OK _only_ because we are
        # printing the help info and then terminating.
        #

        # FIXME: The 'parser' object is not defined in the lines below
        raise NotImplementedError("")
        #t._parser_group[key].parser = parser
        #parser.add_argument_group(t._parser_group[key])

def _set_arguments(t):
    for arg in t._parser_arg:
        args = arg[0]
        kwargs = arg[1]
        try:
            t._parser.add_argument(*args, **kwargs)
        except argparse.ArgumentError:
            pass


class Workflow(Task):

    def __init__(self, id=None, name=None, parser=None):
        Task.__init__(self, id=id, name=name, parser=None)
        self._tasks = {}
        self._start_task = EmptyTask()
        self._final_task = EmptyTask()
        self.add(self._start_task)
        self.add(self._final_task)

    def add(self, task, loadall=True):
        if self.debug:
            print("ADDING",task.id)          #pragma:nocover
        if task.id == NoTask.id:
            return
        if task.id in self._tasks:
            return
        self._tasks[task.id] = task
        if not loadall:
            return
        for name in task.inputs:
            for t in task.inputs[name].from_tasks():
                self.add(t)

            if len(task.inputs[name].from_tasks()) > 0:
                continue

            if not task.inputs[name].constant:
                #
                # Constant input values are not added to the start task
                #
                if not name in self._start_task.outputs:
                    self._start_task.outputs.declare(name)
                    self.inputs.declare(name, optional=True)
                # TODO: this is a bit of a hack...
                val = getattr(task.inputs,name).get_value()
                try:
                    setattr(task.inputs, name, getattr(self._start_task.outputs, name))
                except ValueError:
                    # TBD: when do we get this exception?
                    pass
                getattr(self.inputs,name).set_value(val)
        for name in task.output_controls:
            for c in task.output_controls[name].output_connections:
                self.add(c.to_port.task())
        #
        for name in task.outputs:
            if len(task.outputs[name].output_connections) > 0:
                for c in task.outputs[name].output_connections:
                    self.add(c.to_port.task())
            else:
                if name in self._final_task.inputs:
                    raise ValueError("Cannot declare a workplan with multiple output values that share the same name: %s" % name)
                self.outputs.declare(name)
                self._final_task.inputs.declare(name)
                setattr(self._final_task.inputs, name, task.outputs[name])

    def _call_init(self, *options, **kwds):
        Task._call_init(self, *options, **kwds)
        for i in self.inputs:
            val = self.inputs[i].get_value()
            if val is not None:
                self._start_task.outputs[i].set_value( val )
                self._start_task.outputs[i].set_ready()
        #
        # TBD: this appears to be redundant
        #
        #for key in kwds:
        #    if key not in self._start_task.outputs:
        #        raise ValueError, "Cannot specify value for option %s.  Valid option names are %s" % (key, self._start_task.outputs.keys())
        #    self._start_task.outputs[key].set_value( kwds[key] )

    def _call_fini(self, *options, **kwds):
        ans = Options()
        for key in self._final_task.inputs:
            self._final_task.inputs[key].compute_value()
            ans[key] = self._final_task.inputs[key].get_value()
            getattr(self.outputs, key).set_value( ans[key] )
        for key in self.outputs:
            self.outputs[key].set_ready()
        return ans

    def set_options(self, args):
        self._dfs_([self._start_task.id], lambda t: t.set_options(args))

    def options(self):
        return self._start_task.outputs.keys()

    def print_help(self):
        parser = argparse.ArgumentParser()
        self._dfs_([self._start_task.id], _collect_parser_groups)
        parser.print_help()

    def set_arguments(self, parser=None):
        if parser is None:
            parser = self._parser
        self._dfs_([self._start_task.id], _set_arguments)

    def reset(self):
        return self._dfs_([self._start_task.id], lambda t: t.reset())

    def execute(self):
        #return self._dfs_([self._start_task.id], lambda t: t.__call__())
        if self.debug:                              #pragma:nocover
            print( self.name, '---------------')
            print( self.name, '---------------')
            print( self.name, '   STARTING')
            print( self.name, '---------------')
            print( self.name, '---------------')
        #
        queued = set([self._start_task.id])
        queue = deque([self._start_task])
        waiting = OrderedDict()
        while len(queue)+len(waiting) > 0:
            for id in waiting.keys():
                t = waiting[id]
                if not t.id in queued and t.ready():
                    #
                    if self.debug:                  #pragma:nocover
                        print(self.name, "WAITING: ",t.id," not queued and task ready")
                        print(self.name, "Waiting task",t.name,t.id,t.ready())
                    #
                    queue.append(t)
                    queued.add(t.id)
                    del waiting[t.id]
            if len(queue) == 0:
                break
                # TBD: should we sleep and add a timelimit before raising this exception?
                #if len(waiting) == 0:
                #    break
                #print self.name, "ERROR", waiting.keys()
                #raise RuntimeError, "Workflow failed to terminate normally.  All available tasks are blocked."
            task = queue.popleft()
            #
            if self.debug:                          #pragma:nocover
                print(self.name, "TASK   ",str(task))
                print(self.name, "QUEUE  ",queued)
                print(self.name, "WAITING",waiting.keys())
                print(self.name, "Executing Task "+task.name,task.next_task_ids())
            #
            queued.remove(task.id)
            task()
            for t in task.next_tasks():
                if t.id in queued:
                    continue
                if t.ready():
                    #
                    if self.debug:                  #pragma:nocover
                        print(self.name, "NEXT: ",t.id," not queued and task ready")
                        print(self.name, "Scheduling task",t.name,t.id,t.ready())
                    #
                    queue.append(t)
                    queued.add(t.id)
                    if t.id in waiting:
                        del waiting[t.id]
                else:
                    if not t.id in waiting:
                        waiting[t.id] = t
                    #
                    if self.debug:                  #pragma:nocover
                        print(self.name, "NEXT: ",t.id," not queued and task NOT ready")
                        print(self.name, "Ignoring task",t.name, t.id,t.ready())
            if self.debug:                          #pragma:nocover
                print(self.name, "FINAL QUEUE  ",queued)
                print(self.name, "FINAL WAITING",waiting.keys())
                print(self.name, '---------------')
                print(self.name, "    LOOP")
                print(self.name, '---------------')
        if self.debug:                              #pragma:nocover
            print(self.name, '---------------')

    def __str__(self):
        return "\n".join(["Workflow %s:" % self.name]+self._dfs_([self._start_task.id], lambda t: t._name()))

    def __repr__(self):
        return "Workflow %s:\n" % self.name+Task.__repr__(self)+'\n'+"\n".join(self._dfs_([self._start_task.id], lambda t: str(t)))

    def _dfs_(self, indices, fn, touched=None):
        if touched is None:
            touched = set()
        ans=[]
        for i in indices:
            if i in touched:
                # With this design, this condition should never be triggered
                # TODO: verify that this is an O(n) search algorithm; I think it's
                # O(n^2)
                continue        #pragma:nocover
            ok=True
            task = self._tasks[i]
            for j in task.prev_task_ids():
                if (j == NoTask.id) or (j in touched):
                    continue
                ok=False
                break
            if not ok:
                continue
            tmp = fn(task)
            if tmp is not None:
                ans.append(tmp)
            touched.add(i)
            ans = ans + self._dfs_(task.next_task_ids(), fn, touched)
        return ans