# -*- coding: utf-8 -*-
import logging

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sshtunnel import SSHTunnelForwarder
class Opc():
    """Query helper for SCADA item data stored in hash-sharded MySQL tables.

    The database is reached through an SSH tunnel that ``__init__`` opens.
    Rows for an item live in one of 40 tables named ``scada_data_<n>``; the
    table is picked from a BKDR hash of the item name (``routeTableName``).

    Column names (``fields``) and the ``table_name`` given to
    ``find_increment`` are SQL identifiers and are interpolated verbatim, so
    they must come from trusted code. Item names and time bounds are sent as
    bound parameters.
    """

    engine = None
    _session = None
    _tunnel = None

    # Immutable defaults; the public methods accept ``fields=None`` for these.
    _DEFAULT_FIELDS = ('item_value', 'c_time')
    _DEFAULT_INCREMENT_FIELDS = (
        'id', 'project_id', 'item_name', 'item_value', 'ts',
    )
    _TABLE_COUNT = 40
    _log = logging.getLogger(__name__)

    def __init__(self, scada_db):
        """Open the SSH tunnel and create the SQLAlchemy engine behind it.

        Args:
            scada_db: mapping with the keys ``ssh-host``, ``ssh-port``,
                ``ssh-username``, ``ssh-password``, ``host``, ``port``,
                ``user``, ``password`` and ``dbname``. It is not modified.
        """
        tunnel = SSHTunnelForwarder(
            (scada_db['ssh-host'], scada_db['ssh-port']),
            ssh_username=scada_db['ssh-username'],
            ssh_password=scada_db['ssh-password'],
            remote_bind_address=(scada_db['host'], scada_db['port']),
        )
        tunnel.start()
        # Keep the handle so the tunnel can be stopped later (see close()).
        self._tunnel = tunnel

        # Work on a copy: overwriting the caller's 'port' would make a second
        # Opc built from the same dict tunnel to the wrong remote port.
        db = dict(scada_db)
        # The engine must talk to the local end of the tunnel, not to the
        # remote host on the tunnel's local port.
        db['host'] = '127.0.0.1'
        db['port'] = str(tunnel.local_bind_port)
        try:
            self.engine = create_engine(
                'mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**db)
            )
        except Exception:
            # Do not leave a tunnel running when the engine cannot be built.
            tunnel.stop()
            self._tunnel = None
            raise

    def close(self):
        """Dispose of the engine's pooled connections and stop the tunnel."""
        if self.engine is not None:
            self.engine.dispose()
        if self._tunnel is not None:
            self._tunnel.stop()
            self._tunnel = None

    def get(self, project_id, item_name, fields=None, st='', et=''):
        """Return the newest row for ``item_name``, or ``None``.

        Args:
            project_id: accepted for interface compatibility; not used in the
                query.
            item_name: item to look up; also selects the shard table.
            fields: column names to select; defaults to
                ``item_value, c_time``.
            st: optional lower time bound (``ts >= st``).
            et: optional upper time bound.

        Returns:
            The first row ordered by ``ts`` descending, or ``None`` when no
            row matches or the query fails (failures are logged).
        """
        sql, params = self._item_query(fields, item_name, st, et, '1')
        session = self._new_session()
        try:
            # first() yields None for an empty result, so "no data" is not
            # reported through an exception.
            return session.execute(text(sql), params).first()
        except Exception:
            # Best-effort lookup: callers receive None on failure, but the
            # cause is recorded instead of being dropped silently.
            self._log.exception('get failed for item %r', item_name)
            return None
        finally:
            session.close()

    def find(self, project_id, item_name, fields=None, st='', et='', start=0, limit=10):
        """Return a page of rows for ``item_name``, newest first.

        Args:
            project_id: accepted for interface compatibility; not used in the
                query.
            item_name: item to look up; also selects the shard table.
            fields: column names to select; defaults to
                ``item_value, c_time``.
            st: optional lower time bound (``ts >= st``).
            et: optional upper time bound.
            start: row offset of the page.
            limit: maximum number of rows.

        Returns:
            List of rows. Database errors propagate to the caller.
        """
        page = '{},{}'.format(int(start), int(limit))
        sql, params = self._item_query(fields, item_name, st, et, page)
        session = self._new_session()
        try:
            return session.execute(text(sql), params).all()
        finally:
            session.close()

    def find_increment(self, table_name, fields=None, last_id=0, limit=10000):
        """Return up to ``limit`` rows of ``table_name`` with ``id > last_id``.

        Args:
            table_name: table to read; interpolated verbatim, must be trusted.
            fields: column names to select; defaults to
                ``id, project_id, item_name, item_value, ts``.
            last_id: only rows with a larger ``id`` are returned.
            limit: maximum number of rows.

        Returns:
            List of rows. Database errors propagate to the caller.
        """
        columns = ','.join(fields or self._DEFAULT_INCREMENT_FIELDS)
        # int() keeps the numeric parts of the statement numeric.
        sql = 'select {} from {} where id>{} limit {}'.format(
            columns, table_name, int(last_id), int(limit)
        )
        # The session is still published on the instance, as before, in case
        # other code inspects ``_session``.
        self._session = session = self._new_session()
        try:
            return session.execute(text(sql)).all()
        finally:
            session.close()

    def multi_items_find(self, project_id, item_names=None, st='', et='', start=0, limit=10, fields=None):
        """Run the ``find`` query for several items over one session.

        Args:
            project_id: accepted for interface compatibility; not used in the
                query.
            item_names: iterable of item names; ``None`` means no items.
            st: optional lower time bound (``ts >= st``).
            et: optional upper time bound.
            start: row offset applied to every item.
            limit: maximum number of rows per item.
            fields: column names to select; defaults to
                ``item_value, c_time``.

        Returns:
            A list with one list of rows per item, in ``item_names`` order.
        """
        page = '{},{}'.format(int(start), int(limit))
        result = []
        session = self._new_session()
        try:
            for name in item_names or ():
                # Each item is routed to its own shard table.
                sql, params = self._item_query(fields, name, st, et, page)
                result.append(session.execute(text(sql), params).all())
        finally:
            session.close()
        return result

    def tsCond(self, st, et):
        """Return the literal SQL time filter for ``st``/``et``.

        Both bounds give ``ts >= st and ts < et``; a single bound gives
        ``ts >= st`` or ``ts <= et``; no bound gives an empty string. The
        values are embedded as quoted literals, so pass trusted values only.
        """
        # Exclusive branches: the range form must not be overwritten by the
        # single-bound forms.
        if st and et:
            return ' and ts>=\'{}\' and ts<\'{}\' '.format(st, et)
        if st:
            return ' and ts>=\'{}\' '.format(st)
        if et:
            return ' and ts<=\'{}\' '.format(et)
        return ''

    def routeTableName(self, item_name):
        """Return the shard table name that holds ``item_name``."""
        return 'scada_data_{}'.format(self.bkdrhash(item_name))

    def bkdrhash(self, s):
        """Return the BKDR hash (seed 131) of ``s`` reduced to 0..39."""
        seed = 131
        mask = 0x7FFFFFFF
        acc = 0
        for ch in s:
            # Masking on every step yields the same low 31 bits as masking
            # once at the end, without growing a huge integer.
            acc = (acc * seed + ord(ch)) & mask
        return acc % self._TABLE_COUNT

    def _new_session(self):
        """Create a new session bound to the current engine."""
        return sessionmaker(bind=self.engine)()

    @staticmethod
    def _ts_clause(st, et):
        """Return ``(sql_fragment, params)`` for the time filter.

        Same conditions as ``tsCond``, but with ``:st``/``:et`` placeholders.
        """
        if st and et:
            return ' and ts>=:st and ts<:et ', {'st': st, 'et': et}
        if st:
            return ' and ts>=:st ', {'st': st}
        if et:
            return ' and ts<=:et ', {'et': et}
        return '', {}

    def _item_query(self, fields, item_name, st, et, limit_clause):
        """Build the per-item SELECT and its bound parameters.

        Returns:
            ``(sql, params)`` ready for ``session.execute(text(sql), params)``.
        """
        columns = ','.join(fields or self._DEFAULT_FIELDS)
        ts_sql, params = self._ts_clause(st, et)
        params['item_name'] = item_name
        sql = 'select {} from {} where `item_name` = :item_name {} order by ts desc limit {}'.format(
            columns, self.routeTableName(item_name), ts_sql, limit_clause
        )
        return sql, params
|