from time import strftime, sleep

import redis
import schedule

from services.jobs.source_data_set import SourceDataSet
from services.jobs.data_center_set import DataCenterSet
from services.clean_items.dispatcher import Dispatcher, UnDispatcher


class Job():
    """Periodic job that pulls new source data, cleans it per item, and
    notifies downstream consumers through Redis lists.

    Each run fetches an increment of source data, groups it by
    project_id + item, and dispatches every group to either a cleaning
    ``Dispatcher`` (when a target item is mapped) or an ``UnDispatcher``
    (stored without cleaning). Finally it pushes the run's tag onto the
    event/alarm/isomerism queues.
    """

    _sourceDataSet = None
    _dataCenterSet = None
    _redis = None

    # Redis lists that receive the run tag once a run has been dispatched.
    _NOTIFY_QUEUES = (
        'data-center:mq:event',
        'data-center:mq:alarm',
        'data-center:mq:isomerism',
    )

    def __init__(self, config):
        """Build the data-set helpers and the Redis client from *config*.

        :param config: mapping passed to ``SourceDataSet`` and
            ``DataCenterSet``; ``config['redis']`` must hold the Redis
            connection settings (see ``_init_redis``).
        """
        self.config = config
        # Per-instance containers: class-level lists/dicts would be shared
        # by every Job instance.
        self._project_ids = []
        self._item_maps = {}
        self._sourceDataSet = SourceDataSet(self.config)
        self._dataCenterSet = DataCenterSet(self.config)
        self._init_redis(self.config['redis'])

    def set_project_id(self, project_ids=None):
        """Restrict grouping to *project_ids* (stored as given).

        Passing nothing resets the filter to a fresh empty list. The old
        signature used a shared mutable ``[]`` default.
        """
        self._project_ids = [] if project_ids is None else project_ids

    def _init_item_map(self):
        """Reset the item map to an empty dict, matching its declared type."""
        self._item_maps = {}

    def run(self, batch_size=10000):
        """Execute one fetch → group → dispatch → notify cycle.

        :param batch_size: number of rows requested from
            ``SourceDataSet.fetch_increment`` (default 10000, as before).
        """
        tag = Job.generate_tag()
        # Fetch the newly added (incremental) source data.
        data = self._sourceDataSet.fetch_increment(batch_size)
        # Group the rows by project_id + item.
        group_data = self._sourceDataSet.group_by_items(data, self._project_ids)
        # Map each group to its target item and dispatch accordingly.
        for group_name, rows in group_data.items():
            target_item = self._dataCenterSet.get_target_item(group_name)
            if target_item is not None:
                # Apply the target item's cleaning strategy.
                Dispatcher(group_name, tag, target_item, rows, self).start()
            else:
                # Items that need no cleaning are stored directly.
                UnDispatcher(group_name, tag, target_item, rows, self).start()
        # Announce that cleaning for this tag has finished.
        # NOTE(review): if Dispatcher/UnDispatcher.start() runs asynchronously
        # (e.g. a Thread), these messages may be sent before cleaning is done.
        for queue in self._NOTIFY_QUEUES:
            self._redis.rpush(queue, tag)

    def get_clean_config(self, project_id, source_item, target_item):
        """Delegate to ``DataCenterSet.get_clean_config``."""
        return self._dataCenterSet.get_clean_config(project_id, source_item, target_item)

    def insert_describe(self, project_id, tag, item, describe):
        """Delegate to ``DataCenterSet.insert_describe``."""
        self._dataCenterSet.insert_describe(project_id, tag, item, describe)

    def insert_cleaned_data(self, project_id, source_item, tag, series):
        """Delegate to ``DataCenterSet.insert_cleaned_data``."""
        self._dataCenterSet.insert_cleaned_data(project_id, source_item, tag, series)

    def _init_redis(self, config):
        """Create the Redis client from a mapping with host/db/port and an
        optional password (missing password → no auth, i.e. ``None``).
        """
        self._redis = redis.Redis(host=config['host'],
                                  db=config['db'],
                                  port=config['port'],
                                  password=config.get('password'))

    @staticmethod
    def generate_tag():
        """Return a local-time tag ``YYMMDDhhmmss`` (second resolution, so
        two runs within the same second share a tag)."""
        return strftime('%y%m%d%H%M%S')

    def start(self, interval=1, poll_interval=5):
        """Schedule ``run`` and block forever, polling the scheduler.

        :param interval: seconds between scheduled runs.
        :param poll_interval: seconds slept between ``run_pending`` polls.
            The job cannot fire more often than this, so the effective
            cadence is roughly ``max(interval, poll_interval)``. The defaults
            keep the previous ~5 s behavior.

        Registers on ``schedule``'s module-level default scheduler, so each
        call to ``start`` adds another ``run`` job there.
        """
        schedule.every(interval).seconds.do(self.run)
        while True:
            schedule.run_pending()
            sleep(poll_interval)