This file is indexed.

/usr/share/pyshared/pyamf/codec.py is in python-pyamf 0.6.1+dfsg-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# Copyright (c) The PyAMF Project.
# See LICENSE.txt for details.

"""
Provides basic functionality for all pyamf.amf?.[De|E]ncoder classes.
"""

import types
import datetime

import pyamf
from pyamf import util, python, xml

__all__ = [
    'IndexedCollection',
    'Context',
    'Decoder',
    'Encoder'
]

try:
    unicode
except NameError:
    # py3k support
    unicode = str
    str = bytes


class IndexedCollection(object):
    """
    Store references to objects and provides an api to query references.

    All reference checks are done using the builtin C{id} function unless
    C{use_hash} is specified as C{True} where the slower but more flexible
    C{hash} builtin is used.

    @note: All attributes on the instance are private, use the apis only.
    """

    def __init__(self, use_hash=False):
        if use_hash is True:
            self.func = hash
        else:
            self.func = id

        self.clear()

    def clear(self):
        """
        Clears the collection.
        """
        self.list = []
        self.dict = {}

    def getByReference(self, ref):
        """
        Returns an object based on the supplied reference. The C{ref} should
        be an C{int}.

        If the reference is not found, C{None} will be returned.
        """
        try:
            return self.list[ref]
        except IndexError:
            return None

    def getReferenceTo(self, obj):
        """
        Returns a reference to C{obj} if it is contained within this index.

        If the object is not contained within the collection, C{-1} will be
        returned.

        @param obj: The object to find the reference to.
        @return: An C{int} representing the reference or C{-1} is the object
            is not contained within the collection.
        """
        return self.dict.get(self.func(obj), -1)

    def append(self, obj):
        """
        Appends C{obj} to this index.

        @note: Uniqueness is not checked
        @return: The reference to C{obj} in this index.
        """
        h = self.func(obj)

        self.list.append(obj)
        idx = len(self.list) - 1
        self.dict[h] = idx

        return idx

    def __eq__(self, other):
        if isinstance(other, list):
            return self.list == other

        raise NotImplementedError("cannot compare %s to %r" % (
            type(other), self))

    def __len__(self):
        return len(self.list)

    def __getitem__(self, idx):
        return self.getByReference(idx)

    def __contains__(self, obj):
        r = self.getReferenceTo(obj)

        return r != -1

    def __repr__(self):
        t = self.__class__

        return '<%s.%s size=%d 0x%x>' % (
            t.__module__,
            t.__name__,
            len(self.list),
            id(self))


class Context(object):
    """
    The base context for all AMF [de|en]coding.

    @ivar extra: The only public attribute. This is a placeholder for any extra
        contextual data that required for different adapters.
    @type extra: C{dict}
    @ivar _objects: A collection of stored references to objects that have
        already been visited by this context.
    @type _objects: L{IndexedCollection}
    @ivar _class_aliases: Lookup of C{class} -> L{pyamf.ClassAlias} as
        determined by L{pyamf.get_class_alias}
    @ivar _unicodes: Lookup of utf-8 encoded byte strings -> string objects
        (aka strings/unicodes).
    """

    def __init__(self):
        self._objects = IndexedCollection()

        self.clear()

    def clear(self):
        """
        Clears the context.
        """
        self._objects.clear()
        self._class_aliases = {}
        self._unicodes = {}
        self.extra = {}

    def getObject(self, ref):
        """
        Gets an object based on a reference.

        @type ref: C{int}
        @return: The referenced object or C{None} if not found.
        """
        return self._objects.getByReference(ref)

    def getObjectReference(self, obj):
        """
        Gets a reference for an already referenced object.

        @return: The reference to the object or C{-1} if the object is not in
            the context.
        """
        return self._objects.getReferenceTo(obj)

    def addObject(self, obj):
        """
        Adds a reference to C{obj}.

        @return: Reference to C{obj}.
        @rtype: C{int}
        """
        return self._objects.append(obj)

    def getClassAlias(self, klass):
        """
        Gets a class alias based on the supplied C{klass}. If one is not found
        in the global context, one is created locally.

        If you supply a string alias and the class is not registered,
        L{pyamf.UnknownClassAlias} will be raised.

        @param klass: A class object or string alias.
        @return: The L{pyamf.ClassAlias} instance that describes C{klass}
        """
        try:
            return self._class_aliases[klass]
        except KeyError:
            pass

        try:
            alias = self._class_aliases[klass] = pyamf.get_class_alias(klass)
        except pyamf.UnknownClassAlias:
            if isinstance(klass, python.str_types):
                raise

            # no alias has been found yet .. check subclasses
            alias = util.get_class_alias(klass) or pyamf.ClassAlias
            meta = util.get_class_meta(klass)
            alias = alias(klass, defer=True, **meta)

            self._class_aliases[klass] = alias

        return alias

    def getStringForBytes(self, s):
        """
        Returns the corresponding string for the supplied utf-8 encoded bytes.
        If there is no string object, one is created.

        @since: 0.6
        """
        h = hash(s)
        u = self._unicodes.get(h, None)

        if u is not None:
            return u

        u = self._unicodes[h] = s.decode('utf-8')

        return u

    def getBytesForString(self, u):
        """
        Returns the corresponding utf-8 encoded string for a given unicode
        object. If there is no string, one is encoded.

        @since: 0.6
        """
        h = hash(u)
        s = self._unicodes.get(h, None)

        if s is not None:
            return s

        s = self._unicodes[h] = u.encode('utf-8')

        return s


class _Codec(object):
    """
    Base codec.

    @ivar stream: The underlying data stream.
    @type stream: L{util.BufferedByteStream}
    @ivar context: The context for the encoding.
    @ivar strict: Whether the codec should operate in I{strict} mode.
    @type strict: C{bool}, default is C{False}.
    @ivar timezone_offset: The offset from I{UTC} for any C{datetime} objects
        being encoded. Default to C{None} means no offset.
    @type timezone_offset: C{datetime.timedelta} or C{int} or C{None}
    """

    def __init__(self, stream=None, context=None, strict=False,
                 timezone_offset=None):
        if not isinstance(stream, util.BufferedByteStream):
            stream = util.BufferedByteStream(stream)

        self.stream = stream
        self.context = context or self.buildContext()
        self.strict = strict
        self.timezone_offset = timezone_offset

        self._func_cache = {}

    def buildContext(self):
        """
        A context factory.
        """
        raise NotImplementedError

    def getTypeFunc(self, data):
        """
        Returns a callable based on C{data}. If no such callable can be found,
        the default must be to return C{None}.
        """
        raise NotImplementedError


class Decoder(_Codec):
    """
    Base AMF decoder.

    @ivar strict: Defines how strict the decoding should be. For the time
        being this relates to typed objects in the stream that do not have a
        registered alias. Introduced in 0.4.
    @type strict: C{bool}
    """

    def send(self, data):
        """
        Add data for the decoder to work on.
        """
        self.stream.append(data)

    def next(self):
        """
        Part of the iterator protocol.
        """
        try:
            return self.readElement()
        except pyamf.EOStream:
            # all data was successfully decoded from the stream
            raise StopIteration

    def readElement(self):
        """
        Reads an AMF3 element from the data stream.

        @raise DecodeError: The ActionScript type is unsupported.
        @raise EOStream: No more data left to decode.
        """
        pos = self.stream.tell()

        try:
            t = self.stream.read(1)
        except IOError:
            raise pyamf.EOStream

        try:
            func = self._func_cache[t]
        except KeyError:
            func = self.getTypeFunc(t)

            if not func:
                raise pyamf.DecodeError("Unsupported ActionScript type %s" % (
                    hex(ord(t)),))

            self._func_cache[t] = func

        try:
            return func()
        except IOError:
            self.stream.seek(pos)

            raise

    def __iter__(self):
        return self


class _CustomTypeFunc(object):
    """
    Support for custom type mappings when encoding.
    """

    def __init__(self, encoder, func):
        self.encoder = encoder
        self.func = func

    def __call__(self, data, **kwargs):
        ret = self.func(data, encoder=self.encoder)

        if ret is not None:
            self.encoder.writeElement(ret)


class Encoder(_Codec):
    """
    Base AMF encoder.
    """

    def __init__(self, *args, **kwargs):
        _Codec.__init__(self, *args, **kwargs)

        self.bucket = []

    def _write_type(self, obj, **kwargs):
        """
        Subclasses should override this and all write[type] functions
        """
        raise NotImplementedError

    writeNull = _write_type
    writeBytes = _write_type
    writeString = _write_type
    writeBoolean = _write_type
    writeNumber = _write_type
    writeList = _write_type
    writeUndefined = _write_type
    writeDate = _write_type
    writeXML = _write_type
    writeObject = _write_type

    def writeSequence(self, iterable):
        """
        Encodes an iterable. The default is to write If the iterable has an al
        """
        try:
            alias = self.context.getClassAlias(iterable.__class__)
        except (AttributeError, pyamf.UnknownClassAlias):
            self.writeList(iterable)

            return

        if alias.external:
            # a is a subclassed list with a registered alias - push to the
            # correct method
            self.writeObject(iterable)

            return

        self.writeList(iterable)

    def writeGenerator(self, gen):
        """
        Iterates over a generator object and encodes all that is returned.
        """
        n = getattr(gen, 'next')

        while True:
            try:
                self.writeElement(n())
            except StopIteration:
                break

    def getTypeFunc(self, data):
        """
        Returns a callable that will encode C{data} to C{self.stream}. If
        C{data} is unencodable, then C{None} is returned.
        """
        if data is None:
            return self.writeNull

        t = type(data)

        # try types that we know will work
        if t is str or issubclass(t, str):
            return self.writeBytes
        if t is unicode or issubclass(t, unicode):
            return self.writeString
        elif t is bool:
            return self.writeBoolean
        elif t is float:
            return self.writeNumber
        elif t in python.int_types:
            return self.writeNumber
        elif t in (list, tuple):
            return self.writeList
        elif isinstance(data, (list, tuple)):
            return self.writeSequence
        elif t is types.GeneratorType:
            return self.writeGenerator
        elif t is pyamf.UndefinedType:
            return self.writeUndefined
        elif t in (datetime.date, datetime.datetime, datetime.time):
            return self.writeDate
        elif xml.is_xml(data):
            return self.writeXML

        # check for any overridden types
        for type_, func in pyamf.TYPE_MAP.iteritems():
            try:
                if isinstance(data, type_):
                    return _CustomTypeFunc(self, func)
            except TypeError:
                if python.callable(type_) and type_(data):
                    return _CustomTypeFunc(self, func)

        # now try some types that won't encode
        if t in python.class_types:
            # can't encode classes
            return None
        elif isinstance(data, python.func_types):
            # can't encode code objects
            return None
        elif isinstance(t, types.ModuleType):
            # cannot encode module objects
            return None

        # well, we tried ..
        return self.writeObject

    def writeElement(self, data):
        """
        Encodes C{data} to AMF. If the data is not able to be matched to an AMF
        type, then L{pyamf.EncodeError} will be raised.
        """
        key = type(data)
        func = None

        try:
            func = self._func_cache[key]
        except KeyError:
            func = self.getTypeFunc(data)

            if func is None:
                raise pyamf.EncodeError('Unable to encode %r (type %r)' % (
                    data, key))

            self._func_cache[key] = func

        func(data)

    def send(self, element):
        self.bucket.append(element)

    def next(self):
        try:
            element = self.bucket.pop(0)
        except IndexError:
            raise StopIteration

        start_pos = self.stream.tell()

        self.writeElement(element)

        end_pos = self.stream.tell()

        self.stream.seek(start_pos)

        return self.stream.read(end_pos - start_pos)

    def __iter__(self):
        return self