data_center_set.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. from models.data_center.models import ItemMap, CleanQuality, CleanBind, ItemData
  class CleanConfig():
      """Cleaning options for one item, populated from a clean-bind record.

      Attributes default to the class-level values below and are overridden
      by whichever of the matching keys are present in ``record``.
      """

      source_item: str = ''
      target_item: str = ''
      # Class-level fallback only; __init__ gives every instance its own list
      # so that mutating one config can never affect another.
      range_na: list = []
      dropna: bool = False
      fillna: bool = False
      fillna_way: str = None
      drop_solitude: bool = False
      duplicate: bool = False

      # Record keys holding on/off flags; a flag is enabled when its stored
      # value differs from 0.
      _FLAG_KEYS = ('dropna', 'fillna', 'drop_solitude', 'duplicate')

      def __init__(self, source_item, target_item, record):
          """Build the configuration.

          Args:
              source_item: Identifier of the source item.
              target_item: Identifier of the target item.
              record: Mapping-like object that may contain the keys
                  ``range_na`` (comma-separated string), ``dropna``,
                  ``fillna``, ``fillna_way``, ``drop_solitude`` and
                  ``duplicate``. ``None`` is treated as an empty record, so
                  all defaults are kept.
          """
          self.source_item = source_item
          # The original assigned source_item here, silently discarding the
          # target_item argument.
          self.target_item = target_item
          self.range_na = []

          if record is None:
              record = {}

          if 'range_na' in record and record['range_na'] is not None:
              self.range_na = record['range_na'].split(',')

          for key in self._FLAG_KEYS:
              if key in record:
                  setattr(self, key, record[key] != 0)

          if 'fillna_way' in record:
              self.fillna_way = record['fillna_way']
  class DataCenterSet():
      """Facade over the data-center models (item map, quality, bind, data).

      The model objects are created in ``__init__`` from
      ``config['data_store_db']``; the item map is loaded once at construction
      time and then served from memory by ``get_target_item``.
      """

      _item_data_model = None
      _item_map_model = None
      _clean_quality_model = None
      _clean_bind_model = None
      # Class-level fallback only and never mutated: each instance builds its
      # own dict in _init_item_maps so instances do not share mappings.
      _item_map = {}

      def __init__(self, config):
          """Create the model objects and load the item map.

          Args:
              config: Mapping with a ``data_store_db`` entry, which is passed
                  to every model constructor.
          """
          self.config = config
          store = self.config['data_store_db']
          self._item_map_model = ItemMap(store)
          self._clean_quality_model = CleanQuality(store)
          self._clean_bind_model = CleanBind(store)
          self._item_data_model = ItemData(store)
          self._init_item_maps()

      def _init_item_maps(self):
          """Rebuild this instance's item map from ``find_all()``.

          Each row is indexed positionally: the key is
          ``"<row[0]>.<row[2]>"`` and the value is ``row[3]``.
          """
          # Build a fresh per-instance dict instead of writing into the
          # class-level one, which would leak entries across instances.
          item_map = {}
          for one in self._item_map_model.find_all():
              item_map['{}.{}'.format(one[0], one[2])] = one[3]
          self._item_map = item_map

      def get_target_item(self, k):
          """Return the mapped value for key ``k``, or ``None`` if unknown."""
          return self._item_map.get(k)

      def insert_describe(self, project_id, tag, item, describe):
          """Store a describe result through the clean-quality model.

          Args:
              project_id: Project identifier stored as-is.
              tag: Stored in its string form.
              item: Stored in its string form.
              describe: Object exposing ``to_csv()``; its output is stored
                  under the ``describe`` key (presumably a pandas describe()
                  result -- confirm against the caller).
          """
          record = {
              'project_id': project_id,
              'tag': "{}".format(tag),
              'item': "{}".format(item),
              'describe': describe.to_csv(),
          }
          self._clean_quality_model.insert(record)

      def get_clean_config(self, project_id, source_item, target_item):
          """Fetch the clean-bind record for the target item as a CleanConfig."""
          record = self._clean_bind_model.get(project_id, target_item)
          return CleanConfig(source_item, target_item, record)

      def insert_cleaned_data(self, project_id, source_item, tag, series):
          """Convert ``series`` rows to value tuples and bulk-insert them.

          Args:
              project_id: Project identifier placed in every row.
              source_item: Item identifier placed in every row.
              tag: Tag placed in every row.
              series: Object exposing ``iterrows()``; see ``__to_sql_values``
                  for the expected row layout.
          """
          datas = self.__to_sql_values(project_id, source_item, tag, series)
          self._item_data_model.multi_insert(datas)

      def __to_sql_values(self, project_id, source_item, tag, series):
          """Render each numeric row as the text of a SQL value tuple.

          For every ``(index, row)`` pair from ``series.iterrows()``,
          ``row.values[0]`` is the value and ``row.values[1]`` must offer
          ``strftime``. Rows whose value cannot be converted by ``float()``
          are skipped.

          Returns:
              A list of strings of the form
              ``str((project_id, source_item, value, 'YYYY-mm-dd HH:MM:SS', tag))``.

          NOTE(review): the tuples are rendered with ``str()``, so string
          fields are quoted by Python's repr rules rather than escaped by the
          database driver. Confirm the inputs are trusted, or switch
          ``multi_insert`` to parameterized queries.
          """
          sql_texts = []
          for _, row in series.iterrows():
              value = row.values[0]
              try:
                  float(value)
              except (TypeError, ValueError):
                  # TypeError covers None and other non-numeric objects,
                  # which previously aborted the whole batch.
                  continue
              stamp = row.values[1].strftime("%Y-%m-%d %H:%M:%S")
              sql_texts.append(str((project_id, source_item, value, stamp, tag)))
          return sql_texts