| Viewing file:  test_observer.py (5.13 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# Copyright (c) Twisted Matrix Laboratories.# See LICENSE for details.
 
 """
 Test cases for L{twisted.logger._observer}.
 """
 
 from zope.interface.verify import verifyObject, BrokenMethodImplementation
 
 from twisted.trial import unittest
 
 from .._logger import Logger
 from .._observer import ILogObserver
 from .._observer import LogPublisher
 
 
 
 class LogPublisherTests(unittest.TestCase):
 """
 Tests for L{LogPublisher}.
 """
 
 def test_interface(self):
 """
 L{LogPublisher} is an L{ILogObserver}.
 """
 publisher = LogPublisher()
 try:
 verifyObject(ILogObserver, publisher)
 except BrokenMethodImplementation as e:
 self.fail(e)
 
 
 def test_observers(self):
 """
 L{LogPublisher.observers} returns the observers.
 """
 o1 = lambda e: None
 o2 = lambda e: None
 
 publisher = LogPublisher(o1, o2)
 self.assertEqual(set((o1, o2)), set(publisher._observers))
 
 
 def test_addObserver(self):
 """
 L{LogPublisher.addObserver} adds an observer.
 """
 o1 = lambda e: None
 o2 = lambda e: None
 o3 = lambda e: None
 
 publisher = LogPublisher(o1, o2)
 publisher.addObserver(o3)
 self.assertEqual(set((o1, o2, o3)), set(publisher._observers))
 
 
 def test_addObserverNotCallable(self):
 """
 L{LogPublisher.addObserver} refuses to add an observer that's
 not callable.
 """
 publisher = LogPublisher()
 self.assertRaises(TypeError, publisher.addObserver, object())
 
 
 def test_removeObserver(self):
 """
 L{LogPublisher.removeObserver} removes an observer.
 """
 o1 = lambda e: None
 o2 = lambda e: None
 o3 = lambda e: None
 
 publisher = LogPublisher(o1, o2, o3)
 publisher.removeObserver(o2)
 self.assertEqual(set((o1, o3)), set(publisher._observers))
 
 
 def test_removeObserverNotRegistered(self):
 """
 L{LogPublisher.removeObserver} removes an observer that is not
 registered.
 """
 o1 = lambda e: None
 o2 = lambda e: None
 o3 = lambda e: None
 
 publisher = LogPublisher(o1, o2)
 publisher.removeObserver(o3)
 self.assertEqual(set((o1, o2)), set(publisher._observers))
 
 
 def test_fanOut(self):
 """
 L{LogPublisher} calls its observers.
 """
 event = dict(foo=1, bar=2)
 
 events1 = []
 events2 = []
 events3 = []
 
 o1 = lambda e: events1.append(e)
 o2 = lambda e: events2.append(e)
 o3 = lambda e: events3.append(e)
 
 publisher = LogPublisher(o1, o2, o3)
 publisher(event)
 self.assertIn(event, events1)
 self.assertIn(event, events2)
 self.assertIn(event, events3)
 
 
 def test_observerRaises(self):
 """
 Observer raises an exception during fan out: a failure is logged, but
 not re-raised.  Life goes on.
 """
 event = dict(foo=1, bar=2)
 exception = RuntimeError("ARGH! EVIL DEATH!")
 
 events = []
 
 def observer(event):
 shouldRaise = not events
 events.append(event)
 if shouldRaise:
 raise exception
 
 collector = []
 
 publisher = LogPublisher(observer, collector.append)
 publisher(event)
 
 # Verify that the observer saw my event
 self.assertIn(event, events)
 
 # Verify that the observer raised my exception
 errors = [
 e["log_failure"] for e in collector
 if "log_failure" in e
 ]
 self.assertEqual(len(errors), 1)
 self.assertIs(errors[0].value, exception)
 # Make sure the exceptional observer does not receive its own error.
 self.assertEqual(len(events), 1)
 
 
 def test_observerRaisesAndLoggerHatesMe(self):
 """
 Observer raises an exception during fan out and the publisher's Logger
 pukes when the failure is reported.  The exception does not propagate
 back to the caller.
 """
 event = dict(foo=1, bar=2)
 exception = RuntimeError("ARGH! EVIL DEATH!")
 
 def observer(event):
 raise RuntimeError("Sad panda")
 
 class GurkLogger(Logger):
 def failure(self, *args, **kwargs):
 raise exception
 
 publisher = LogPublisher(observer)
 publisher.log = GurkLogger()
 publisher(event)
 
 # Here, the lack of an exception thus far is a success, of sorts
 
 
 def test_trace(self):
 """
 Tracing keeps track of forwarding to observers.
 """
 event = dict(foo=1, bar=2, log_trace=[])
 
 traces = {}
 
 # Copy trace to a tuple; otherwise, both observers will store the same
 # mutable list, and we won't be able to see o1's view distinctly.
 o1 = lambda e: traces.setdefault(1, tuple(e["log_trace"]))
 o2 = lambda e: traces.setdefault(2, tuple(e["log_trace"]))
 
 publisher = LogPublisher(o1, o2)
 publisher(event)
 
 self.assertEqual(traces[1], ((publisher, o1),))
 self.assertEqual(traces[2], ((publisher, o1), (publisher, o2)))
 
 |