job.py

from time import strftime, sleep

import redis
import schedule

from services.jobs.source_data_set import SourceDataSet
from services.jobs.data_center_set import DataCenterSet
from services.clean_items.dispatcher import Dispatcher, UnDispatcher


class Job:
    _sourceDataSet = None
    _dataCenterSet = None
    _project_ids = []
    _item_maps = {}
    _redis = None

    def __init__(self, config):
        self.config = config
        self._sourceDataSet = SourceDataSet(self.config)
        self._dataCenterSet = DataCenterSet(self.config)
        self._init_redis(self.config['redis'])

    def set_project_id(self, project_ids=[]):
        self._project_ids = project_ids

    def _init_item_map(self):
        self._item_maps = {}

    def run(self):
        tag = Job.generate_tag()
        # Fetch the newly added data in full
        data = self._sourceDataSet.fetch_increment(10000)
        # Group the data by project_id + item
        group_data = self._sourceDataSet.group_by_items(data, self._project_ids)
        # Map each group to its target_item
        for group_name in group_data:
            target_item = self._dataCenterSet.get_target_item(group_name)
            if target_item is not None:
                # Run the cleaning strategy configured for target_item
                Dispatcher(group_name, tag, target_item, group_data[group_name], self).start()
            else:
                # Points that need no cleaning are written to the store directly
                UnDispatcher(group_name, tag, target_item, group_data[group_name], self).start()
        # Push a "cleaning finished" message onto the queues
        self._redis.rpush('data-center:mq:event', tag)
        self._redis.rpush('data-center:mq:alarm', tag)
        self._redis.rpush('data-center:mq:isomerism', tag)

    def get_clean_config(self, project_id, source_item, target_item):
        return self._dataCenterSet.get_clean_config(project_id, source_item, target_item)

    def insert_describe(self, project_id, tag, item, describe):
        self._dataCenterSet.insert_describe(project_id, tag, item, describe)

    def insert_cleaned_data(self, project_id, source_item, tag, series):
        self._dataCenterSet.insert_cleaned_data(project_id, source_item, tag, series)

    def _init_redis(self, config):
        self._redis = redis.Redis(host=config['host'], db=config['db'],
                                  port=config['port'], password=config['password'])

    @staticmethod
    def generate_tag():
        # Timestamp used to tag one run of the cleaning job, e.g. '240501123045'
        return strftime('%y%m%d%H%M%S')

    def start(self):
        # Schedule run() every second and poll the scheduler in a blocking loop
        schedule.every(1).seconds.do(self.run)
        while True:
            schedule.run_pending()
            sleep(5)
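
# --- Minimal usage sketch (illustrative only, not part of the original file) ---
# Shows how Job is expected to be wired up: build a config dict, optionally
# restrict the job to a set of project ids, then start the scheduling loop.
# The 'redis' keys mirror what _init_redis() reads; any further keys required
# by SourceDataSet / DataCenterSet are unknown here and left as a placeholder.
if __name__ == '__main__':
    config = {
        'redis': {
            'host': '127.0.0.1',   # assumed local Redis defaults
            'port': 6379,
            'db': 0,
            'password': None,
        },
        # ...plus whatever SourceDataSet / DataCenterSet expect from config
    }
    job = Job(config)
    job.set_project_id(['demo-project'])  # hypothetical project id
    job.start()  # blocks: run() fires via schedule, polled every 5 seconds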