| Viewing file:  test_database.py (7.37 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# Copyright (c) Twisted Matrix Laboratories.# See LICENSE for details.
 
 """
 Tests for L{twisted.news.database}.
 """
 
 __metaclass__ = type
 
 from email.Parser import Parser
 from socket import gethostname
 
 from twisted.trial.unittest import TestCase
 from twisted.internet.defer import succeed
 from twisted.mail.smtp import messageid
 from twisted.news.database import Article, PickleStorage, NewsShelf
 
 
 
 class ModerationTestsMixin:
 """
 Tests for the moderation features of L{INewsStorage} implementations.
 """
 def setUp(self):
 self._email = []
 
 
 def sendmail(self, smtphost, from_addr, to_addrs, msg,
 senderDomainName=None, port=25):
 """
 Fake of L{twisted.mail.smtp.sendmail} which records attempts to send
 email and immediately pretends success.
 
 Subclasses should arrange for their storage implementation to call this
 instead of the real C{sendmail} function.
 """
 self._email.append((
 smtphost, from_addr, to_addrs, msg, senderDomainName, port))
 return succeed(None)
 
 
 _messageTemplate = """\
 From: some dude
 To: another person
 Subject: activities etc
 Message-ID: %(articleID)s
 Newsgroups: %(newsgroup)s
 %(approved)s
 Body of the message is such.
 """.replace('\n', '\r\n')
 
 
 def getApprovedMessage(self, articleID, group):
 """
 Return a C{str} containing an RFC 2822 formatted message including an
 I{Approved} header indicating it has passed through moderation.
 """
 return self._messageTemplate % {
 'articleID': articleID,
 'newsgroup': group,
 'approved': 'Approved: yup\r\n'}
 
 
 def getUnapprovedMessage(self, articleID, group):
 """
 Return a C{str} containing an RFC 2822 formatted message with no
 I{Approved} header indicating it may require moderation.
 """
 return self._messageTemplate % {
 'articleID': articleID,
 'newsgroup': group,
 'approved': '\r\n'}
 
 
 def getStorage(self, groups, moderators, mailhost, sender):
 """
 Override in a subclass to return a L{INewsStorage} provider to test for
 correct moderation behavior.
 
 @param groups: A C{list} of C{str} naming the groups which should exist
 in the resulting storage object.
 
 @param moderators: A C{dict} mapping C{str} each group name to a C{list}
 of C{str} giving moderator email (RFC 2821) addresses.
 """
 raise NotImplementedError()
 
 
 def test_postApproved(self):
 """
 L{INewsStorage.postRequest} posts the message if it includes an
 I{Approved} header.
 """
 group = "example.group"
 moderator = "alice@example.com"
 mailhost = "127.0.0.1"
 sender = "bob@example.org"
 articleID = messageid()
 storage = self.getStorage(
 [group], {group: [moderator]}, mailhost, sender)
 message = self.getApprovedMessage(articleID, group)
 result = storage.postRequest(message)
 
 def cbPosted(ignored):
 self.assertEqual(self._email, [])
 exists = storage.articleExistsRequest(articleID)
 exists.addCallback(self.assertTrue)
 return exists
 result.addCallback(cbPosted)
 return result
 
 
 def test_postModerated(self):
 """
 L{INewsStorage.postRequest} forwards a message to the moderator if it
 does not include an I{Approved} header.
 """
 group = "example.group"
 moderator = "alice@example.com"
 mailhost = "127.0.0.1"
 sender = "bob@example.org"
 articleID = messageid()
 storage = self.getStorage(
 [group], {group: [moderator]}, mailhost, sender)
 message = self.getUnapprovedMessage(articleID, group)
 result = storage.postRequest(message)
 
 def cbModerated(ignored):
 self.assertEqual(len(self._email), 1)
 self.assertEqual(self._email[0][0], mailhost)
 self.assertEqual(self._email[0][1], sender)
 self.assertEqual(self._email[0][2], [moderator])
 self._checkModeratorMessage(
 self._email[0][3], sender, moderator, group, message)
 self.assertEqual(self._email[0][4], None)
 self.assertEqual(self._email[0][5], 25)
 exists = storage.articleExistsRequest(articleID)
 exists.addCallback(self.assertFalse)
 return exists
 result.addCallback(cbModerated)
 return result
 
 
 def _checkModeratorMessage(self, messageText, sender, moderator, group, postingText):
 p = Parser()
 msg = p.parsestr(messageText)
 headers = dict(msg.items())
 del headers['Message-ID']
 self.assertEqual(
 headers,
 {'From': sender,
 'To': moderator,
 'Subject': 'Moderate new %s message: activities etc' % (group,),
 'Content-Type': 'message/rfc822'})
 
 posting = p.parsestr(postingText)
 attachment = msg.get_payload()[0]
 
 for header in ['from', 'to', 'subject', 'message-id', 'newsgroups']:
 self.assertEqual(posting[header], attachment[header])
 
 self.assertEqual(posting.get_payload(), attachment.get_payload())
 
 
 
 class PickleStorageTests(ModerationTestsMixin, TestCase):
 """
 Tests for L{PickleStorage}.
 """
 def getStorage(self, groups, moderators, mailhost, sender):
 """
 Create and return a L{PickleStorage} instance configured to require
 moderation.
 """
 storageFilename = self.mktemp()
 storage = PickleStorage(
 storageFilename, groups, moderators, mailhost, sender)
 storage.sendmail = self.sendmail
 self.addCleanup(PickleStorage.sharedDBs.pop, storageFilename)
 return storage
 
 
 
 class NewsShelfTests(ModerationTestsMixin, TestCase):
 """
 Tests for L{NewsShelf}.
 """
 def getStorage(self, groups, moderators, mailhost, sender):
 """
 Create and return a L{NewsShelf} instance configured to require
 moderation.
 """
 storageFilename = self.mktemp()
 shelf = NewsShelf(mailhost, storageFilename, sender)
 for name in groups:
 shelf.addGroup(name, 'm') # Dial 'm' for moderator
 for address in moderators.get(name, []):
 shelf.addModerator(name, address)
 shelf.sendmail = self.sendmail
 return shelf
 
 
 def test_notifyModerator(self):
 """
 L{NewsShelf.notifyModerator} sends a moderation email to a single
 moderator.
 """
 shelf = NewsShelf('example.com', self.mktemp(), 'alice@example.com')
 shelf.sendmail = self.sendmail
 shelf.notifyModerator('bob@example.org', Article('Foo: bar', 'Some text'))
 self.assertEqual(len(self._email), 1)
 
 
 def test_defaultSender(self):
 """
 If no sender is specified to L{NewsShelf.notifyModerators}, a default
 address based on the system hostname is used for both the envelope and
 RFC 2822 sender addresses.
 """
 shelf = NewsShelf('example.com', self.mktemp())
 shelf.sendmail = self.sendmail
 shelf.notifyModerators(['bob@example.org'], Article('Foo: bar', 'Some text'))
 self.assertEqual(self._email[0][1], 'twisted-news@' + gethostname())
 self.assertIn('From: twisted-news@' + gethostname(), self._email[0][3])
 
 |