/usr/share/pyshared/restkit/forms.py is in python-restkit 3.3.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
#
# This file is part of restkit released under the MIT license.
# See the NOTICE for more information.
import mimetypes
import os
import re
import urllib
from restkit.util import to_bytestring, url_quote, url_encode
# Boundary token constant. It is not referenced by the definitions visible in
# this section -- presumably a default boundary for callers (TODO confirm).
MIME_BOUNDARY = 'END_OF_PART'
# Line terminator used to join multipart header lines and to end each part.
CRLF = '\r\n'
def form_encode(obj, charset="utf8"):
    """Url-encode ``obj`` and return the result as a byte string.

    ``obj`` and ``charset`` are handed unchanged to ``url_encode``; its
    output is then passed through ``to_bytestring``.
    """
    return to_bytestring(url_encode(obj, charset=charset))
class BoundaryItem(object):
    """One part (field or file) of a multipart/form-data body.

    The item stores the url-quoted field name, the value (either an
    in-memory string or an object exposing ``read``), an optional escaped
    filename and content type, and the size of the value when it is known.
    """

    def __init__(self, name, value, fname=None, filetype=None, filesize=None):
        """Prepare a part for encoding.

        :param name: field name; stored url-quoted.
        :param value: the field value. Anything without a ``read`` attribute
            is passed through ``encode_unreadable_value`` and measured with
            ``len``; readable objects are kept as-is for streaming.
        :param fname: optional filename, escaped for use inside a quoted
            ``filename="..."`` parameter.
        :param filetype: optional content type, converted with
            ``to_bytestring``.
        :param filesize: optional explicit size for a readable ``value``.
            When omitted, the size of a real ``file`` object is taken from
            ``os.fstat``.
        """
        self.name = url_quote(name)

        # The size stays None when it cannot be determined (a None value, or
        # a stream that is not a real file and came without ``filesize``).
        self.size = None
        if value is not None and not hasattr(value, 'read'):
            value = self.encode_unreadable_value(value)
            self.size = len(value)
        self.value = value

        if fname is not None:
            if isinstance(fname, unicode):
                fname = fname.encode("utf-8")
            # Escape non-printable bytes and double quotes so the name is
            # safe inside the quoted Content-Disposition parameter.
            fname = fname.encode("string_escape").replace('"', '\\"')
        self.fname = fname

        if filetype is not None:
            filetype = to_bytestring(filetype)
        self.filetype = filetype

        if hasattr(value, 'read'):
            if filesize is not None:
                # An explicit size given by the caller wins for streams.
                self.size = int(filesize)
            elif isinstance(value, file):
                try:
                    value.flush()
                except IOError:
                    # Best effort only: a failed flush must not prevent
                    # the upload from being prepared.
                    pass
                self.size = int(os.fstat(value.fileno()).st_size)

        self._encoded_hdr = None
        self._encoded_bdr = None

    def encode_hdr(self, boundary):
        """Returns the header of the encoding of this parameter"""
        if not self._encoded_hdr or self._encoded_bdr != boundary:
            # Cache against the boundary exactly as it was passed in, so a
            # repeated call with the same boundary reuses the cached header
            # even when url-quoting alters the boundary text.
            self._encoded_bdr = boundary
            headers = ["--%s" % url_quote(boundary)]

            if self.fname:
                disposition = 'form-data; name="%s"; filename="%s"' % (
                    self.name, self.fname)
            else:
                disposition = 'form-data; name="%s"' % self.name
            headers.append("Content-Disposition: %s" % disposition)

            if self.filetype:
                filetype = self.filetype
            else:
                filetype = "text/plain; charset=utf-8"
            headers.append("Content-Type: %s" % filetype)

            # Only advertise a length when one is actually known.
            if self.size is not None:
                headers.append("Content-Length: %i" % self.size)

            # Two empty entries produce the blank line that separates the
            # part headers from the part body.
            headers.append("")
            headers.append("")
            self._encoded_hdr = CRLF.join(headers)
        return self._encoded_hdr

    def encode(self, boundary):
        """Returns the string encoding of this parameter

        Raises ``ValueError`` when a line of the value consists of exactly
        ``--`` followed by the boundary.
        """
        value = self.value
        if re.search("^--%s$" % re.escape(boundary), value, re.M):
            raise ValueError("boundary found in encoded string")
        return "%s%s%s" % (self.encode_hdr(boundary), value, CRLF)

    def iter_encode(self, boundary, blocksize=16384):
        """Yield the encoded part piece by piece.

        In-memory values are yielded as one string from ``encode``. Readable
        values are streamed: the header first, then blocks of at most
        ``blocksize`` read from the value, then a closing CRLF.
        """
        if not hasattr(self.value, "read"):
            yield self.encode(boundary)
        else:
            yield self.encode_hdr(boundary)
            while True:
                block = self.value.read(blocksize)
                if not block:
                    yield CRLF
                    return
                yield block

    def encode_unreadable_value(self, value):
        """Hook applied to values without ``read``; returns them unchanged."""
        return value
class MultipartForm(object):
    """Iterable multipart/form-data body built from form parameters.

    Iterating the form yields the encoded parts in order, followed by the
    closing boundary line.
    """

    def __init__(self, params, boundary, headers, bitem_cls=BoundaryItem):
        """Build one boundary item per parameter value.

        :param params: a mapping (anything with ``items``) or an iterable of
            ``(name, value)`` pairs. A ``list`` value produces one part per
            element; a value with ``read`` is treated as a file upload.
        :param boundary: boundary string used between the parts.
        :param headers: mapping of request headers; an existing
            ``Content-Length`` entry is used as the body size instead of
            computing it.
        :param bitem_cls: class used to build each part.
        """
        self.boundary = boundary
        self.tboundary = "--%s--%s" % (boundary, CRLF)
        self.boundaries = []
        self._clen = headers.get('Content-Length')

        if hasattr(params, 'items'):
            params = params.items()

        for name, value in params:
            if hasattr(value, "read"):
                # Not every readable object carries a ``name`` (StringIO,
                # for instance), so fall back to None instead of raising.
                fname = getattr(value, 'name', None)
                if fname is not None:
                    filetype = ';'.join(
                        filter(None, mimetypes.guess_type(fname)))
                else:
                    filetype = None
                # A stream that is not a real file is read into memory when
                # no Content-Length was supplied, so that its length can be
                # measured.
                if not isinstance(value, file) and self._clen is None:
                    value = value.read()
                items = [bitem_cls(name, value, fname, filetype)]
            elif isinstance(value, list):
                items = [bitem_cls(name, v) for v in value]
            else:
                items = [bitem_cls(name, value)]
            self.boundaries.extend(items)

    def get_size(self):
        """Return the body size as an int.

        Uses the ``Content-Length`` captured at construction when there was
        one; otherwise sums each part's header, value and trailing CRLF plus
        the closing boundary, and caches the total.
        """
        if self._clen is None:
            self._clen = 0
            for item in self.boundaries:
                self._clen += item.size
                self._clen += len(item.encode_hdr(self.boundary))
                self._clen += len(CRLF)
            self._clen += len(self.tboundary)
        return int(self._clen)

    def __iter__(self):
        """Yield every block of every part, then the closing boundary."""
        for item in self.boundaries:
            for block in item.iter_encode(self.boundary):
                yield block
        yield self.tboundary
def multipart_form_encode(params, headers, boundary):
    """Build a multipart body and the headers describing it.

    The boundary is quoted with ``urllib.quote_plus`` before use. Returns a
    ``(body, headers)`` pair where ``body`` is a ``MultipartForm`` and
    ``headers`` carries the ``Content-Type`` and ``Content-Length`` entries.
    A falsy ``headers`` argument is replaced by a fresh dict.
    """
    if not headers:
        headers = {}

    quoted_boundary = urllib.quote_plus(boundary)
    form = MultipartForm(params, quoted_boundary, headers)

    content_type = "multipart/form-data; boundary=%s" % quoted_boundary
    headers['Content-Type'] = content_type
    headers['Content-Length'] = str(form.get_size())
    return form, headers
|