opc.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #!coding:utf-8
  2. from sshtunnel import SSHTunnelForwarder
  3. from sqlalchemy import create_engine,text
  4. from sqlalchemy.orm import sessionmaker
  5. class Opc():
  6. engine = None
  7. _session = None
  8. def get(self, project_id, item_name, fields=['item_value','c_time'], st='', et=''):
  9. DBSession = sessionmaker(bind=self.engine)
  10. # 创建session对象
  11. session = DBSession()
  12. table_name = self.routeTableName(item_name)
  13. ts_cond = self.tsCond(st, et)
  14. query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit 1'.format(','.join(fields), table_name, item_name, ts_cond)
  15. print(query)
  16. try:
  17. data = session.execute(text(query)).one()
  18. except Exception:
  19. return None
  20. finally:
  21. session.close()
  22. return data
  23. def find(self, project_id, item_name, fields=['item_value','c_time'], st='', et='', start=0, limit=10):
  24. DBSession = sessionmaker(bind=self.engine)
  25. # 创建session对象
  26. session = DBSession()
  27. table_name = self.routeTableName(item_name)
  28. ts_cond = self.tsCond(st, et)
  29. query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit {},{}'.format(','.join(fields), table_name, item_name, ts_cond, start, limit)
  30. try:
  31. data = session.execute(text(query)).all()
  32. finally:
  33. session.close()
  34. return data
  35. def find_increment(self, table_name, fields=['id','project_id','item_name','item_value','ts'], last_id=0, limit=10000):
  36. DBSession = sessionmaker(bind=self.engine)
  37. # 创建session对象
  38. self._session = DBSession()
  39. query = 'select {} from {} where id>{} limit {}'.format(','.join(fields), table_name, last_id, limit)
  40. try:
  41. data = self._session.execute(text(query)).all()
  42. finally:
  43. self._session.close()
  44. return data
  45. def multi_items_find(self, project_id, item_names=[], st='', et='', start=0, limit=10):
  46. DBSession = sessionmaker(bind=self.engine)
  47. # 创建session对象
  48. session = DBSession()
  49. ts_cond = self.tsCond(st, et)
  50. result = []
  51. try:
  52. for name in item_names:
  53. table_name = self.routeTableName(item_name)
  54. query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit {},{}'.format(','.join(fields), table_name, name, ts_cond, start, limit)
  55. data = session.execute(text(query)).all()
  56. result.append(data)
  57. finally:
  58. session.close()
  59. return result
  60. def tsCond(self, st, et):
  61. ts_cond = ''
  62. if len(st) > 0 and len(et) > 0:
  63. ts_cond = ' and ts>=\'{}\' and ts<\'{}\' '.format(st, et)
  64. if len(st) > 0:
  65. ts_cond = ' and ts>=\'{}\' '.format(st)
  66. if len(et) > 0:
  67. ts_cond = ' and ts<=\'{}\' '.format(et)
  68. return ts_cond
  69. def routeTableName(self, item_name):
  70. idx = self.bkdrhash(item_name)
  71. return 'scada_data_{}'.format(idx)
  72. def bkdrhash(self, s):
  73. seed = 131
  74. hash = 0
  75. for i in range(0, len(s)):
  76. hash = (hash * seed) + ord(s[i])
  77. return (hash & 0x7FFFFFFF) % 40
  78. def __init__(self, scada_db):
  79. tunnel = SSHTunnelForwarder(
  80. (scada_db['ssh-host'], scada_db['ssh-port']),
  81. ssh_username=scada_db['ssh-username'],
  82. ssh_password=scada_db['ssh-password'],
  83. remote_bind_address=(scada_db['host'], scada_db['port'])
  84. )
  85. tunnel.start()
  86. scada_db['port'] = str(tunnel.local_bind_port)
  87. # print('----------------------------------------------------mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**scada_db))
  88. self.engine = create_engine('mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**scada_db))