| Viewing file:  assertsql.py (12.05 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# testing/assertsql.py
 # Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
 # <see AUTHORS file>
 #
 # This module is part of SQLAlchemy and is released under
 # the MIT License: http://www.opensource.org/licenses/mit-license.php
 
 from ..engine.default import DefaultDialect
 from .. import util
 import re
 import collections
 import contextlib
 from .. import event
 from sqlalchemy.schema import _DDLCompiles
 from sqlalchemy.engine.util import _distill_params
 from sqlalchemy.engine import url
 
 
 class AssertRule(object):
     """Base class for rules checked against observed SQL executions.

     Class-level defaults, overridden per instance or per subclass:

     * ``is_consumed`` -- becomes True once the rule is satisfied;
       ``SQLAsserter.assert_`` then drops the rule from its list.
     * ``errormessage`` -- a non-empty value on an unconsumed rule makes
       ``SQLAsserter.assert_`` fail with that message.
     * ``consume_statement`` -- when true, ``SQLAsserter.assert_`` moves on
       to the next observed execution after this rule has processed one.
     """

     is_consumed = False
     errormessage = None
     consume_statement = True

     def process_statement(self, execute_observed):
         """Examine one observed execution; the base rule does nothing."""
         pass

     def no_more_statements(self):
         """Fail because the statements ran out while this rule was pending.

         :raises AssertionError: always.
         """
         # Raise explicitly instead of ``assert False`` so the failure is
         # still reported when Python runs with -O (asserts stripped).
         raise AssertionError(
             'All statements are complete, but pending '
             'assertion rules remain')
 
 
 class SQLMatchRule(AssertRule):
     """Common base of the statement-matching rules in this module.

     Adds nothing to ``AssertRule``; ``CursorSQL`` and ``CompiledSQL``
     derive from it.
     """
     pass
 
 
 class CursorSQL(SQLMatchRule):
     """Rule matching the exact statement string recorded for a cursor.

     The rule compares against the first entry of
     ``execute_observed.statements`` and removes that entry on a match.
     """

     # An observed execution can hold several cursor statements, so the
     # observation is released only once its statement list is empty
     # (see process_statement).
     consume_statement = False

     def __init__(self, statement, params=None):
         """Store the expected statement and, optionally, its parameters.

         :param statement: exact statement text expected.
         :param params: expected parameters; ``None`` skips the
          parameter comparison.
         """
         self.statement = statement
         self.params = params

     def process_statement(self, execute_observed):
         """Match the first pending cursor statement of the observation.

         On mismatch ``errormessage`` is set.  On match the cursor
         statement is popped, the rule is marked consumed and, if no
         cursor statements remain, ``consume_statement`` is switched on.
         """
         stmt = execute_observed.statements[0]
         if self.statement != stmt.statement or (
                 self.params is not None and
                 self.params != stmt.parameters):
             self.errormessage = (
                 "Testing for exact SQL %s parameters %s received %s %s" % (
                     self.statement, self.params,
                     stmt.statement, stmt.parameters
                 ))
         else:
             execute_observed.statements.pop(0)
             self.is_consumed = True
             # clear any message left by an earlier failed attempt, the
             # same way CompiledSQL does on success
             self.errormessage = None
             if not execute_observed.statements:
                 self.consume_statement = True
 
 
 class CompiledSQL(SQLMatchRule):
     """Rule matching a statement after recompiling it for a fixed dialect.

     The observed statement is recompiled (by default against
     ``DefaultDialect``) and compared as a string with ``statement``;
     ``params`` are compared "positively", i.e. only the keys given in the
     expected parameters are checked.
     """

     def __init__(self, statement, params=None, dialect='default'):
         """Store the expectation.

         :param statement: expected SQL text; newlines and tabs are
          ignored when comparing.
         :param params: expected parameters: a dict, a list of dicts, or
          a callable receiving the execution context and returning either.
          A false value disables the parameter comparison.
         :param dialect: ``'default'`` or a dialect name that is passed to
          ``url.URL`` to look up the dialect class.
         """
         self.statement = statement
         self.params = params
         self.dialect = dialect

     def _compare_sql(self, execute_observed, received_statement):
         """Return True if the expected SQL equals ``received_statement``."""
         stmt = re.sub(r'[\n\t]', '', self.statement)
         return received_statement == stmt

     def _compile_dialect(self, execute_observed):
         """Return the dialect instance used to recompile the statement."""
         if self.dialect == 'default':
             return DefaultDialect()
         else:
             # ugh
             if self.dialect == 'postgresql':
                 params = {'implicit_returning': True}
             else:
                 params = {}
             return url.URL(self.dialect).get_dialect()(**params)

     def _received_statement(self, execute_observed):
         """Reconstruct the statement and params in terms of a target
         dialect, which for CompiledSQL is just DefaultDialect.

         :return: ``(statement_text, list_of_parameter_dicts)``; newlines
          and tabs are removed from the statement text.
         """

         context = execute_observed.context
         compare_dialect = self._compile_dialect(execute_observed)
         if isinstance(context.compiled.statement, _DDLCompiles):
             compiled = \
                 context.compiled.statement.compile(dialect=compare_dialect)
         else:
             compiled = (
                 context.compiled.statement.compile(
                     dialect=compare_dialect,
                     column_keys=context.compiled.column_keys,
                     inline=context.compiled.inline)
             )
         _received_statement = re.sub(r'[\n\t]', '', util.text_type(compiled))
         parameters = execute_observed.parameters

         if not parameters:
             _received_parameters = [compiled.construct_params()]
         else:
             _received_parameters = [
                 compiled.construct_params(m) for m in parameters]

         return _received_statement, _received_parameters

     def process_statement(self, execute_observed):
         """Compare one observed execution against this rule.

         On a match ``is_consumed`` is set and ``errormessage`` cleared;
         otherwise ``errormessage`` receives a description of the
         expected and received statement/parameters.
         """
         context = execute_observed.context

         _received_statement, _received_parameters = \
             self._received_statement(execute_observed)
         params = self._all_params(context)

         equivalent = self._compare_sql(execute_observed, _received_statement)

         if equivalent and params is not None:
             all_params = list(params)
             all_received = list(_received_parameters)
             while all_params and all_received:
                 param = dict(all_params.pop(0))

                 for idx, received in enumerate(list(all_received)):
                     # do a positive compare only
                     for param_key in param:
                         # a key in param did not match current
                         # 'received'
                         if param_key not in received or \
                                 received[param_key] != param[param_key]:
                             break
                     else:
                         # all keys in param matched 'received';
                         # onto next param
                         del all_received[idx]
                         break
                 else:
                     # param did not match any entry
                     # in all_received
                     equivalent = False
                     break
             # leftovers on either side mean the parameter sets differ
             if all_params or all_received:
                 equivalent = False

         if equivalent:
             self.is_consumed = True
             self.errormessage = None
         else:
             self.errormessage = self._failure_message(params) % {
                 'received_statement': _received_statement,
                 'received_parameters': _received_parameters
             }

     def _all_params(self, context):
         """Return the expected parameters as a list, or None if unset.

         A callable ``params`` is invoked with ``context``; a non-list
         result is wrapped in a one-element list.
         """
         if self.params:
             if util.callable(self.params):
                 params = self.params(context)
             else:
                 params = self.params
             if not isinstance(params, list):
                 params = [params]
             return params
         else:
             return None

     def _failure_message(self, expected_params):
         """Return a %-template describing the failed expectation.

         The result still contains ``%(received_statement)r`` and
         ``%(received_parameters)r`` placeholders, which
         ``process_statement`` fills in.  Because the string is formatted
         a second time, literal ``%`` characters coming from the expected
         statement *and* from the expected parameters are doubled here.
         """
         return (
             'Testing for compiled statement %r partial params %s, '
             'received %%(received_statement)r with params '
             '%%(received_parameters)r' % (
                 self.statement.replace('%', '%%'),
                 repr(expected_params).replace('%', '%%')
             )
         )
 
 
 class RegexSQL(CompiledSQL):
     """Variant of ``CompiledSQL`` whose expected SQL is a regular expression.

     The statement is recompiled with the ``'default'`` dialect setting and
     must match ``regex`` from its beginning (``re.match`` semantics).
     """

     def __init__(self, regex, params=None):
         """Compile ``regex`` and store the expected ``params``."""
         SQLMatchRule.__init__(self)
         self.regex = re.compile(regex)
         self.orig_regex = regex
         self.params = params
         self.dialect = 'default'

     def _failure_message(self, expected_params):
         """Return a %-template describing the failed expectation.

         The template is %-formatted again by ``process_statement``, so
         literal ``%`` characters in the regex or in the expected
         parameters are doubled; otherwise a pattern such as ``%(x)s``
         would raise while the failure is being reported.
         """
         return (
             'Testing for compiled statement ~%s partial params %s, '
             'received %%(received_statement)r with params '
             '%%(received_parameters)r' % (
                 repr(self.orig_regex).replace('%', '%%'),
                 repr(expected_params).replace('%', '%%')
             )
         )

     def _compare_sql(self, execute_observed, received_statement):
         """Return True if the regex matches at the start of the statement."""
         return bool(self.regex.match(received_statement))
 
 
 class DialectSQL(CompiledSQL):
     """Variant of ``CompiledSQL`` that compares against the dialect in use.

     The expected statement is written with ``:name`` bind parameters and
     is converted to the paramstyle of the executing dialect before the
     comparison.
     """

     def _compile_dialect(self, execute_observed):
         """Return the dialect of the observed execution context."""
         return execute_observed.context.dialect

     def _compare_no_space(self, real_stmt, received_stmt):
         """Compare two statements, ignoring newlines/tabs in ``real_stmt``."""
         stmt = re.sub(r'[\n\t]', '', real_stmt)
         return received_stmt == stmt

     def _received_statement(self, execute_observed):
         """Return the recompiled statement and the context's parameters.

         :raises AssertionError: if the recompiled statement is not among
          the statements recorded for the cursor.
         """
         # only the statement text of the parent result is used; the
         # parameters come from the execution context instead
         received_stmt, _ = super(DialectSQL, self).\
             _received_statement(execute_observed)

         # TODO: why do we need this part?
         for real_stmt in execute_observed.statements:
             if self._compare_no_space(real_stmt.statement, received_stmt):
                 break
         else:
             raise AssertionError(
                 "Can't locate compiled statement %r in list of "
                 "statements actually invoked" % received_stmt)

         return received_stmt, execute_observed.context.compiled_parameters

     def _compare_sql(self, execute_observed, received_statement):
         """Return True if the expected SQL, converted to the dialect's
         paramstyle, equals ``received_statement``.

         ``pyformat``, ``qmark``, ``format`` and ``numeric`` are rewritten;
         any other paramstyle (e.g. ``named``, which already uses the
         ``:name`` form) leaves the expected statement unchanged instead
         of handing ``None`` to ``re.sub``.
         """
         import itertools

         stmt = re.sub(r'[\n\t]', '', self.statement)
         # The lookbehind skips the second colon of a "::" cast so that
         # e.g. "x::INTEGER" is not mistaken for a bind parameter.
         bind = r'(?<!:):([\w_]+)'

         # convert our comparison statement to have the
         # paramstyle of the received
         paramstyle = execute_observed.context.dialect.paramstyle
         if paramstyle == 'pyformat':
             stmt = re.sub(bind, r"%(\1)s", stmt)
         elif paramstyle == 'qmark':
             stmt = re.sub(bind, "?", stmt)
         elif paramstyle == 'format':
             stmt = re.sub(bind, r"%s", stmt)
         elif paramstyle == 'numeric':
             # positional ":1", ":2", ... in order of appearance
             counter = itertools.count(1)
             stmt = re.sub(bind, lambda m: ":%d" % next(counter), stmt)

         return received_statement == stmt
 
 
 class CountStatements(AssertRule):
     """Rule that counts observed executions and checks the final total.

     The rule never marks itself consumed; the count is verified when
     ``no_more_statements`` is called.
     """

     def __init__(self, count):
         """:param count: number of executions expected."""
         self.count = count
         self._statement_count = 0

     def process_statement(self, execute_observed):
         """Count one observed execution."""
         self._statement_count += 1

     def no_more_statements(self):
         """Check the tally once the observed executions are exhausted.

         :raises AssertionError: if the tally differs from ``count``.
         """
         if self.count != self._statement_count:
             # explicit raise: an ``assert`` would vanish under ``-O``
             raise AssertionError(
                 'desired statement count %d does not match %d'
                 % (self.count, self._statement_count))
 
 
 class AllOf(AssertRule):
     """Composite rule satisfied once every member rule has been consumed.

     Members are kept in a set, so they may be matched in any order.
     """

     def __init__(self, *rules):
         self.rules = set(rules)

     def process_statement(self, execute_observed):
         """Offer the observation to the remaining rules until one takes it.

         * A rule that becomes consumed is removed; when none remain this
           composite is consumed as well.
         * A rule that neither consumes nor reports an error is still in
           progress, so no error is reported for the composite.
         * If every remaining rule reports an error, the message of one of
           them is copied to ``errormessage``.
         """
         # iterate over a copy: self.rules is modified inside the loop
         for rule in list(self.rules):
             rule.errormessage = None
             rule.process_statement(execute_observed)
             if rule.is_consumed:
                 self.rules.discard(rule)
                 if not self.rules:
                     self.is_consumed = True
                 break
             elif not rule.errormessage:
                 # rule is not done yet
                 self.errormessage = None
                 break
         else:
             # no break: every remaining rule rejected the observation
             self.errormessage = list(self.rules)[0].errormessage
 
 
 class Or(AllOf):
     """Composite rule satisfied as soon as any one member rule is consumed."""

     def process_statement(self, execute_observed):
         """Offer the observation to each rule; stop at the first match.

         If no rule is consumed, the ``errormessage`` of one of the rules
         is copied to this composite.
         """
         for rule in self.rules:
             rule.process_statement(execute_observed)
             if rule.is_consumed:
                 self.is_consumed = True
                 # drop any message recorded by an earlier failed attempt
                 self.errormessage = None
                 break
         else:
             self.errormessage = list(self.rules)[0].errormessage
 
 
 class SQLExecuteObserved(object):
     """Record of one observed execution and its cursor-level statements."""

     def __init__(self, context, clauseelement, multiparams, params):
         """Store the execution details.

         :param context: execution context of the statement.
         :param clauseelement: the object that was executed.
         :param multiparams: positional parameter sets of the execution.
         :param params: keyword parameters of the execution.

         ``multiparams`` and ``params`` are combined by
         ``_distill_params`` into ``self.parameters``; ``self.statements``
         starts empty and is filled by the caller.
         """
         self.context = context
         self.clauseelement = clauseelement
         self.parameters = _distill_params(multiparams, params)
         self.statements = []
 
 
 class SQLCursorExecuteObserved(
     collections.namedtuple(
         "SQLCursorExecuteObserved",
         ["statement", "parameters", "context", "executemany"])
 ):
     """Immutable record of a single statement passed to a cursor.

     Fields: ``statement``, ``parameters``, ``context``, ``executemany``.
     """
     pass
 
 
 class SQLAsserter(object):
     """Collects observed executions and checks them against rules."""

     def __init__(self):
         # observations are appended here while collection is active
         self.accumulated = []

     def _close(self):
         """Freeze the collected observations for use by ``assert_``."""
         self._final = self.accumulated
         del self.accumulated

     def assert_(self, *rules):
         """Check the collected observations against ``rules`` in order.

         Each rule processes the current observation.  A consumed rule is
         dropped; an unconsumed rule with an ``errormessage`` fails the
         assertion.  The observation is advanced when the rule's
         ``consume_statement`` is true.

         :raises AssertionError: on a rule failure, when observations
          remain after all rules are consumed, or (via the rule's
          ``no_more_statements``) when rules remain after all
          observations are used up.
         """
         rules = list(rules)
         observed = list(self._final)

         while observed and rules:
             rule = rules[0]
             rule.process_statement(observed[0])
             if rule.is_consumed:
                 rules.pop(0)
             elif rule.errormessage:
                 # explicit raise: ``assert`` is stripped under ``-O``
                 raise AssertionError(rule.errormessage)

             if rule.consume_statement:
                 observed.pop(0)

         if not observed and rules:
             rules[0].no_more_statements()
         elif not rules and observed:
             raise AssertionError("Additional SQL statements remain")
 
 
 @contextlib.contextmanager
 def assert_engine(engine):
     """Context manager that records SQL executed on ``engine``.

     Yields a ``SQLAsserter``.  While the block runs, two event listeners
     collect observations; on exit the listeners are removed and the
     asserter is closed so that ``assert_`` can be called on it.
     """
     asserter = SQLAsserter()

     # most recent (clauseelement, multiparams, params) seen by
     # "before_execute"; shared with the cursor listener below
     orig = []

     @event.listens_for(engine, "before_execute")
     def connection_execute(conn, clauseelement, multiparams, params):
         # grab the original statement + params before any cursor
         # execution
         orig[:] = clauseelement, multiparams, params

     @event.listens_for(engine, "after_cursor_execute")
     def cursor_execute(conn, cursor, statement, parameters,
                        context, executemany):
         if not context:
             return
         # then grab real cursor statements and associate them all
         # around a single context
         if asserter.accumulated and \
                 asserter.accumulated[-1].context is context:
             obs = asserter.accumulated[-1]
         else:
             obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
             asserter.accumulated.append(obs)
         obs.statements.append(
             SQLCursorExecuteObserved(
                 statement, parameters, context, executemany)
         )

     try:
         yield asserter
     finally:
         event.remove(engine, "after_cursor_execute", cursor_execute)
         event.remove(engine, "before_execute", connection_execute)
         asserter._close()
 
 |