1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- from time import strftime, sleep
- from services.jobs.source_data_set import SourceDataSet
- from services.jobs.data_center_set import DataCenterSet
- import schedule
- from services.clean_items.dispatcher import Dispatcher,UnDispatcher
- import redis
class Job:
    """Scheduled cleaning job.

    Each run fetches newly added source data, groups it, hands every group to
    a ``Dispatcher`` (when a target item is mapped) or an ``UnDispatcher``
    (when none is), and finally pushes the run's tag onto three Redis lists.
    """

    # Collaborators are created in ``__init__``; the class-level ``None``
    # values only serve as declarations.
    _sourceDataSet = None
    _dataCenterSet = None
    _redis = None

    # Redis lists that receive the run tag at the end of every run.
    _DONE_QUEUES = (
        'data-center:mq:event',
        'data-center:mq:alarm',
        'data-center:mq:isomerism',
    )

    def __init__(self, config):
        """Build the data-set helpers and the Redis client from *config*.

        Args:
            config: Mapping passed unchanged to ``SourceDataSet`` and
                ``DataCenterSet``. Its ``'redis'`` entry must provide the
                ``host``, ``db``, ``port`` and ``password`` keys.
        """
        self.config = config
        # Mutable state lives on the instance so that separate jobs never
        # share one list/dict object through the class.
        self._project_ids = []
        self._item_maps = {}
        self._sourceDataSet = SourceDataSet(self.config)
        self._dataCenterSet = DataCenterSet(self.config)
        self._init_redis(self.config['redis'])

    def set_project_id(self, project_ids=None):
        """Set the project ids passed to ``group_by_items`` on each run.

        Args:
            project_ids: List of project ids. Omitting it (or passing
                ``None``) stores a fresh empty list.
        """
        # A ``None`` sentinel avoids sharing one default list across calls.
        self._project_ids = [] if project_ids is None else project_ids

    def _init_item_map(self):
        """Reset the item map to an empty mapping."""
        # NOTE(review): this used to reset to '' although the attribute is
        # declared as a dict; nothing visible here reads it - confirm that no
        # external caller relied on the empty-string value.
        self._item_maps = {}

    def run(self):
        """Execute one cleaning pass and announce it on the Redis queues."""
        tag = Job.generate_tag()
        # Fetch the newly added source data.
        # NOTE(review): 10000 is presumably a batch limit - confirm against
        # SourceDataSet.fetch_increment.
        data = self._sourceDataSet.fetch_increment(10000)
        # Group the data by project_id + item.
        group_data = self._sourceDataSet.group_by_items(data, self._project_ids)
        for group_name in group_data:
            # Map the group to its target item.
            target_item = self._dataCenterSet.get_target_item(group_name)
            if target_item is not None:
                # Apply the target item's cleaning strategy.
                worker = Dispatcher(group_name, tag, target_item, group_data[group_name], self)
            else:
                # Points that need no cleaning are stored directly.
                worker = UnDispatcher(group_name, tag, target_item, group_data[group_name], self)
            worker.start()
        # Push a "cleaning finished" message, carrying the run tag, to each queue.
        for queue_name in self._DONE_QUEUES:
            self._redis.rpush(queue_name, tag)

    def get_clean_config(self, project_id, source_item, target_item):
        """Return ``DataCenterSet.get_clean_config`` for the given triple."""
        return self._dataCenterSet.get_clean_config(project_id, source_item, target_item)

    def insert_describe(self, project_id, tag, item, describe):
        """Forward a describe record to ``DataCenterSet.insert_describe``."""
        self._dataCenterSet.insert_describe(project_id, tag, item, describe)

    def insert_cleaned_data(self, project_id, source_item, tag, series):
        """Forward cleaned data to ``DataCenterSet.insert_cleaned_data``."""
        self._dataCenterSet.insert_cleaned_data(project_id, source_item, tag, series)

    def _init_redis(self, config):
        """Create the Redis client from a mapping with host/db/port/password."""
        self._redis = redis.Redis(
            host=config['host'],
            db=config['db'],
            port=config['port'],
            password=config['password'],
        )

    @staticmethod
    def generate_tag():
        """Return the current local time formatted as ``yymmddHHMMSS``."""
        return strftime('%y%m%d%H%M%S')

    def start(self, interval=1, poll_interval=5):
        """Register ``run`` with the scheduler and poll it forever.

        This call never returns.

        Args:
            interval: Seconds between scheduled runs (default 1, as before).
            poll_interval: Seconds to sleep between scheduler polls
                (default 5, as before).

        With the defaults the job is due every second but the scheduler is
        only polled every five seconds, so ``run`` effectively fires about
        once per poll. Pass a smaller ``poll_interval`` for a tighter cadence.
        """
        schedule.every(interval).seconds.do(self.run)
        while True:
            schedule.run_pending()
            sleep(poll_interval)
|