This file is indexed.

/usr/lib/python3/dist-packages/pyutilib/pyro/dispatcher.py is in python3-pyutilib 5.3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#  _________________________________________________________________________
#
#  PyUtilib: A Python utility library.
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  _________________________________________________________________________

__all__ = ['Dispatcher', 'DispatcherServer']

import os
import sys
import uuid
from collections import defaultdict

from pyutilib.pyro import get_nameserver, using_pyro3, using_pyro4
from pyutilib.pyro import Pyro as _pyro
from pyutilib.pyro.util import set_maxconnections, get_dispatchers

if sys.version_info >= (3,0):
    import queue as Queue
else:
    import Queue

from six import iteritems

if using_pyro3:
    base = _pyro.core.ObjBase
    oneway = lambda method: method
elif using_pyro4:
    base = object
    oneway = _pyro.oneway
else:
    base = object
    oneway = lambda method: method

def _clear_queue_threadsafe(q):
    while not q.empty():
        try:
            q.get(False)
        except Queue.Empty:
            continue
        q.task_done()

class Dispatcher(base):

    def __init__(self, **kwds):
        if _pyro is None:
            raise ImportError("Pyro or Pyro4 is not available")
        if using_pyro3:
            _pyro.core.ObjBase.__init__(self)
        self._task_queue = defaultdict(Queue.Queue)
        self._result_queue = defaultdict(Queue.Queue)
        self._verbose = kwds.pop("verbose", False)
        self._registered_workers = set()
        self._acquired_workers = set()
        self._worker_limit = kwds.pop("worker_limit", None)
        if self._verbose:
           print("Verbose output enabled...")

    #
    # One-way methods (Pyro4 only)
    #

    @oneway
    def release_acquired_workers(self, names):
        names = set(names)
        if not names.issubset(self._registered_workers):
            raise ValueError("List contains one or more worker names that "
                             "were not registered")
        self._acquired_workers.difference_update(names)

    @oneway
    def unregister_worker(self, name):
        if name not in self._registered_workers:
            raise ValueError("Worker name '%s' has not been registered")
        if self._verbose:
            print("Unregistering worker with name: %s" % (name))
        self._registered_workers.remove(name)

    @oneway
    def shutdown(self):
        print("Dispatcher received request to shut down - initiating...")
        if using_pyro3:
            self.getDaemon().shutdown()
        else:
            self._pyroDaemon.shutdown()

    @oneway
    def add_task(self, task, type=None):
        if self._verbose:
           print("Received request to add task=<Task id="
                 +str(task['id'])+">; queue type="+str(type))
        self._task_queue[type].put(task)

    # process a set of tasks in one shot - the input
    # is a dictionary from queue type (including None)
    # to a list of tasks to be added to that queue.
    @oneway
    def add_tasks(self, tasks):
        if self._verbose:
           print("Received request to add bulk task set. Task ids=%s"
                 % (dict((task_type, [task['id'] for task in tasks[task_type]])
                         for task_type in tasks)))
        for task_type in tasks:
            task_queue = self._task_queue[task_type]
            for task in tasks[task_type]:
                task_queue.put(task)

    @oneway
    def add_result(self, result, type=None):
        if self._verbose:
           print("Received request to add result with "
                 "result="+str(result)+"; queue type="+str(type))
        self._result_queue[type].put(result)

    # process a set of results in one shot - the input
    # is a dictionary from queue type (including None)
    # to a list of results to be added to that queue.
    @oneway
    def add_results(self, results):
        if self._verbose:
            print("Received request to add bulk result set for task ids=%s"
                 % (dict((result_type, [result['id']
                                        for result in results[result_type]])
                         for result_type in results)))
        for result_type in results:
            result_queue = self._result_queue[result_type]
            for result in results[result_type]:
                result_queue.put(result)

    #
    # Methods that do not return anything but are
    # not marked oneway for Pyro4 to avoid race conditions
    #

    def clear_queue(self, type=None):
        if self._verbose:
           print("Received request to clear task and result "
                 "queues for queue type="+str(type))

        try:
            _clear_queue_threadsafe(self._task_queue[type])
        except KeyError:
            pass
        try:
            _clear_queue_threadsafe(self._result_queue[type])
        except KeyError:
            pass

    def clear_queues(self, types):
        for type in types:
            self.clear_queue(type=type)

    def clear_all_queues(self):
        self._task_queue = defaultdict(Queue.Queue)
        self._result_queue = defaultdict(Queue.Queue)

    def clear_task_queue(self, type=None):
        if self._verbose:
           print("Received request to clear task "
                 "queue for queue type="+str(type))
        try:
            _clear_queue_threadsafe(self._task_queue[type])
        except KeyError:
            pass

    def clear_task_queues(self, types):
        for type in types:
            self.clear_task_queue(type=type)

    def clear_all_task_queues(self):
        self._task_queue = defaultdict(Queue.Queue)

    def clear_result_queue(self, type=None):
        if self._verbose:
           print("Received request to clear result "
                 "queue for queue type="+str(type))
        try:
            _clear_queue_threadsafe(self._result_queue[type])
        except KeyError:
            pass

    def clear_result_queues(self, types):
        for type in types:
            self.clear_result_queue(type=type)

    def clear_all_result_queues(self):
        self._result_queue = defaultdict(Queue.Queue)

    #
    # Methods that do return something, so can't
    # be marked as oneway for Pyro4
    #

    def acquire_available_workers(self):
        worker_names = self._registered_workers - self._acquired_workers
        self._acquired_workers.update(worker_names)
        return worker_names

    def register_worker(self, name):
        if name in self._registered_workers:
            raise ValueError("Worker name '%s' has already been registered")
        if (self._worker_limit is None) or \
           (len(self._registered_workers) < self._worker_limit):
            self._registered_workers.add(name)
            if self._verbose:
                print("Registering worker %s with name: %s"
                      % (len(self._registered_workers), name))
            return True
        return False

    def get_task(self, type=None, block=True, timeout=5):
        if self._verbose:
           print("Received request to get a task from "
                 "queue type="+str(type)+"; block="+str(block)+
                 "; timeout="+str(timeout)+" seconds")
        try:
            task = self._task_queue[type].get(block=block,
                                              timeout=timeout)
            return task
        except Queue.Empty:
            return None

    def get_tasks(self, type_block_timeout_list):
        if self._verbose:
           print("Received request to get tasks in bulk. "
                 "Queue request types="+str(type_block_timeout_list))

        ret = {}
        for type, block, timeout in type_block_timeout_list:
            task_list = []
            try:
                task_list.append(self._task_queue[type].get(block=block,
                                                            timeout=timeout))
            except Queue.Empty:
                pass
            else:
                while self._task_queue[type].qsize():
                    try:
                        task_list.append(self._task_queue[type].get(block=block,
                                                                    timeout=timeout))
                    except Queue.Empty:
                        pass
            if len(task_list) > 0:
                ret.setdefault(type, []).extend(task_list)

        return ret

    def get_result(self, type=None, block=True, timeout=5):
        if self._verbose:
           print("Received request to get a result from "
                 "queue type="+str(type)+"; block="+str(block)+
                 "; timeout="+str(timeout))
        try:
            return self._result_queue[type].get(block=block,
                                                timeout=timeout)
        except Queue.Empty:
            return None

    def get_results(self, type_block_timeout_list):
        if self._verbose:
           print("Received request to get results in bulk. "
                 "Queue request types="+str(type_block_timeout_list))

        ret = {}
        for type_name, block, timeout in type_block_timeout_list:
            result_list = []
            try:
                result_list.append(self._result_queue[type_name].get(block=block,
                                                                     timeout=timeout))
            except Queue.Empty:
                pass
            else:
                while self._result_queue[type_name].qsize():
                    try:
                        result_list.append(
                            self._result_queue[type_name].get(block=block,
                                                              timeout=timeout))
                    except Queue.Empty:
                        pass
            if len(result_list) > 0:
                ret.setdefault(type_name, []).extend(result_list)

        return ret

    def num_tasks(self, type=None):
        if self._verbose:
           print("Received request for number of tasks in "
                 "queue with type="+str(type))
        return self._task_queue[type].qsize()

    def num_results(self, type=None):
        if self._verbose:
           print("Received request for number of results in "
                 "queue with type="+str(type))
        return self._result_queue[type].qsize()

    def queues_with_results(self):
        if self._verbose:
           print("Received request for the set of queues with results")

        results = []
        #
        # Iterate over a copy of the contents of the queue, since
        # the queue may change while iterating.
        #
        for queue_name, result_queue in list(self._result_queue.items()):
           if result_queue.qsize() > 0:
               results.append(queue_name)

        return results

    def get_results_all_queues(self):

        if self._verbose:
           print("Received request to obtain all available "
                 "results from all queues")

        results = []
        #
        # Iterate over a copy of the contents of the queue, since
        # the queue may change while iterating.
        #
        for queue_name, result_queue in list(self._result_queue.items()):
            while result_queue.qsize() > 0:
                try:
                    results.append(result_queue.get(block=False, timeout=0))
                except Queue.Empty:
                    pass
        return results

def DispatcherServer(group=":PyUtilibServer",
                     daemon_host=None,
                     daemon_port=0,
                     nameserver_host=None,
                     nameserver_port=None,
                     verbose=False,
                     max_allowed_connections=None,
                     worker_limit=None,
                     clear_group=True):

    set_maxconnections(max_allowed_connections=max_allowed_connections)

    #
    # main program
    #
    ns = get_nameserver(host=nameserver_host, port=nameserver_port, caller_name="Dispatcher")

    if clear_group:
        for name, uri in get_dispatchers(group=group, ns=ns):
            print("Multiple dispatchers not allowed.")
            print("dispatch_srvr is shutting down...")
            return 1

    if using_pyro3:
        daemon = _pyro.core.Daemon(host=daemon_host, port=daemon_port)
        daemon.useNameServer(ns)
    else:
        daemon = _pyro.Daemon(host=daemon_host, port=daemon_port)

    if using_pyro3:
        try:
            ns.createGroup(group)
        except _pyro.errors.NamingError:
            pass
        try:
            ns.createGroup(group+".dispatcher")
        except _pyro.errors.NamingError:
            pass
        if clear_group:
            try:
                ns.unregister(group+".dispatcher")
            except _pyro.errors.NamingError:
                pass
    else:
        if clear_group:
            try:
                ns.remove(group+".dispatcher")
            except _pyro.errors.NamingError:
                pass

    disp = Dispatcher(verbose=verbose,
                      worker_limit=worker_limit)
    proxy_name = group+".dispatcher."+str(uuid.uuid4())
    if using_pyro3:
        uri = daemon.connect(disp, proxy_name)
    else:
        uri = daemon.register(disp, proxy_name)
        ns.register(proxy_name, uri)

    # There is no need to retain the proxy connection to the
    # nameserver, so free up resources on the nameserver thread
    if using_pyro4:
        ns._pyroRelease()
    else:
        ns._release()

    print("Dispatcher is ready.")
    return daemon.requestLoop()