/usr/share/pyshared/restkit/sock.py is in python-restkit 3.3.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# This file is part of restkit released under the MIT license.
# See the NOTICE for more information.
import socket
import sys
# Number of bytes sendfile() requests from the file-like object per read.
CHUNK_SIZE = 16 * 1024
# 112 KiB. Not referenced in this module -- presumably a body-size threshold
# used by other restkit modules (TODO confirm against callers).
MAX_BODY = 1024 * 112
# Not referenced in this module -- presumably a DNS resolution timeout in
# seconds used by other restkit modules (TODO confirm against callers).
DNS_TIMEOUT = 60

# Keyword names accepted by validate_ssl_args(); anything else is rejected.
# NOTE(review): these look like the keyword parameters of ssl.wrap_socket --
# confirm against the code that consumes the validated arguments.
_allowed_ssl_args = (
    'keyfile',
    'certfile',
    'server_side',
    'cert_reqs',
    'ssl_version',
    'ca_certs',
    'do_handshake_on_connect',
    'suppress_ragged_eofs',
)
def validate_ssl_args(ssl_args):
    '''Reject SSL keyword arguments that are not in ``_allowed_ssl_args``.

    ``ssl_args`` is iterated directly, so a dict is checked by its keys.
    Returns ``None`` when every name is allowed.

    Raises ``TypeError`` for the first name that is not allowed.
    '''
    for name in ssl_args:
        if name in _allowed_ssl_args:
            continue
        raise TypeError('connect() got an unexpected keyword argument %r' % name)
def close(skt):
    '''Close ``skt`` on a best-effort basis.

    Nothing happens when ``skt`` is falsy or has no ``close`` attribute.
    A ``socket.error`` raised while closing is deliberately swallowed.
    '''
    if skt and hasattr(skt, "close"):
        try:
            skt.close()
        except socket.error:
            # Best effort: a failure to close is not reported to the caller.
            pass
def send_chunk(sock, data):
    '''Send ``data`` framed as one HTTP chunked-transfer chunk.

    The frame is the payload length in uppercase hexadecimal, CRLF, the
    payload, then CRLF. An empty payload therefore produces ``0\\r\\n\\r\\n``.

    ``data`` may be a byte string or a text string. For a byte string the
    frame is assembled as bytes, so this also works on interpreters where
    ``bytes`` and ``str`` are distinct types. On Python 2, where ``bytes``
    is ``str``, the result is identical to plain string joining.
    '''
    if isinstance(data, bytes):
        # The size line is pure ASCII, so encoding it cannot fail.
        size_line = ("%X\r\n" % len(data)).encode("ascii")
        chunk = b"".join((size_line, data, b"\r\n"))
    else:
        # Text payloads keep the original all-text framing.
        chunk = "".join(("%X\r\n" % len(data), data, "\r\n"))
    sock.sendall(chunk)
def send(sock, data, chunked=False):
    '''Write ``data`` to ``sock``.

    When ``chunked`` is true the payload is delegated to ``send_chunk`` and
    that call's result is returned. Otherwise ``data`` is passed unchanged
    to ``sock.sendall`` and ``None`` is returned.
    '''
    if not chunked:
        sock.sendall(data)
        return None
    return send_chunk(sock, data)
def send_nonblock(sock, data, chunked=False):
    '''Send ``data`` with ``sock`` temporarily switched to non-blocking mode.

    If the socket is already non-blocking (``gettimeout()`` returns 0.0) the
    data is sent as-is. Otherwise the socket is made non-blocking for the
    duration of the send and its previous timeout is restored afterwards,
    even when the send raises.

    Returns whatever ``send`` returns.
    '''
    timeout = sock.gettimeout()
    if timeout == 0.0:
        return send(sock, data, chunked)
    try:
        sock.setblocking(0)
        return send(sock, data, chunked)
    finally:
        # Restore the caller's exact timeout. settimeout(None) is the same as
        # setblocking(1), while a finite timeout is no longer silently
        # replaced by fully blocking mode.
        sock.settimeout(timeout)
def sendlines(sock, lines, chunked=False):
    '''Send every item of ``lines`` to ``sock`` through ``send``.

    ``lines`` is fully materialised into a list before the first item is
    sent. Each item is sent separately with the same ``chunked`` flag.
    '''
    pending = list(lines)
    for item in pending:
        send(sock, item, chunked)
def sendfile(sock, data, chunked=False):
    '''Stream the file-like object ``data`` to ``sock``.

    If ``data`` has a ``seek`` attribute it is rewound to offset 0 first.
    The content is then read ``CHUNK_SIZE`` bytes at a time and each piece
    is passed to ``send`` with the given ``chunked`` flag.
    '''
    if hasattr(data, 'seek'):
        data.seek(0)
    while True:
        binarydata = data.read(CHUNK_SIZE)
        # Stop on any empty read. Comparing against '' alone never matches
        # the b'' a binary-mode file returns on interpreters where bytes and
        # str differ, which would loop forever.
        if not binarydata:
            break
        send(sock, binarydata, chunked)
# Jython only: sys.platform contains 'java' there, and the javax/jarray
# imports below exist only on the JVM.
if 'java' in sys.platform:
    from jarray import array
    from javax.net.ssl import SSLContext, TrustManager, X509TrustManager

    class TrustAllX509TrustManager(X509TrustManager):
        '''Trust manager whose checks never reject a certificate chain.

        SECURITY: both check methods are no-ops, so any context built on
        this manager performs no certificate validation at all.
        '''

        def checkClientTrusted(self, chain, auth):
            # Accept unconditionally: not raising means "trusted".
            pass

        def checkServerTrusted(self, chain, auth):
            # Accept unconditionally: not raising means "trusted".
            pass

        def getAcceptedIssuers(self):
            # No issuer list is advertised.
            return None

    # Module-level SSLContext backed solely by the accept-everything
    # trust manager above.
    trust_managers = array([TrustAllX509TrustManager()], TrustManager)
    TRUST_ALL_CONTEXT = SSLContext.getInstance("SSL")
    TRUST_ALL_CONTEXT.init(None, trust_managers, None)

    # Remember the JVM default context as it was at import time so that it
    # can be put back after TRUST_ALL_CONTEXT has been installed.
    DEFAULT_CONTEXT = SSLContext.getDefault()
def trust_all_certificates(f):
    '''Decorator that runs ``f`` with the accept-all SSL context on Jython.

    Under Jython the JVM-wide default ``SSLContext`` is replaced by
    ``TRUST_ALL_CONTEXT`` for the duration of each call and reset to
    ``DEFAULT_CONTEXT`` afterwards, even when ``f`` raises. On any other
    platform ``f`` is called directly.

    The returned wrapper forwards all positional and keyword arguments,
    returns ``f``'s result, and carries ``f``'s name and docstring.
    '''
    # Imported locally so this function does not depend on a module-level
    # functools import.
    from functools import wraps

    def wrapped(*args, **kwargs):
        # Only Jython has a JVM default SSLContext to swap.
        if 'java' not in sys.platform:
            return f(*args, **kwargs)
        from javax.net.ssl import SSLContext
        SSLContext.setDefault(TRUST_ALL_CONTEXT)
        try:
            return f(*args, **kwargs)
        finally:
            SSLContext.setDefault(DEFAULT_CONTEXT)

    try:
        return wraps(f)(wrapped)
    except AttributeError:
        # On Python 2, wraps() raises for callables without __name__ or
        # __doc__ (e.g. functools.partial); fall back to the bare wrapper.
        return wrapped
|