/usr/lib/telepathy-gabble-tests/twisted/connect/test-fail.py is in telepathy-gabble-tests 0.18.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Test various ways in which connections can fail.
"""
from twisted.words.xish import domish
from twisted.words.protocols.jabber import xmlstream
import dbus
from servicetest import assertEquals
from gabbletest import exec_test, GabbleAuthenticator
import constants as cs
import ns
def test_network_error(q, bus, conn, stream):
    """Check that a failed connection attempt is reported as NetworkError.

    After Connect(), the connection must announce CONNECTING/REQUESTED,
    emit ConnectionError with cs.NETWORK_ERROR as its first argument, and
    finally announce DISCONNECTED with reason CSR_NETWORK_ERROR.
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    # FIXME: this is G_IO_ERROR_FAILED, which we can't really map to anything
    # better than NetworkError. The debug message says "Connection refused",
    # so something, somewhere, ought to be able to do better, and give us
    # enough information to produce cs.CONNECTION_REFUSED.
    error_event = q.expect('dbus-signal', signal='ConnectionError')
    assertEquals(cs.NETWORK_ERROR, error_event.args[0])

    disconnected = [cs.CONN_STATUS_DISCONNECTED, cs.CSR_NETWORK_ERROR]
    q.expect('dbus-signal', signal='StatusChanged', args=disconnected)
def test_conflict_after_connect(q, bus, conn, stream):
    """Check the reaction to a <conflict/> stream error sent by the server.

    The test stream sends the 'conflict' stream error; the connection must
    then emit ConnectionError with cs.CONNECTION_REPLACED and announce
    DISCONNECTED with reason CSR_NAME_IN_USE.
    """
    stream.send_stream_error('conflict')

    error_event = q.expect('dbus-signal', signal='ConnectionError')
    assertEquals(cs.CONNECTION_REPLACED, error_event.args[0])

    disconnected = [cs.CONN_STATUS_DISCONNECTED, cs.CSR_NAME_IN_USE]
    q.expect('dbus-signal', signal='StatusChanged', args=disconnected)
class StreamErrorAuthenticator(GabbleAuthenticator):
    """Authenticator that answers the stream opening with a stream error.

    Instead of authenticating, it sends the stream header followed by a
    stream-level <error/> element containing the configured condition.
    """

    def __init__(self, stream_error):
        """Remember which stream error condition to send.

        stream_error -- element name of the error condition, e.g.
                        'conflict' or 'host-unknown'.
        """
        # The credentials are placeholders passed to the base class.
        GabbleAuthenticator.__init__(self, username='n/a', password='n/a')
        self.__stream_error = stream_error

    def streamStarted(self, root=None):
        """Send the stream header, then the configured stream error.

        root -- the incoming stream root element, if any; when given, its
                'id' attribute is copied to the XML stream's sid.
        """
        stream = self.xmlstream
        if root:
            stream.sid = root.getAttribute('id')

        stream.sendHeader()

        # <error/> lives in the streams namespace; the condition child uses
        # the namespace exposed as ns.STREAMS.
        error_element = domish.Element((xmlstream.NS_STREAMS, 'error'))
        error_element.addElement((ns.STREAMS, self.__stream_error))
        stream.send(error_element)
def test_stream_conflict_during_connect(q, bus, conn, stream):
    """Check the reaction to a stream error received while connecting.

    Run from the __main__ block with StreamErrorAuthenticator('conflict').
    Expects ConnectionError with cs.ALREADY_CONNECTED, followed by a
    StatusChanged to DISCONNECTED with reason CSR_NAME_IN_USE.
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    error_event = q.expect('dbus-signal', signal='ConnectionError')
    assertEquals(cs.ALREADY_CONNECTED, error_event.args[0])

    # Take whichever StatusChanged comes next and check both of its fields.
    status_event = q.expect('dbus-signal', signal='StatusChanged')
    new_status, new_reason = status_event.args
    assertEquals(cs.CONN_STATUS_DISCONNECTED, new_status)
    assertEquals(cs.CSR_NAME_IN_USE, new_reason)
def test_host_unknown(q, bus, conn, stream):
    """Check the reaction to a 'host-unknown' stream error while connecting.

    Run from the __main__ block with StreamErrorAuthenticator('host-unknown').
    Expects ConnectionError with cs.AUTHENTICATION_FAILED, followed by a
    StatusChanged to DISCONNECTED with reason CSR_AUTHENTICATION_FAILED.
    """
    conn.Connect()

    connecting = [cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED]
    q.expect('dbus-signal', signal='StatusChanged', args=connecting)

    error_event = q.expect('dbus-signal', signal='ConnectionError')
    assertEquals(cs.AUTHENTICATION_FAILED, error_event.args[0])

    # Take whichever StatusChanged comes next and check both of its fields.
    status_event = q.expect('dbus-signal', signal='StatusChanged')
    new_status, new_reason = status_event.args
    assertEquals(cs.CONN_STATUS_DISCONNECTED, new_status)
    assertEquals(cs.CSR_AUTHENTICATION_FAILED, new_reason)
if __name__ == '__main__':
    # Each scenario runs in its own exec_test() invocation. Those that call
    # conn.Connect() themselves pass do_connect=False.

    # Network failure: connect using port 4243.
    network_error_params = {'port': dbus.UInt32(4243)}
    exec_test(test_network_error, network_error_params, do_connect=False)

    # Conflict stream error, with exec_test's default parameters.
    exec_test(test_conflict_after_connect)

    # Conflict stream error sent in reply to the stream opening.
    conflict_authenticator = StreamErrorAuthenticator('conflict')
    exec_test(test_stream_conflict_during_connect,
              authenticator=conflict_authenticator,
              do_connect=False)

    # host-unknown stream error sent in reply to the stream opening.
    host_unknown_params = {
        'server': 'localhost',
        'account': 'test@example.org',
    }
    host_unknown_authenticator = StreamErrorAuthenticator('host-unknown')
    exec_test(test_host_unknown,
              host_unknown_params,
              authenticator=host_unknown_authenticator,
              do_connect=False)
|