# Viewing file: test_connection.py (4.2 KB)  -rw-r--r--
# (file-viewer navigation menu: Code | Session | SDB)
 
import datetime
import decimal
import time

import pymysql
from pymysql.tests import base
 
 
 class TestConnection(base.PyMySQLTestCase):
     """Connection-level tests.

     The tests read ``self.databases`` (connect() keyword dicts) and
     ``self.connections`` (already-open connections); both are presumably
     provided by ``base.PyMySQLTestCase`` -- confirm against the base class.
     """

     def test_utf8mb4(self):
         """This test requires MySQL >= 5.5"""
         arg = self.databases[0].copy()
         arg['charset'] = 'utf8mb4'
         conn = pymysql.connect(**arg)
         # Connecting without an error is the whole test; close the extra
         # connection so it does not leak past this test.
         conn.close()

     def test_largedata(self):
         """Large query and response (>=16MB)"""
         cur = self.connections[0].cursor()
         cur.execute("SELECT @@max_allowed_packet")
         if cur.fetchone()[0] < 16*1024*1024 + 10:
             # Report as skipped instead of silently passing when the server
             # cannot accept a packet of the required size.
             self.skipTest("Set max_allowed_packet to bigger than 17MB")
         t = 'a' * (16*1024*1024)
         cur.execute("SELECT '" + t + "'")
         # assertTrue rather than a bare assert (which -O strips) or
         # assertEqual (which would try to diff two 16MB strings on failure).
         self.assertTrue(cur.fetchone()[0] == t,
                         "large payload did not round-trip unchanged")

     def test_autocommit(self):
         """get_autocommit() follows both SET AUTOCOMMIT and autocommit()."""
         con = self.connections[0]
         self.assertFalse(con.get_autocommit())

         cur = con.cursor()
         cur.execute("SET AUTOCOMMIT=1")
         self.assertTrue(con.get_autocommit())

         con.autocommit(False)
         self.assertFalse(con.get_autocommit())
         # Cross-check the client-side flag against the server variable.
         cur.execute("SELECT @@AUTOCOMMIT")
         self.assertEqual(cur.fetchone()[0], 0)

     def test_select_db(self):
         """select_db() changes the database reported by SELECT database()."""
         con = self.connections[0]
         current_db = self.databases[0]['db']
         other_db = self.databases[1]['db']

         cur = con.cursor()
         cur.execute('SELECT database()')
         self.assertEqual(cur.fetchone()[0], current_db)

         con.select_db(other_db)
         cur.execute('SELECT database()')
         self.assertEqual(cur.fetchone()[0], other_db)

     def test_connection_gone_away(self):
         """
         http://dev.mysql.com/doc/refman/5.0/en/gone-away.html
         http://dev.mysql.com/doc/refman/5.0/en/error-messages-client.html#error_cr_server_gone_error
         """
         con = self.connections[0]
         cur = con.cursor()
         # Sleep past the 1s wait_timeout so the next query hits a dead link.
         cur.execute("SET wait_timeout=1")
         time.sleep(2)
         with self.assertRaises(pymysql.OperationalError) as cm:
             cur.execute("SELECT 1+1")
         # The error occurs while reading, not writing, because of the socket
         # buffer, so the code is not always 2006 (server gone away); 2013
         # (lost connection during query) is accepted as well.
         self.assertIn(cm.exception.args[0], (2006, 2013))

     def test_init_command(self):
         """A connection opened with a multi-statement init_command is usable."""
         conn = pymysql.connect(
             init_command='SELECT "bar"; SELECT "baz"',
             **self.databases[0]
         )
         try:
             c = conn.cursor()
             c.execute('select "foobar";')
             self.assertEqual(('foobar',), c.fetchone())
         finally:
             # Close even when the assertion above fails.
             conn.close()
 
 
 # A custom type and function to escape it
 class Foo(object):
     """Minimal custom type used as input to a custom escape function."""

     # Text that escape_foo() returns for an instance of this class.
     value = "bar"
 
 
 def escape_foo(x, d):
     """Return ``x.value`` as the escaped form of *x*.

     *d* is accepted as the second argument but is not used here.
     """
     return x.value
 
 
 class TestEscape(base.PyMySQLTestCase):
     """Tests for ``con.escape`` with default and caller-supplied mappings.

     ``self.connections`` is presumably provided by ``base.PyMySQLTestCase``
     -- confirm against the base class.
     """

     def test_escape_string(self):
         """Quote escaping switches form under NO_BACKSLASH_ESCAPES."""
         con = self.connections[0]
         cur = con.cursor()

         self.assertEqual(con.escape("foo'bar"), "'foo\\'bar'")
         cur.execute("SET sql_mode='NO_BACKSLASH_ESCAPES'")
         self.assertEqual(con.escape("foo'bar"), "'foo''bar'")

     def test_escape_builtin_encoders(self):
         """A datetime is escaped through the connection's own encoders."""
         con = self.connections[0]

         val = datetime.datetime(2012, 3, 4, 5, 6)
         self.assertEqual(con.escape(val, con.encoders), "'2012-03-04 05:06:00'")

     def test_escape_custom_object(self):
         """A mapping entry for a custom type is used for its instances."""
         con = self.connections[0]

         mapping = {Foo: escape_foo}
         self.assertEqual(con.escape(Foo(), mapping), "bar")

     def test_escape_fallback_encoder(self):
         """A str subclass missing from the mapping is still escaped as text."""
         con = self.connections[0]

         class Custom(str):
             pass

         mapping = {pymysql.text_type: pymysql.escape_string}
         self.assertEqual(con.escape(Custom('foobar'), mapping), "'foobar'")

     def test_escape_no_default(self):
         """Escaping with an empty mapping raises TypeError."""
         con = self.connections[0]

         self.assertRaises(TypeError, con.escape, 42, {})

     def test_escape_dict_value(self):
         """Values inside a dict are escaped with the supplied mapping."""
         con = self.connections[0]

         # Copy so the shared encoder table is not modified by this test.
         mapping = con.encoders.copy()
         mapping[Foo] = escape_foo
         self.assertEqual(con.escape({'foo': Foo()}, mapping), {'foo': "bar"})

     def test_escape_list_item(self):
         """Items inside a list are escaped with the supplied mapping."""
         con = self.connections[0]

         # Copy so the shared encoder table is not modified by this test.
         mapping = con.encoders.copy()
         mapping[Foo] = escape_foo
         self.assertEqual(con.escape([Foo()], mapping), "(bar)")
 
 |