#!coding:utf-8
import logging
from contextlib import contextmanager

from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)


class Opc():
    """Query helper for SCADA item data stored in MySQL behind an SSH tunnel.

    Item rows live in tables named ``scada_data_<n>``; the table for a given
    item is chosen by hashing the item name (see ``routeTableName``).
    """

    engine = None
    # Kept only so existing attribute lookups still succeed; the query methods
    # use short-lived local sessions instead of sharing one on the instance.
    _session = None
    _tunnel = None

    # Parameters of the table-routing hash. Changing either one changes which
    # table an item name maps to.
    _HASH_SEED = 131
    _TABLE_COUNT = 40

    def __init__(self, scada_db):
        """Open the SSH tunnel and create the SQLAlchemy engine.

        Args:
            scada_db: Mapping with the keys ``ssh-host``, ``ssh-port``,
                ``ssh-username``, ``ssh-password`` (tunnel) and ``host``,
                ``port``, ``user``, ``password``, ``dbname`` (database).
                The mapping is not modified.

        Raises:
            KeyError: If a required key is missing.
        """
        tunnel = SSHTunnelForwarder(
            (scada_db['ssh-host'], scada_db['ssh-port']),
            ssh_username=scada_db['ssh-username'],
            ssh_password=scada_db['ssh-password'],
            remote_bind_address=(scada_db['host'], scada_db['port']),
        )
        tunnel.start()
        # Keep a reference so the tunnel can be stopped in close().
        self._tunnel = tunnel
        try:
            # Work on a copy: writing the local port back into the caller's
            # mapping would make a second Opc(scada_db) use it as the remote
            # bind port.
            db_conf = dict(scada_db)
            # The database is reached through the local end of the tunnel,
            # not through the remote host name.
            db_conf['host'] = '127.0.0.1'
            db_conf['port'] = str(tunnel.local_bind_port)
            # NOTE(review): user/password are interpolated into the URL as-is,
            # so characters such as '@' or '/' must already be URL-escaped.
            self.engine = create_engine(
                'mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**db_conf)
            )
        except Exception:
            # Do not leave a running tunnel behind if the engine setup fails.
            tunnel.stop()
            self._tunnel = None
            raise

    def close(self):
        """Dispose of the engine's connections and stop the SSH tunnel."""
        if self.engine is not None:
            self.engine.dispose()
        if self._tunnel is not None:
            self._tunnel.stop()
            self._tunnel = None

    def get(self, project_id, item_name, fields=('item_value', 'c_time'), st='', et=''):
        """Return the most recent row (by ``ts``) for one item.

        Args:
            project_id: Accepted for interface compatibility; not used in the
                query.
            item_name: Item to look up; also selects the table.
            fields: Column expressions to select. They are inserted into the
                SQL text verbatim and must come from trusted code.
            st: Optional lower ``ts`` bound (empty means no bound).
            et: Optional upper ``ts`` bound (empty means no bound).

        Returns:
            The newest matching row, or ``None`` when there is no matching
            row or the query fails (failures are logged).
        """
        query, params = self._item_query(item_name, fields, st, et, '1')
        logger.debug('%s %s', query, params)
        try:
            with self._session_scope() as session:
                # first() yields None for an empty result, so "no row" is not
                # reported through the error path.
                return session.execute(query, params).first()
        except Exception:
            # Callers have always received None on failure; keep that
            # contract but leave a trace of what went wrong.
            logger.exception('Opc.get failed for item %r', item_name)
            return None

    def find(self, project_id, item_name, fields=('item_value', 'c_time'), st='', et='', start=0, limit=10):
        """Return a page of rows for one item, newest ``ts`` first.

        Args:
            project_id: Accepted for interface compatibility; not used in the
                query.
            item_name: Item to look up; also selects the table.
            fields: Column expressions to select (inserted verbatim).
            st: Optional lower ``ts`` bound (empty means no bound).
            et: Optional upper ``ts`` bound (empty means no bound).
            start: Row offset of the page.
            limit: Maximum number of rows.

        Returns:
            List of matching rows.
        """
        page = '{},{}'.format(int(start), int(limit))
        query, params = self._item_query(item_name, fields, st, et, page)
        with self._session_scope() as session:
            return session.execute(query, params).all()

    def find_increment(self, table_name, fields=('id', 'project_id', 'item_name', 'item_value', 'ts'),
                       last_id=0, limit=10000):
        """Return up to ``limit`` rows of ``table_name`` with ``id > last_id``.

        Args:
            table_name: Table to read. Inserted into the SQL text verbatim and
                must come from trusted code.
            fields: Column expressions to select (inserted verbatim).
            last_id: Only rows with a larger ``id`` are returned.
            limit: Maximum number of rows.

        Returns:
            List of rows in ascending ``id`` order.
        """
        # Order by id so that a caller paging with the largest id it has seen
        # cannot skip rows when the result is cut off by the limit.
        query = text('select {} from {} where id>:last_id order by id limit {}'.format(
            ','.join(fields), table_name, int(limit)))
        with self._session_scope() as session:
            return session.execute(query, {'last_id': last_id}).all()

    def multi_items_find(self, project_id, item_names=(), st='', et='', start=0, limit=10,
                         fields=('item_value', 'c_time')):
        """Run the ``find`` query for several items over one session.

        Args:
            project_id: Accepted for interface compatibility; not used in the
                query.
            item_names: Items to look up.
            st: Optional lower ``ts`` bound (empty means no bound).
            et: Optional upper ``ts`` bound (empty means no bound).
            start: Row offset of each page.
            limit: Maximum number of rows per item.
            fields: Column expressions to select (inserted verbatim).

        Returns:
            One list of rows per entry of ``item_names``, in the same order.
        """
        page = '{},{}'.format(int(start), int(limit))
        result = []
        with self._session_scope() as session:
            for name in item_names:
                # Each item may live in a different table, so the statement is
                # rebuilt per name.
                query, params = self._item_query(name, fields, st, et, page)
                result.append(session.execute(query, params).all())
        return result

    def tsCond(self, st, et):
        """Return the ``ts`` filter as a SQL fragment with the values inlined.

        The range is ``ts>=st and ts<et`` when both bounds are given,
        ``ts>=st`` for a lower bound only and ``ts<=et`` for an upper bound
        only; with no bounds the result is an empty string.

        The values are not escaped. The query methods of this class use bound
        parameters instead; this method is kept for existing callers.
        """
        # A single decision chain: with independent ifs the one-sided
        # branches overwrote the combined range.
        if st and et:
            return ' and ts>=\'{}\' and ts<\'{}\' '.format(st, et)
        if st:
            return ' and ts>=\'{}\' '.format(st)
        if et:
            return ' and ts<=\'{}\' '.format(et)
        return ''

    def routeTableName(self, item_name):
        """Return the name of the table that holds ``item_name``."""
        return 'scada_data_{}'.format(self.bkdrhash(item_name))

    def bkdrhash(self, s):
        """Return the table index for ``s``: a seed-131 polynomial hash mod 40.

        The hash runs over ``ord()`` of each character.
        NOTE(review): if the writer of these tables hashes encoded bytes,
        non-ASCII names would route differently - confirm.
        """
        value = 0
        for ch in s:
            # Masking at every step gives the same low 31 bits as masking once
            # at the end, without building a huge intermediate integer.
            value = (value * self._HASH_SEED + ord(ch)) & 0x7FFFFFFF
        return value % self._TABLE_COUNT

    @contextmanager
    def _session_scope(self):
        """Yield a new session bound to the engine and always close it."""
        session = sessionmaker(bind=self.engine)()
        try:
            yield session
        finally:
            session.close()

    @staticmethod
    def _ts_filter(st, et):
        """Return the ``ts`` filter as ``(sql_fragment, bind_params)``.

        Same ranges as ``tsCond``, but the bounds are passed as bound
        parameters rather than being written into the SQL text.
        """
        if st and et:
            return ' and ts>=:st and ts<:et ', {'st': st, 'et': et}
        if st:
            return ' and ts>=:st ', {'st': st}
        if et:
            return ' and ts<=:et ', {'et': et}
        return '', {}

    def _item_query(self, item_name, fields, st, et, limit_clause):
        """Build the newest-first query for one item as ``(statement, params)``."""
        ts_sql, params = self._ts_filter(st, et)
        # Binding the name keeps quotes and ':' in item names from being
        # interpreted as SQL or as bind markers.
        params['item_name'] = item_name
        sql = 'select {} from {} where `item_name` = :item_name {} order by ts desc limit {}'.format(
            ','.join(fields), self.routeTableName(item_name), ts_sql, limit_clause)
        return text(sql), params