123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- from models.data_center.models import ItemMap, CleanQuality, CleanBind, ItemData
class CleanConfig():
    """Cleaning options for one source/target item pair.

    The options are read from ``record``, a mapping whose keys are checked
    with ``in`` and read with ``[]``. Any key that is absent leaves the
    corresponding attribute at its default.
    """

    # Class-level defaults, kept so attribute access on the class still works.
    source_item: str = ''
    target_item: str = ''
    range_na: list = []
    dropna: bool = False
    fillna: bool = False
    fillna_way: str = None  # stays None when the record has no 'fillna_way'
    drop_solitude: bool = False
    duplicate: bool = False

    # Keys stored as numbers in the record and exposed as booleans (value != 0).
    _FLAG_KEYS = ('dropna', 'fillna', 'drop_solitude', 'duplicate')

    def __init__(self, source_item, target_item, record):
        """Build the configuration.

        Args:
            source_item: Identifier of the item the data comes from.
            target_item: Identifier of the item the data is mapped to.
            record: Mapping with the optional keys ``range_na`` (a
                comma-separated string), ``dropna``, ``fillna``,
                ``drop_solitude``, ``duplicate`` (compared against 0) and
                ``fillna_way``. ``None`` is treated as an empty mapping.
        """
        self.source_item = source_item
        # Previously assigned source_item here, discarding the argument.
        self.target_item = target_item
        # Per-instance list so mutating it never touches the class default.
        self.range_na = []

        if record is None:
            # No record available: keep every option at its default.
            return

        if 'range_na' in record:
            self.range_na = record['range_na'].split(',')
        for key in self._FLAG_KEYS:
            if key in record:
                setattr(self, key, record[key] != 0)
        if 'fillna_way' in record:
            self.fillna_way = record['fillna_way']
class DataCenterSet():
    """Entry point to the data-center storage models.

    Wraps the ``ItemMap``, ``CleanQuality``, ``CleanBind`` and ``ItemData``
    models, all constructed from ``config['data_store_db']``, and keeps an
    in-memory lookup of the item map loaded at construction time.
    """

    _item_data_model = None
    _item_map_model = None
    _clean_quality_model = None
    _clean_bind_model = None
    # Class-level fallback only; _init_item_maps gives each instance its own
    # dict so separate instances never share or pollute one map.
    _item_map = {}

    def __init__(self, config):
        """Create the storage models and load the item map.

        Args:
            config: Mapping that must contain the key ``'data_store_db'``;
                its value is passed to each model constructor.
        """
        self.config = config
        self._item_map_model = ItemMap(self.config['data_store_db'])
        self._clean_quality_model = CleanQuality(self.config['data_store_db'])
        self._clean_bind_model = CleanBind(self.config['data_store_db'])
        self._item_data_model = ItemData(self.config['data_store_db'])
        self._init_item_maps()

    def _init_item_maps(self):
        """Rebuild the item lookup from ``ItemMap.find_all()``.

        Each row is indexed positionally: the key is ``"<row[0]>.<row[2]>"``
        and the value is ``row[3]``.
        NOTE(review): the meaning of these columns is not visible here —
        confirm against the ItemMap model.
        """
        rows = self._item_map_model.find_all()
        # Bind a fresh dict on the instance instead of mutating the class one.
        self._item_map = {
            '{}.{}'.format(row[0], row[2]): row[3] for row in rows
        }

    def get_target_item(self, k):
        """Return the mapped value for key ``k``, or ``None`` if unknown."""
        return self._item_map.get(k)

    def insert_describe(self, project_id, tag, item, describe):
        """Store a describe summary through the clean-quality model.

        Args:
            project_id: Stored as-is under ``'project_id'``.
            tag: Formatted to a string and stored under ``'tag'``.
            item: Formatted to a string and stored under ``'item'``.
            describe: Object exposing ``to_csv()``; its CSV text is stored
                as a whole under ``'describe'``.
        """
        record = {
            'project_id': project_id,
            'tag': "{}".format(tag),
            'item': "{}".format(item),
            'describe': describe.to_csv(),
        }
        self._clean_quality_model.insert(record)

    def get_clean_config(self, project_id, source_item, target_item):
        """Look up the clean binding for ``target_item`` and wrap it.

        Returns:
            A ``CleanConfig`` built from ``source_item``, ``target_item`` and
            the record returned by ``CleanBind.get(project_id, target_item)``.
        """
        record = self._clean_bind_model.get(project_id, target_item)
        return CleanConfig(source_item, target_item, record)

    def insert_cleaned_data(self, project_id, source_item, tag, series):
        """Convert ``series`` to value tuples and bulk-insert them.

        Args:
            project_id: Placed first in every generated tuple.
            source_item: Placed second in every generated tuple.
            tag: Placed last in every generated tuple.
            series: Object exposing ``iterrows()``; see ``__to_sql_values``
                for the expected row layout.
        """
        datas = self.__to_sql_values(project_id, source_item, tag, series)
        self._item_data_model.multi_insert(datas)

    def __to_sql_values(self, project_id, source_item, tag, series):
        """Render each numeric row of ``series`` as a tuple-literal string.

        For every row, position 0 is the value and position 1 is an object
        with ``strftime``. Rows whose value cannot be converted by
        ``float()`` (non-numeric text or ``None``) are skipped.

        NOTE(review): the strings come from ``str(tuple)`` with no escaping;
        if ``multi_insert`` splices them into SQL text, values must come
        from a trusted source — confirm against ItemData.

        Returns:
            A list of strings such as
            ``"(1, 'item', 1.5, '2020-01-01 00:00:00', 'tag')"``.
        """
        sql_texts = []
        for _, row in series.iterrows():
            value = row.values[0]
            try:
                float(value)
            except (ValueError, TypeError):
                # TypeError covers None, which previously aborted the batch.
                continue
            stamp = row.values[1].strftime("%Y-%m-%d %H:%M:%S")
            sql_texts.append(str((project_id, source_item, value, stamp, tag)))
        return sql_texts
-
|