This file is indexed.

/usr/lib/python3/dist-packages/pyutilib/workflow/tasks.py is in python3-pyutilib 5.3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
#  _________________________________________________________________________
#
#  PyUtilib: A Python utility library.
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  _________________________________________________________________________

# Names exported by `from <this module> import *`.  Only the two plugin base
# classes and the task factory are listed; the concrete task classes defined
# further down are not exported by name.
__all__ = ['TaskPlugin', 'TaskFactory', 'WorkflowPlugin']

from pyutilib.workflow import port
from pyutilib.workflow import task
from pyutilib.workflow import workflow
from pyutilib.component.core import *


class IWorkflowTask(Interface):
    """Marker interface for workflow task plugins.

    Declares no members of its own.  It is the interface handed to
    CreatePluginFactory() to build TaskFactory, and the one named in the
    implements() declarations of TaskPlugin and WorkflowPlugin.
    """
    pass


# Plugin factory built from the IWorkflowTask interface.
# NOTE(review): presumably looks plugins up by the name given to alias()
# (e.g. 'workflow.selection') -- confirm against pyutilib.component.core.
TaskFactory = CreatePluginFactory(IWorkflowTask)


class TaskPlugin(Plugin, task.Task):
    """Base class for tasks that are also component plugins.

    Inherits from both Plugin and task.Task and declares that it implements
    the IWorkflowTask interface.
    """

    implements(IWorkflowTask)

    def __init__(self, *args, **kwds):      #pragma:nocover
        """Initialize both base classes with the same arguments.

        The bases are initialized explicitly (Plugin first, then task.Task)
        rather than through super(), so each receives *args and **kwds
        unchanged.
        """
        Plugin.__init__(self, *args, **kwds)
        task.Task.__init__(self, *args, **kwds)

    def __repr__(self):
        """Return the representation produced by task.Task.__repr__.

        task.Task is named explicitly instead of relying on method
        resolution order across the two bases.
        """
        return task.Task.__repr__(self)     #pragma:nocover


class WorkflowPlugin(Plugin, workflow.Workflow):
    """Base class for workflows that are also component plugins.

    Inherits from both Plugin and workflow.Workflow and declares that it
    implements the IWorkflowTask interface.
    """

    implements(IWorkflowTask)

    def __init__(self, *args, **kwds):      #pragma:nocover
        """Initialize both base classes with the same arguments.

        The bases are initialized explicitly (Plugin first, then
        workflow.Workflow) rather than through super(), so each receives
        *args and **kwds unchanged.
        """
        Plugin.__init__(self, *args, **kwds)
        workflow.Workflow.__init__(self, *args, **kwds)

    def __repr__(self):                     #pragma:nocover
        """Return the representation produced by workflow.Workflow.__repr__.

        workflow.Workflow is named explicitly instead of relying on method
        resolution order across the two bases.
        """
        return workflow.Workflow.__repr__(self)


class Selection_Task(TaskPlugin):
    """Task that extracts one element from a container.

    Inputs:
        index -- key or position used to subscript ``data``
        data  -- any object supporting ``data[index]``

    Outputs:
        selection -- the element ``data[index]``
    """

    alias('workflow.selection')

    def __init__(self, *args, **kwds):
        """Set up the task and declare its two inputs and single output."""
        TaskPlugin.__init__(self, *args, **kwds)
        # Input ports: 'index' is declared first, then 'data'.
        for port_name in ('index', 'data'):
            self.inputs.declare(port_name)
        # Output port that receives the chosen element.
        self.outputs.declare('selection')

    def execute(self):
        """Subscript the ``data`` input with ``index`` and store the result
        as ``selection``."""
        container = self.data
        key = self.index
        self.selection = container[key]


class Switch_Task(TaskPlugin):
    """Task that enables exactly the branches matching its ``value`` input.

    Branches are registered with add_branch().  Each branch is keyed by a
    value and owns one output control on this task; execute() marks the
    control(s) whose key equals ``value`` as ready and resets the others.
    """

    alias('workflow.switch')

    def __init__(self, *args, **kwds):
        """Set up the task, an empty branch table and the ``value`` input."""
        TaskPlugin.__init__(self, *args, **kwds)
        # Maps a branch value -> name of the output control for that branch.
        self._branches = {}
        self.inputs.declare('value')

    def add_branch(self, value, task):
        """Register ``task`` as the branch taken when the input equals ``value``.

        Declares an output control on this task named ``'task' + str(task.id)``,
        declares an input control on ``task`` named ``'task' + str(self.id)``,
        and assigns this task's output control to that input control.

        Note: the ``task`` parameter shadows the module-level ``task`` import
        inside this method; the name is kept for keyword-argument callers.
        """
        branch_control = 'task' + str(task.id)
        self._branches[value] = branch_control
        self.output_controls.declare(branch_control)

        switch_control = 'task' + str(self.id)
        task.input_controls.declare(switch_control)
        setattr(task.input_controls, switch_control,
                self.output_controls[branch_control])

    def execute(self):
        """Mark matching branch controls ready and reset all the others.

        Raises:
            ValueError: if no registered branch key equals ``self.value``.
        """
        matched = False
        # Compare with == against every key (rather than a dict lookup) so
        # every non-matching control is reset and unhashable values still work.
        for key in self._branches:
            control = self.output_controls[self._branches[key]]
            if self.value == key:
                control.set_ready()
                matched = True
            else:
                control.reset()
        if not matched:
            keys = list(self._branches)
            try:
                valid = sorted(keys)
            except TypeError:
                # Keys of mixed, non-orderable types (e.g. None and a string)
                # cannot be sorted in Python 3; order them by repr so the
                # intended ValueError is still raised instead of a TypeError.
                valid = sorted(keys, key=repr)
            raise ValueError("Branch condition has value '%s' but no branch is indexed with that value.\n    Valid branch indices: %s" % (str(self.value), valid))

    def __repr__(self):
        """Return the representation produced by TaskPlugin.__repr__."""
        return TaskPlugin.__repr__(self)        #pragma:nocover


class IfThen_Task(Switch_Task):
    """Switch_Task registered under the alias 'workflow.branch'.

    Adds no behavior of its own: both methods below delegate directly to
    Switch_Task.
    """

    alias('workflow.branch')

    def __init__(self, *args, **kwds):
        """Forward all arguments unchanged to Switch_Task.__init__."""
        Switch_Task.__init__(self, *args, **kwds)

    def __repr__(self):
        """Return the representation produced by Switch_Task.__repr__."""
        return Switch_Task.__repr__(self)       #pragma:nocover