Viewing file: test_jabberclient.py (16.4 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details.
""" Tests for L{twisted.words.protocols.jabber.client} """ from hashlib import sha1
from twisted.internet import defer from twisted.trial import unittest from twisted.words.protocols.jabber import client, error, jid, xmlstream from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer from twisted.words.xish import utility
# TLS support is optional.  When it cannot be imported, C{ssl} is C{None} and
# C{skipWhenNoSSL} holds the reason used to skip the tests that depend on it.
try:
    from twisted.internet import ssl
except ImportError:
    ssl = None

skipWhenNoSSL = "SSL not available" if ssl is None else None
# XML namespaces of the stream features exercised below.
NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'

# Event queries used to observe the requests sent by the initializers.
IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % (NS_BIND,)
IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % (NS_SESSION,)
class CheckVersionInitializerTests(unittest.TestCase):
    """
    Tests for L{client.CheckVersionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer under test on a fresh L{xmlstream.XmlStream}.
        """
        stream = xmlstream.XmlStream(xmlstream.Authenticator())
        self.init = client.CheckVersionInitializer(stream)


    def testSupported(self):
        """
        Initializing with stream version 1.0 completes without raising.
        """
        self.init.xmlstream.version = (1, 0)
        self.init.initialize()


    def testNotSupported(self):
        """
        Initializing with stream version 0.0 raises L{error.StreamError}
        with the C{'unsupported-version'} condition.
        """
        self.init.xmlstream.version = (0, 0)
        raised = self.assertRaises(error.StreamError, self.init.initialize)
        self.assertEqual('unsupported-version', raised.condition)
class InitiatingInitializerHarness(object):
    """
    Mixin providing a test fixture for XML stream initializers.

    An L{utility.XmlPipe} serves as the communication channel between the
    initializer under test and the stubbed receiving entity.  The pipe's sink
    is used as the stream handed to initializers and is given a
    L{xmlstream.ConnectAuthenticator}; the pipe's source is the side on which
    tests observe requests and send responses.

    A helper for turning an observed event into a deferred is provided too.
    """

    def setUp(self):
        """
        Build the pipe and attach an authenticator for C{example.org} to its
        sink.
        """
        self.output = []
        self.authenticator = xmlstream.ConnectAuthenticator('example.org')
        self.pipe = utility.XmlPipe()
        self.xmlstream = self.pipe.sink
        self.xmlstream.authenticator = self.authenticator


    def waitFor(self, event, handler):
        """
        Observe one occurrence of an event on the source side of the pipe.

        A new deferred is created with C{handler} as its first callback, and
        the deferred's C{callback} is registered as a one-time observer for
        C{event}.

        @param event: The event to be observed. See
            L{utility.EventDispatcher.addOnetimeObserver}.
        @param handler: The handler to be called with the observed event
            object.
        @return: The deferred that fires when the event is observed.
        @rtype: L{defer.Deferred}.
        """
        observed = defer.Deferred()
        observed.addCallback(handler)
        self.pipe.source.addOnetimeObserver(event, observed.callback)
        return observed
class IQAuthInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.IQAuthInitializer}.

    Each test plays the receiving entity on the source side of the pipe and
    answers the C{jabber:iq:auth} requests sent by the initializer.
    """

    def setUp(self):
        """
        Create the initializer and give the authenticator a JID and password.
        """
        super(IQAuthInitializerTests, self).setUp()
        self.init = client.IQAuthInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')
        self.authenticator.password = 'secret'


    def _offerFields(self, iq, fieldNames, onAuthSet):
        """
        Answer a query for authentication fields as the server would.

        @param iq: The observed C{get} request to respond to.
        @param fieldNames: Names of the empty child elements to put in the
            C{query} element of the response, in order.
        @param onAuthSet: Handler for the authentication request that is
            expected to follow.
        @return: The deferred from L{waitFor} for that next request.
        """
        reply = xmlstream.toResponse(iq, 'result')
        reply.addElement(('jabber:iq:auth', 'query'))
        for fieldName in fieldNames:
            reply.query.addElement(fieldName)

        # Start observing the follow-up request before the reply is sent.
        pending = self.waitFor(IQ_AUTH_SET, onAuthSet)
        self.pipe.source.send(reply)
        return pending


    def testPlainText(self):
        """
        Test plain-text authentication.

        The stubbed server offers the C{password} field and expects it to be
        filled with the password.  It then acts as if authentication
        succeeds.
        """

        def onAuthSet(iq):
            """
            Check the submitted credentials and respond with an empty result
            signalling success.
            """
            query = iq.query
            self.assertEqual('user', unicode(query.username))
            self.assertEqual('secret', unicode(query.password))
            self.assertEqual('resource', unicode(query.resource))

            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        def onAuthGet(iq):
            """
            Inform the client that plain-text authentication is supported.
            """
            return self._offerFields(
                iq, ('username', 'password', 'resource'), onAuthSet)

        # Observe the request for authentication fields, then start the
        # initializer.
        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)
        initialized = self.init.initialize()
        return defer.gatherResults([fieldsRequested, initialized])


    def testDigest(self):
        """
        Test digest authentication.

        The stubbed server offers the C{digest} field and expects it to be
        filled with a sha1 digest of the concatenated stream session
        identifier and password.  It then acts as if authentication succeeds.
        """

        def onAuthSet(iq):
            """
            Check the submitted credentials and respond with an empty result
            signalling success.
            """
            query = iq.query
            self.assertEqual('user', unicode(query.username))
            self.assertEqual(sha1('12345secret').hexdigest(),
                             unicode(query.digest).encode('utf-8'))
            self.assertEqual('resource', unicode(query.resource))

            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        def onAuthGet(iq):
            """
            Inform the client that digest authentication is supported.
            """
            return self._offerFields(
                iq, ('username', 'digest', 'resource'), onAuthSet)

        # The expected digest covers the stream session identifier, so it has
        # to be set before the initializer runs.
        self.xmlstream.sid = u'12345'

        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)
        initialized = self.init.initialize()
        return defer.gatherResults([fieldsRequested, initialized])


    def testFailRequestFields(self):
        """
        Test initializer failure of request for fields for authentication.
        """

        def onAuthGet(iq):
            """
            Respond that the client is not authorized to authenticate.
            """
            refusal = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(refusal)

        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)
        initialized = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(initialized, error.StanzaError)
        return defer.gatherResults([fieldsRequested, initialized])


    def testFailAuth(self):
        """
        Test initializer failure to authenticate.
        """

        def onAuthSet(iq):
            """
            Reject the credentials with a not-authorized stanza error.
            """
            refusal = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(refusal)

        def onAuthGet(iq):
            """
            Inform the client that plain-text authentication is supported.
            """
            return self._offerFields(
                iq, ('username', 'password', 'resource'), onAuthSet)

        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)
        initialized = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(initialized, error.StanzaError)
        return defer.gatherResults([fieldsRequested, initialized])
class BindInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.BindInitializer}.
    """

    def setUp(self):
        """
        Create the initializer and set the JID on the authenticator.
        """
        super(BindInitializerTests, self).setUp()
        self.init = client.BindInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')


    def testBasic(self):
        """
        Set up a stream, and act as if resource binding succeeds.

        The server answers with a JID that differs from the one set in
        C{setUp}; afterwards the authenticator's C{jid} must equal the JID
        from the response.
        """
        boundJID = 'user@example.com/other resource'

        def onBind(iq):
            reply = xmlstream.toResponse(iq, 'result')
            reply.addElement((NS_BIND, 'bind'))
            reply.bind.addElement('jid', content=boundJID)
            self.pipe.source.send(reply)

        def checkJID(result):
            self.assertEqual(jid.JID(boundJID), self.authenticator.jid)

        bindRequested = self.waitFor(IQ_BIND_SET, onBind)
        started = self.init.start()
        started.addCallback(checkJID)
        return defer.gatherResults([bindRequested, started])


    def testFailure(self):
        """
        Set up a stream, and act as if resource binding fails.
        """

        def onBind(iq):
            refusal = error.StanzaError('conflict').toResponse(iq)
            self.pipe.source.send(refusal)

        bindRequested = self.waitFor(IQ_BIND_SET, onBind)
        started = self.init.start()

        # The initializer should fail with a stanza error.
        self.assertFailure(started, error.StanzaError)
        return defer.gatherResults([bindRequested, started])
class SessionInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.SessionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer on the harness stream.
        """
        super(SessionInitializerTests, self).setUp()
        self.init = client.SessionInitializer(self.xmlstream)


    def testSuccess(self):
        """
        Set up a stream, and act as if session establishment succeeds.
        """

        def onSession(iq):
            # An empty result response signals success.
            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        sessionRequested = self.waitFor(IQ_SESSION_SET, onSession)
        started = self.init.start()
        return defer.gatherResults([sessionRequested, started])


    def testFailure(self):
        """
        Set up a stream, and act as if session establishment fails.
        """

        def onSession(iq):
            refusal = error.StanzaError('forbidden').toResponse(iq)
            self.pipe.source.send(refusal)

        sessionRequested = self.waitFor(IQ_SESSION_SET, onSession)
        started = self.init.start()

        # The initializer should fail with a stanza error.
        self.assertFailure(started, error.StanzaError)
        return defer.gatherResults([sessionRequested, started])
class BasicAuthenticatorTests(unittest.TestCase):
    """
    Test for both BasicAuthenticator and basicClientFactory.
    """

    def test_basic(self):
        """
        Authenticator and stream are properly constructed by the factory.

        The L{xmlstream.XmlStream} protocol created by the factory has the new
        L{client.BasicAuthenticator} instance in its C{authenticator}
        attribute.  It is set up with C{jid} and C{password} as passed to the
        factory, C{otherHost} taken from the client JID. The stream further
        has two initializers, for TLS and authentication, of which the first
        has its C{required} attribute set to C{False}.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # authenticator created by the factory, which is in turn initialized
        # with the arguments to the factory.
        xs = client.basicClientFactory(self.client_jid,
                                       'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers
        tls, auth = xs.initializers

        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(auth, client.IQAuthInitializer)

        # TLS is optional for this authenticator.
        self.assertFalse(tls.required)
class XMPPAuthenticatorTests(unittest.TestCase):
    """
    Test for both XMPPAuthenticator and XMPPClientFactory.
    """

    def test_basic(self):
        """
        Test basic operations.

        Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
        it produce a protocol instance. Then inspect the instance variables of
        the authenticator and XML stream objects: the stream has five
        initializers, of which all checked ones but the session initializer
        are required.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator that is in turn initialized with the arguments to
        # the factory.
        xs = client.XMPPClientFactory(self.client_jid,
                                      'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers
        version, tls, sasl, bind, session = xs.initializers

        # assertIsInstance reports the offending object on failure, unlike
        # the deprecated assert_(isinstance(...)) spelling.
        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(sasl, SASLInitiatingInitializer)
        self.assertIsInstance(bind, client.BindInitializer)
        self.assertIsInstance(session, client.SessionInitializer)

        self.assertTrue(tls.required)
        self.assertTrue(sasl.required)
        self.assertTrue(bind.required)
        self.assertFalse(session.required)


    def test_tlsConfiguration(self):
        """
        A TLS configuration is passed to the TLS initializer.
        """
        configs = []

        def init(self, xs, required=True, configurationForTLS=None):
            # Stand-in for TLSInitiatingInitializer.__init__ that records the
            # configuration it is constructed with.
            configs.append(configurationForTLS)

        self.client_jid = jid.JID('user@example.com/resource')

        # Create the factory with an explicit TLS configuration, then patch
        # the TLS initializer's constructor before the protocol (and with it
        # the list of initializers) is built.
        configurationForTLS = ssl.CertificateOptions()
        factory = client.XMPPClientFactory(
            self.client_jid, 'secret',
            configurationForTLS=configurationForTLS)
        self.patch(xmlstream.TLSInitiatingInitializer, "__init__", init)
        xs = factory.buildProtocol(None)

        # test list of initializers
        version, tls, sasl, bind, session = xs.initializers

        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIs(configurationForTLS, configs[0])

    # Skipped when the ssl module could not be imported.
    test_tlsConfiguration.skip = skipWhenNoSSL
|