Viewing file: model.py (4.27 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# -*- coding: UTF-8 -*-
from sqlalchemy import create_engine from sqlalchemy import MetaData from sqlalchemy.orm import sessionmaker, interfaces from sqlalchemy.ext.automap import automap_base from sqlalchemy.ext.automap import generate_relationship from sqlalchemy import inspect from sqlalchemy.sql.expression import insert, update
class Model:
    """Thin CRUD wrapper around one reflected database table.

    The database schema is reflected with SQLAlchemy automap when the
    instance is created, and rows are handed back as plain dictionaries.

    How relations work:
      * the table has to have a foreign key configured
      * each entry of ``relations`` has to contain:
          - ``table``: the table to join
          - ``select``: the fields of that table to include in the query

    For example::

        relations={
            'stock': {
                'table': 'stock_productos',
                'select': ['stock']
            }
        }
    """

    def __init__(self, tablename, pk_column_name=None, labels=None,
                 relations=None):
        """Connect to the database and reflect its tables.

        Args:
            tablename: name of the reflected table this model operates on.
            pk_column_name: name of the primary-key column; used by
                ``get``, ``update``, ``delete`` and ``create``.
            labels: optional mapping ``column name -> output key`` applied
                when rows are converted to dictionaries.
            relations: optional mapping describing joined tables (see the
                class docstring).
        """
        # TODO: replace with values from config.ini
        connection_string = 'mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(
            'db_user', 'pacman', 'localhost', 'antonella_test'
        )
        engine = create_engine(connection_string)
        # Bind the session factory explicitly instead of relying on the
        # metadata being bound to the engine.
        Session = sessionmaker(bind=engine)
        self.session = Session()

        # Build the data model and reflect the tables.
        self.base = automap_base(bind=engine, metadata=MetaData())
        self.base.prepare(engine, reflect=True,
                          generate_relationship=generate_relationship)

        self.tablename = tablename
        self.table = self.base.classes.get(tablename)
        self.pk_column_name = pk_column_name
        # Fresh dicts per instance: a mutable default would be shared by
        # every Model created without these arguments.
        self.labels = {} if labels is None else labels
        self.relations = {} if relations is None else relations

    def __del__(self):
        """Close the session, if construction got far enough to open one."""
        session = getattr(self, 'session', None)
        if session is not None:
            session.close()

    def create(self, input):
        """Insert a row built from ``input`` and return ``input``.

        When the primary-key column name contains ``'id'``, the key
        generated by the database (``lastrowid``) is stored into ``input``
        under that column name. ``input`` is updated in place.
        """
        statement = self.table.__table__.insert().values(**input)
        result = self.session.execute(statement)
        self.session.commit()
        # pk_column_name defaults to None; guard before the substring test.
        if self.pk_column_name and 'id' in self.pk_column_name:
            input[self.pk_column_name] = result.lastrowid
        return input

    def update(self, id, input):
        """Update the row whose primary key equals ``id`` with ``input``."""
        pk_column = getattr(self.table, self.pk_column_name)
        statement = update(self.table).values(**input).where(pk_column == id)
        self.session.execute(statement)
        self.session.commit()

    def delete(self, id):
        """Delete the row(s) whose primary key equals ``id``.

        Returns:
            A list with the dictionary form of every deleted row, captured
            before the rows are removed. Joined relation columns are not
            included.
        """
        query = self.session.query(self.table).filter_by(
            **{self.pk_column_name: id}
        )
        # Query.delete() only returns a row count, so snapshot the rows first.
        deleted = [self._simple_row2dict(row) for row in query.all()]
        query.delete()
        self.session.commit()
        return deleted

    def get(self, id):
        """Return the row whose primary key equals ``id`` as a dictionary.

        Raises:
            NotFoundException: if no row matches.
        """
        return self.get_by(**{self.pk_column_name: id})

    def get_all(self):
        """Return every row of the table as a list of dictionaries."""
        query = self._apply_relations(self.session.query(self.table))
        return [self._row2dict(row) for row in query.all()]

    def get_by(self, **kwargs):
        """Return the first row matching the ``column=value`` filters.

        Raises:
            NotFoundException: if no row matches; carries the table name.
        """
        # Filter before joining so the criteria apply to this model's table.
        query = self.session.query(self.table).filter_by(**kwargs)
        result = self._apply_relations(query).first()
        if result is None:
            raise NotFoundException(self.tablename)
        return self._row2dict(result)

    def _apply_relations(self, query):
        """Join each configured relation onto ``query`` and add its columns."""
        for relation in self.relations.values():
            joined = self.base.classes.get(relation['table'])
            query = query.add_entity(joined).join(joined)
            for column_name in relation['select']:
                query = query.add_columns(getattr(joined, column_name))
        return query

    def _row2dict(self, row):
        """Convert a query result row to a dictionary."""
        if not self.relations:
            return self._simple_row2dict(row)
        return self._joined_row2dict(row)

    def _simple_row2dict(self, row):
        """Convert a mapped entity to ``{label or column name: value}``.

        ``datetime.datetime`` values are converted to strings.
        """
        import datetime

        d = {}
        for column in row.__table__.columns:
            column_key = self.labels.get(column.name, column.name)
            value = getattr(row, column.name)
            if isinstance(value, datetime.datetime):
                value = str(value)
            d[column_key] = value
        return d

    def _joined_row2dict(self, row):
        """Convert a joined result row to a dictionary.

        The main entity is read from the attribute named after the table.
        Each relation contributes one key (the relation name), holding the
        value of the first column listed in its ``select``.
        """
        d = self._simple_row2dict(getattr(row, self.tablename))
        for relation_name, relation in self.relations.items():
            d[relation_name] = getattr(row, relation['select'][0])
        return d
class NotFoundException(Exception):
    """Raised when a lookup matches no row.

    ``Model.get_by`` raises it with the table name as its only argument.
    """
|