from threading import Thread
from services.clean_items.items import Base, DCTmp
from numpy import float64

# Registry of cleaners: maps a target-item key to a zero-argument factory
# that builds the cleaner object for that item.
dispatcher = {
    'tmp': lambda: DCTmp(),
}


class Dispatcher(Thread):
    """Worker thread that cleans one batch of data and stores the result.

    The cleaner is looked up in the module-level ``dispatcher`` registry by
    the target item; when no cleaner is registered the thread does nothing.
    """

    # Class-level defaults; each one is overwritten per instance in __init__.
    _project_id = 0
    _tag = ''
    _s_item = ''
    _t_item = None
    _data = []
    _job = None

    def __init__(self, group_name, tag, t_item, data, job):
        """Store the work description for this thread.

        ``group_name`` is split at its first ``'.'`` into the project id and
        the ``_s_item`` value; it must therefore contain at least one dot.
        ``job`` is the object used in ``run`` to read the clean config and to
        persist results.
        """
        super().__init__()
        project_part, item_part = group_name.split('.', 1)
        self._tag = tag
        self._t_item = t_item
        self._data = data
        self._job = job
        self._project_id = project_part
        self._s_item = item_part

    def run(self) -> None:
        """Describe, clean and persist the data held by this thread."""
        target = self._t_item
        if target not in dispatcher:
            return None

        item_cleaner = dispatcher[target]()
        job = self._job
        project = self._project_id
        source = self._s_item

        # Read the clean config for this project / item pair.
        clean_config = job.get_clean_config(project, source, target)
        frame = Base.to_dataframe(self._data, target, float64)

        # Analyse the data quality and store that description first.
        quality = Base.describe(frame)
        job.insert_describe(project, self._tag, source, quality[target])

        # Run the cleaner, then drop whatever NA values remain.
        item_cleaner.clean(clean_config, quality, frame)
        frame.dropna(inplace=True)

        if len(frame) == 0:
            # Nothing left after cleaning, so there is nothing to store.
            return None

        # Persist the cleaned data.
        job.insert_cleaned_data(project, source, self._tag, frame)


class UnDispatcher(Thread):
    """Worker thread that stores one batch of data without running a cleaner.

    The data is converted with ``str`` as the type argument, its quality
    description is stored, NA values are dropped and the remainder is
    persisted through the job object.
    """

    # Class-level defaults; each one is overwritten per instance in __init__.
    _project_id = 0
    _tag = ''
    _s_item = ''
    _t_item = None
    _data = []
    _job = None

    def __init__(self, group_name, tag, t_item, data, job):
        """Store the work description for this thread.

        ``group_name`` is split at its first ``'.'`` into the project id and
        the ``_s_item`` value; it must therefore contain at least one dot.
        ``job`` is the object used in ``run`` to persist results.
        """
        super().__init__()
        project_part, item_part = group_name.split('.', 1)
        self._tag = tag
        self._t_item = t_item
        self._data = data
        self._job = job
        self._project_id = project_part
        self._s_item = item_part

    def run(self):
        """Describe and persist the data held by this thread."""
        target = self._t_item
        job = self._job
        project = self._project_id
        source = self._s_item

        frame = Base.to_dataframe(self._data, target, str)

        # Analyse the data quality and store that description first.
        quality = Base.describe(frame)
        job.insert_describe(project, self._tag, source, quality[target])

        # Drop NA values before storing.
        frame.dropna(inplace=True)

        if len(frame) == 0:
            # Nothing left to store.
            return

        # Persist the remaining data.
        job.insert_cleaned_data(project, source, self._tag, frame)