123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- from threading import Thread
- from services.clean_items.items import Base, DCTmp
- from numpy import float64
# Registry of cleaners, keyed by target item name (``t_item``).
# Each value is a zero-argument factory; callers invoke it to get a fresh
# cleaner instance. The lambda defers the DCTmp lookup until a cleaner is
# actually requested.
dispatcher = dict(
    tmp=lambda: DCTmp(),
)
class Dispatcher(Thread):
    """Worker thread that describes, cleans and stores one data item.

    The cleaner is looked up in the module-level ``dispatcher`` table by
    ``t_item``; if no cleaner is registered, :meth:`run` does nothing.

    NOTE(review): exceptions raised inside ``run`` are handled by
    ``threading.excepthook`` and are not propagated to whoever started the
    thread.
    """

    # Class-level defaults; every instance overwrites all of them in __init__.
    _project_id = 0
    _tag = ''
    _s_item = ''
    _t_item = None
    _data = []
    _job = None

    def __init__(self, group_name, tag, t_item, data, job):
        """Store the work item that :meth:`run` will process.

        :param group_name: string of the form ``'<project_id>.<s_item>'``,
            split on the FIRST ``'.'`` (so ``s_item`` may itself contain dots).
            Both parts are kept as strings.
        :param tag: tag forwarded to the job's insert calls.
        :param t_item: key into ``dispatcher``; also passed to
            ``Base.to_dataframe`` and used to index the describe result.
        :param data: raw data handed to ``Base.to_dataframe``.
        :param job: object providing ``get_clean_config``,
            ``insert_describe`` and ``insert_cleaned_data``.
        :raises ValueError: if ``group_name`` contains no ``'.'``.
        """
        super().__init__()
        self._tag = tag
        self._t_item = t_item
        self._data = data
        self._job = job
        project_id, sep, s_item = group_name.partition('.')
        if not sep:
            # Unpacking split() would also raise ValueError here, but with an
            # opaque "not enough values to unpack" message.
            raise ValueError(
                "group_name must have the form '<project_id>.<s_item>', "
                "got {!r}".format(group_name)
            )
        self._project_id = project_id
        self._s_item = s_item

    def run(self) -> None:
        """Describe, clean and persist this thread's data.

        Returns early without side effects when ``t_item`` has no
        registered cleaner.
        """
        if self._t_item not in dispatcher:
            return None
        cleaner = dispatcher[self._t_item]()
        # Read the cleaning config for this project/item pair.
        config = self._job.get_clean_config(self._project_id, self._s_item, self._t_item)
        dataframe = Base.to_dataframe(self._data, self._t_item, float64)
        # Analyze data quality.
        describe = Base.describe(dataframe)
        # Store the data-quality summary for this t_item.
        self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
        # Clean the data. The return value is ignored, so clean() presumably
        # mutates `dataframe` in place -- TODO confirm against the cleaner API.
        cleaner.clean(config, describe, dataframe)
        # Drop rows containing NA values.
        dataframe.dropna(inplace=True)
        if len(dataframe) > 0:
            # Store whatever rows survived cleaning.
            self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, dataframe)
class UnDispatcher(Thread):
    """Worker thread that describes and stores one data item without cleaning.

    Unlike ``Dispatcher`` it uses no cleaner. The data is converted with
    ``str`` as the dtype argument, described, stripped of NA rows and stored.

    NOTE(review): exceptions raised inside ``run`` are handled by
    ``threading.excepthook`` and are not propagated to whoever started the
    thread.
    """

    # Class-level defaults; every instance overwrites all of them in __init__.
    _project_id = 0
    _tag = ''
    _s_item = ''
    _t_item = None
    _data = []
    _job = None

    def __init__(self, group_name, tag, t_item, data, job):
        """Store the work item that :meth:`run` will process.

        :param group_name: string of the form ``'<project_id>.<s_item>'``,
            split on the FIRST ``'.'`` (so ``s_item`` may itself contain dots).
            Both parts are kept as strings.
        :param tag: tag forwarded to the job's insert calls.
        :param t_item: passed to ``Base.to_dataframe`` and used to index the
            describe result.
        :param data: raw data handed to ``Base.to_dataframe``.
        :param job: object providing ``insert_describe`` and
            ``insert_cleaned_data``.
        :raises ValueError: if ``group_name`` contains no ``'.'``.
        """
        super().__init__()
        self._tag = tag
        self._t_item = t_item
        self._data = data
        self._job = job
        project_id, sep, s_item = group_name.partition('.')
        if not sep:
            # Unpacking split() would also raise ValueError here, but with an
            # opaque "not enough values to unpack" message.
            raise ValueError(
                "group_name must have the form '<project_id>.<s_item>', "
                "got {!r}".format(group_name)
            )
        self._project_id = project_id
        self._s_item = s_item

    def run(self):
        """Describe and persist this thread's data; no cleaning step."""
        dataframe = Base.to_dataframe(self._data, self._t_item, str)
        # Analyze data quality.
        describe = Base.describe(dataframe)
        # Store the data-quality summary for this t_item.
        self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
        # Drop rows containing NA values.
        dataframe.dropna(inplace=True)
        if len(dataframe) > 0:
            # Store the remaining rows.
            self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, dataframe)
|