This file is indexed.

/usr/lib/telepathy-gabble-tests/twisted/connect/test-fail.py is in telepathy-gabble-tests 0.18.4-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
Test various ways in which connections can fail.
"""

from twisted.words.xish import domish
from twisted.words.protocols.jabber import xmlstream

import dbus

from servicetest import assertEquals
from gabbletest import exec_test, GabbleAuthenticator
import constants as cs
import ns

def test_network_error(q, bus, conn, stream):
    """Check that a failed connection attempt is reported as a network error.

    After asking the connection to connect, the test expects, in order:
    a StatusChanged to Connecting/Requested, a ConnectionError signal whose
    first argument is cs.NETWORK_ERROR, and a StatusChanged to
    Disconnected/NetworkError.

    The runner at the bottom of this file starts this test with an explicit
    'port' parameter and do_connect=False -- presumably a port that nothing
    is listening on (see the FIXME below).
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    # FIXME: this is G_IO_ERROR_FAILED, which we can't really map to anything
    # better than NetworkError. The debug message says "Connection refused",
    # so something, somewhere, ought to be able to do better, and give us
    # enough information to produce cs.CONNECTION_REFUSED.
    failure = q.expect('dbus-signal', signal='ConnectionError')
    error_name = failure.args[0]
    assertEquals(cs.NETWORK_ERROR, error_name)

    disconnected = [cs.CONN_STATUS_DISCONNECTED, cs.CSR_NETWORK_ERROR]
    q.expect('dbus-signal', signal='StatusChanged', args=disconnected)

def test_conflict_after_connect(q, bus, conn, stream):
    """Check how a 'conflict' stream error from the server is reported.

    The fake server stream sends a 'conflict' stream error; the test then
    expects a ConnectionError signal whose first argument is
    cs.CONNECTION_REPLACED, followed by a StatusChanged to
    Disconnected/NameInUse.

    NOTE(review): the test never calls conn.Connect() itself, so it
    presumably relies on exec_test having connected already -- confirm
    against exec_test's defaults.
    """
    stream.send_stream_error('conflict')

    failure = q.expect('dbus-signal', signal='ConnectionError')
    error_name = failure.args[0]
    assertEquals(cs.CONNECTION_REPLACED, error_name)

    disconnected = [cs.CONN_STATUS_DISCONNECTED, cs.CSR_NAME_IN_USE]
    q.expect('dbus-signal', signal='StatusChanged', args=disconnected)

class StreamErrorAuthenticator(GabbleAuthenticator):
    """Authenticator that replies to the stream opening with a stream error.

    Once the stream has started it sends the stream header and then a single
    stream <error/> element containing one child named after the condition
    given to the constructor (for example 'conflict' or 'host-unknown').
    """

    def __init__(self, stream_error):
        """Remember which stream error condition to send.

        stream_error: name of the child element placed inside the <error/>
        element, in the ns.STREAMS namespace.
        """
        # Placeholder credentials are handed to the base class; this class
        # sends an error from streamStarted() and never reads them itself.
        GabbleAuthenticator.__init__(self, username='n/a', password='n/a')
        self.__stream_error = stream_error

    def streamStarted(self, root=None):
        """Send our stream header, then the configured stream error.

        root: the peer's root element, if any; when it is truthy its 'id'
        attribute is copied onto the XML stream as the session id.
        """
        out = self.xmlstream

        if root:
            out.sid = root.getAttribute('id')

        out.sendHeader()

        # Build <error/> with the chosen condition as its only child.
        error = domish.Element((xmlstream.NS_STREAMS, 'error'))
        error.addElement((ns.STREAMS, self.__stream_error))
        out.send(error)

def test_stream_conflict_during_connect(q, bus, conn, stream):
    """Check how a stream error received while connecting is reported.

    After Connect(), the test expects a StatusChanged to
    Connecting/Requested, then a ConnectionError signal whose first argument
    is cs.ALREADY_CONNECTED, and finally that the very next StatusChanged
    signal carries Disconnected/NameInUse.

    The runner at the bottom of this file pairs this test with
    StreamErrorAuthenticator('conflict').
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    failure = q.expect('dbus-signal', signal='ConnectionError')
    error_name = failure.args[0]
    assertEquals(cs.ALREADY_CONNECTED, error_name)

    # Take whichever StatusChanged arrives next and check its payload by
    # hand, rather than letting expect() filter on the arguments.
    change = q.expect('dbus-signal', signal='StatusChanged')
    (new_status, why) = change.args
    assertEquals(cs.CONN_STATUS_DISCONNECTED, new_status)
    assertEquals(cs.CSR_NAME_IN_USE, why)

def test_host_unknown(q, bus, conn, stream):
    """Check how a stream error during connection maps to an auth failure.

    After Connect(), the test expects a StatusChanged to
    Connecting/Requested, then a ConnectionError signal whose first argument
    is cs.AUTHENTICATION_FAILED, and finally that the very next
    StatusChanged signal carries Disconnected/AuthenticationFailed.

    The runner at the bottom of this file pairs this test with
    StreamErrorAuthenticator('host-unknown').
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    failure = q.expect('dbus-signal', signal='ConnectionError')
    error_name = failure.args[0]
    assertEquals(cs.AUTHENTICATION_FAILED, error_name)

    # Take whichever StatusChanged arrives next and check its payload by
    # hand, rather than letting expect() filter on the arguments.
    change = q.expect('dbus-signal', signal='StatusChanged')
    (new_status, why) = change.args
    assertEquals(cs.CONN_STATUS_DISCONNECTED, new_status)
    assertEquals(cs.CSR_AUTHENTICATION_FAILED, why)

if __name__ == '__main__':
    # Each scenario gets its own exec_test() run, in this order.

    # Network failure: override the port and let the test call Connect().
    network_error_params = {'port': dbus.UInt32(4243)}
    exec_test(test_network_error, network_error_params, do_connect=False)

    # Conflict after connecting: default parameters and authenticator.
    exec_test(test_conflict_after_connect)

    # Conflict while connecting: the authenticator sends the stream error.
    exec_test(
        test_stream_conflict_during_connect,
        authenticator=StreamErrorAuthenticator('conflict'),
        do_connect=False)

    # host-unknown while connecting, with explicit server and account.
    host_unknown_params = {
        'server': 'localhost',
        'account': 'test@example.org',
    }
    exec_test(
        test_host_unknown,
        host_unknown_params,
        authenticator=StreamErrorAuthenticator('host-unknown'),
        do_connect=False)