dispatcher.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from threading import Thread
  2. from services.clean_items.items import Base, DCTmp
  3. from numpy import float64
  4. dispatcher = {
  5. 'tmp': lambda: DCTmp()
  6. }
  7. class Dispatcher(Thread):
  8. _project_id = 0
  9. _tag = ''
  10. _s_item = ''
  11. _t_item = None
  12. _data = []
  13. _job = None
  14. def __init__(self, group_name, tag, t_item, data, job):
  15. super().__init__()
  16. self._tag = tag
  17. self._t_item = t_item
  18. self._data = data
  19. self._job = job
  20. self._project_id, self._s_item = group_name.split('.', 1)
  21. def run(self) -> None:
  22. if self._t_item not in dispatcher:
  23. return None
  24. cleaner = dispatcher[self._t_item]()
  25. # 读取 config
  26. config = self._job.get_clean_config(self._project_id, self._s_item, self._t_item)
  27. dataframe = Base.to_dataframe(self._data, self._t_item, float64)
  28. # 分析出数据质量
  29. describe = Base.describe(dataframe)
  30. # 数据质量入库
  31. self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
  32. # 进行清洗
  33. cleaner.clean(config, describe, dataframe)
  34. # 去除na值
  35. dataframe.dropna(inplace=True)
  36. if len(dataframe) > 0:
  37. # 入库
  38. self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, dataframe)
  39. class UnDispatcher(Thread):
  40. _project_id = 0
  41. _tag = ''
  42. _s_item = ''
  43. _t_item = None
  44. _data = []
  45. _job = None
  46. def __init__(self, group_name, tag, t_item, data, job):
  47. super().__init__()
  48. self._tag = tag
  49. self._t_item = t_item
  50. self._data = data
  51. self._job = job
  52. self._project_id, self._s_item = group_name.split('.', 1)
  53. def run(self):
  54. series = Base.to_dataframe(self._data, self._t_item, str)
  55. # 分析出数据质量
  56. describe = Base.describe(series)
  57. # 数据质量入库
  58. self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
  59. # 去除na的值
  60. series.dropna(inplace=True)
  61. if len(series) > 0:
  62. # 入库
  63. self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, series)