This file is indexed.

/usr/lib/python3/dist-packages/pyutilib/pyro/util.py is in python3-pyutilib 5.3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#  _________________________________________________________________________
#
#  PyUtilib: A Python utility library.
#  Copyright (c) 2008 Sandia Corporation.
#  This software is distributed under the BSD License.
#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
#  the U.S. Government retains certain rights in this software.
#  _________________________________________________________________________

__all__ = ('get_nameserver', 'get_dispatchers', 'shutdown_pyro_components')

import os
import sys
import time
import random
import socket

from pyutilib.pyro import using_pyro3, using_pyro4
from pyutilib.pyro import Pyro as _pyro

if sys.version_info >= (3,0):
    xrange = range
    import queue as Queue
else:
    import Queue

_connection_problem = None
if using_pyro3:
    _connection_problem = _pyro.errors.ProtocolError
elif using_pyro4:
    _connection_problem = _pyro.errors.TimeoutError

def get_nameserver(host=None,
                   port=None,
                   num_retries=30,
                   caller_name="Unknown"):

    if _pyro is None:
        raise ImportError("Pyro or Pyro4 is not available")

    timeout_upper_bound = 5.0

    if not host is None:
        os.environ['PYRO_NS_HOSTNAME'] = host
    elif 'PYRO_NS_HOSTNAME' in os.environ:
        host = os.environ['PYRO_NS_HOSTNAME']

    # Deprecated in Pyro3
    # Removed in Pyro4
    if using_pyro3:
        _pyro.core.initServer()

    ns = None

    for i in xrange(0, num_retries+1):
        try:
            if using_pyro3:
                ns = _pyro.naming.NameServerLocator().getNS(host=host, port=port)
            else:
                ns = _pyro.locateNS(host=host, port=port)
            break
        except _pyro.errors.NamingError:
            pass
        except _connection_problem:
            # this can occur if the server is too busy.
            pass

        # we originally had a single sleep timeout value, hardcoded to 1 second.
        # the problem with this approach is that if a large number of concurrent
        # processes fail, then they will all re-attempt at roughly the same
        # time. causing more contention than is necessary / desirable. by randomizing
        # the sleep interval, we are hoping to distribute the number of clients
        # attempting to connect to the name server at any given time.
        # TBD: we should eventually read the timeout upper bound from an enviornment
        #      variable - to support cases with a very large (hundreds to thousands)
        #      number of clients.
        if i < num_retries:
            sleep_interval = random.uniform(1.0, timeout_upper_bound)
            print("%s failed to locate name server after %d attempts - "
                  "trying again in %5.2f seconds."
                  % (caller_name, i+1, sleep_interval))
            time.sleep(sleep_interval)

    if ns is None:
        print("%s could not locate nameserver (attempts=%d)"
              % (caller_name,num_retries+1))
        raise SystemExit

    return ns

def get_dispatchers(group=":PyUtilibServer",
                    host=None,
                    port=None,
                    num_dispatcher_tries=30,
                    min_dispatchers=1,
                    caller_name=None,
                    ns=None):

    if ns is None:
        ns = get_nameserver(host=host, port=port, caller_name=caller_name)
    else:
        assert caller_name is None
        assert host is None
        assert port is None

    if ns is None:
        raise RuntimeError("Failed to locate Pyro name "
                           "server on the network!")

    cumulative_sleep_time = 0.0
    dispatchers = []
    for i in xrange(0,num_dispatcher_tries):
        ns_entries = None
        if using_pyro3:
            for (name,uri) in ns.flatlist():
                if name.startswith(":PyUtilibServer.dispatcher."):
                    if (name,uri) not in dispatchers:
                        dispatchers.append((name, uri))
        elif using_pyro4:
            for name in ns.list(prefix=":PyUtilibServer.dispatcher."):
                uri = ns.lookup(name)
                if (name,uri) not in dispatchers:
                    dispatchers.append((name, uri))
        if len(dispatchers) >= min_dispatchers:
            break
    return dispatchers

#
# a utility for shutting down Pyro-related components, which at the
# moment is restricted to the name server and any dispatchers. the
# mip servers will come down once their dispatcher is shut down.
# NOTE: this is a utility that should eventually become part of
#       pyutilib.pyro, but because is prototype, I'm keeping it
#       here for now.
#

def shutdown_pyro_components(host=None,
                             port=None,
                             num_retries=30,
                             caller_name="Unknown"):

    if _pyro is None:
        raise ImportError("Pyro or Pyro4 is not available")

    ns = get_nameserver(host=host,
                        port=port,
                        num_retries=num_retries,
                        caller_name=caller_name)
    if ns is None:
        print("***WARNING - Could not locate name server "
              "- Pyro components will not be shut down")
        return

    if using_pyro3:
        ns_entries = ns.flatlist()
        for (name,uri) in ns_entries:
            if name.startswith(":PyUtilibServer.dispatcher."):
                try:
                    ns.unregister(name)
                    proxy = _pyro.core.getProxyForURI(uri)
                    proxy.shutdown()
                except:
                    pass
        for (name,uri) in ns_entries:
            if name == ":Pyro.NameServer":
                try:
                    proxy = _pyro.core.getProxyForURI(uri)
                    proxy._shutdown()
                    proxy._release()
                except:
                    pass
    elif using_pyro4:
        for name in ns.list(prefix=":PyUtilibServer.dispatcher."):
            try:
                uri = ns.lookup(name)
                ns.remove(name)
                proxy = _pyro.Proxy(uri)
                proxy.shutdown()
                proxy._pyroRelease()
            except:
                pass
        print("")
        print("*** NameServer must be shutdown manually when using Pyro4 ***")
        print("")

def set_maxconnections(max_allowed_connections=None):

    #
    # **NOTE: For some reason with Pyro3 we need to add 1 to this
    #         option in order to to get behavior that makes sense
    #         and matches behavior with Pyro4. For instance,
    #         running a dispatcher with a single client and a single
    #         server requires PYRO_MAXCONNECTIONS=3 with Pyro3 and
    #         requires THREADPOOL_SIZE=2 with Pyro4.
    #
    if max_allowed_connections is None:
        max_pyro_connections_envname = "PYUTILIB_PYRO_MAXCONNECTIONS"
        if max_pyro_connections_envname in os.environ:
            new_val = int(os.environ[max_pyro_connections_envname])
            print("Overriding %s default for maximum number of proxy "
                  "connections to %s, based on specification provided by "
                  "%s environment variable."
                  % ("Pyro" if using_pyro3 else "Pyro4",
                     new_val,
                     max_pyro_connections_envname))
            if using_pyro3:
                _pyro.config.PYRO_MAXCONNECTIONS = new_val + 1
            else:
                _pyro.config.THREADPOOL_SIZE = new_val
    else:
        print("Overriding %s default for maximum number of proxy "
              "connections to %s, based on specification provided by "
              "max_allowed_connections keyword"
              % ("Pyro" if using_pyro3 else "Pyro4", max_allowed_connections))
        if using_pyro3:
            _pyro.config.PYRO_MAXCONNECTIONS = max_allowed_connections + 1
        else:
            _pyro.config.THREADPOOL_SIZE = max_allowed_connections

def bind_port(sock, host="127.0.0.1"):
    """Bind the socket to a free port and return the port number.
    Relies on ephemeral ports in order to ensure we are using an
    unbound port.  This is important as many tests may be running
    simultaneously, especially in a buildbot environment.  This method
    raises an exception if the sock.family is AF_INET and sock.type is
    SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set
    on it.  Tests should *never* set these socket options for TCP/IP
    sockets.  The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is
    available (i.e.  on Windows), it will be set on the socket.  This
    will prevent anyone else from bind()'ing to our host/port for the
    duration of the test.

    This code is copied from the stdlib's test.test_support module.
    """
    if sock.family in (socket.AF_INET, socket.AF_INET6) and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, "SO_EXCLUSIVEADDRUSE"):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
    if sock.family == socket.AF_INET:
        if host == 'localhost':
            sock.bind(('127.0.0.1', 0))
        else:
            sock.bind((host, 0))
    elif sock.family == socket.AF_INET6:
        if host == 'localhost':
            sock.bind(('::1', 0, 0, 0))
        else:
            sock.bind((host, 0, 0, 0))
    else:
        raise CommunicationError("unsupported socket family: " + sock.family)
    return sock.getsockname()[1]

"""
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except OSError:
                # Python's socket module was compiled using modern
                # headers thus defining SO_REUSEPORT but this process
                # is running under an older kernel that does not
                # support SO_REUSEPORT.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
"""

def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.
    This is achieved by creating a temporary socket with the same
    family and type as the 'sock' parameter (default is AF_INET,
    SOCK_STREAM), and binding it to the specified host address
    (defaults to 0.0.0.0) with the port set to 0, eliciting an unused
    ephemeral port from the OS.  The temporary socket is then closed
    and deleted, and the ephemeral port is returned.

    This code is copied from the stdlib's test.test_support module.
    """

    tempsock = socket.socket(family, socktype)
    port = bind_port(tempsock)
    tempsock.close()
    del tempsock
    return port