from models.data_center.models import ItemMap, CleanQuality, CleanBind, ItemData


class CleanConfig():
    """Cleaning options for one source/target item pair.

    Options are read from a mapping-like ``record``; every key is optional
    and a missing key leaves the corresponding option at its default.
    """

    # Class-level defaults, kept so the attributes stay readable on the class.
    source_item: str = ''
    target_item: str = ''
    range_na: list = []
    dropna: bool = False
    fillna: bool = False
    fillna_way: str = None
    drop_solitude: bool = False
    duplicate: bool = False

    def __init__(self, source_item, target_item, record):
        """Build the configuration from a stored record.

        Args:
            source_item: Identifier of the item the data is read from.
            target_item: Identifier of the item the configuration is bound to.
            record: Mapping that may contain ``range_na`` (comma-separated
                string), ``fillna_way`` and the numeric flags ``dropna``,
                ``fillna``, ``drop_solitude`` and ``duplicate``. ``None`` is
                treated as an empty record, so all defaults apply.
        """
        self.source_item = source_item
        # The original stored ``source_item`` here, which silently discarded
        # the ``target_item`` argument.
        self.target_item = target_item

        # Give each instance its own list so that mutating one
        # configuration's ``range_na`` cannot leak into the class default.
        self.range_na = []

        if record is None:
            record = {}

        if 'range_na' in record:
            self.range_na = record['range_na'].split(',')
        if 'fillna_way' in record:
            self.fillna_way = record['fillna_way']
        # Flags are stored as numbers; any value other than 0 enables them.
        for flag in ('dropna', 'fillna', 'drop_solitude', 'duplicate'):
            if flag in record:
                setattr(self, flag, record[flag] != 0)


class DataCenterSet():
    """Facade over the data-center models used by the cleaning pipeline.

    Wraps the ``ItemMap``, ``CleanQuality``, ``CleanBind`` and ``ItemData``
    models, all constructed from ``config['data_store_db']``, and keeps an
    in-memory lookup built from the item-map rows.
    """

    _item_data_model = None
    _item_map_model = None
    _clean_quality_model = None
    _clean_bind_model = None
    # Placeholder only: ``__init__`` replaces it with a per-instance dict.
    _item_map = {}

    def __init__(self, config):
        """Create the models and load the item map.

        Args:
            config: Mapping holding the ``data_store_db`` entry that is
                handed to every model constructor.
        """
        self.config = config
        data_store_db = self.config['data_store_db']
        self._item_map_model = ItemMap(data_store_db)
        self._clean_quality_model = CleanQuality(data_store_db)
        self._clean_bind_model = CleanBind(data_store_db)
        self._item_data_model = ItemData(data_store_db)
        # Fresh dict per instance; writing into the class-level dict would
        # share (and mix) the mappings of every DataCenterSet ever created.
        self._item_map = {}
        self._init_item_maps()

    def get_target_item(self, k):
        """Return the value mapped to key ``k``, or ``None`` if unknown."""
        return self._item_map.get(k)

    def insert_describe(self, project_id, tag, item, describe):
        """Store a describe result through the clean-quality model.

        Args:
            project_id: Project the statistics belong to.
            tag: Tag of the data; stored in its string form.
            item: Item identifier; stored in its string form.
            describe: Object exposing ``to_csv()``
                (presumably a pandas ``describe()`` result - TODO confirm).
        """
        # Only the CSV serialisation is persisted; individual statistics
        # (unique, freq, top, min, max, mean, std, quartiles) are not stored
        # as separate fields.
        record = {
            'project_id': project_id,
            'tag': "{}".format(tag),
            'item': "{}".format(item),
            'describe': describe.to_csv(),
        }
        self._clean_quality_model.insert(record)

    def get_clean_config(self, project_id, source_item, target_item):
        """Return the ``CleanConfig`` bound to ``target_item`` in a project.

        The binding record is looked up by ``project_id`` and ``target_item``
        through the clean-bind model.
        """
        record = self._clean_bind_model.get(project_id, target_item)
        return CleanConfig(source_item, target_item, record)

    def insert_cleaned_data(self, project_id, source_item, tag, series):
        """Convert cleaned rows to value tuples and bulk-insert them.

        Args:
            project_id: Project the data belongs to.
            source_item: Item identifier stored with every row.
            tag: Tag stored with every row.
            series: Object exposing ``iterrows()`` whose rows carry the value
                at position 0 and a datetime-like object at position 1.
        """
        datas = self.__to_sql_values(project_id, source_item, tag, series)
        self._item_data_model.multi_insert(datas)

    def __to_sql_values(self, project_id, source_item, tag, series):
        """Render each numeric row as a ``(...)`` value-tuple string.

        Rows whose first value cannot be converted to ``float`` are skipped.

        Returns:
            A list of strings, one per kept row, each the ``str()`` of the
            tuple ``(project_id, source_item, value, timestamp, tag)`` with
            the timestamp formatted as ``%Y-%m-%d %H:%M:%S``.
        """
        # NOTE(review): values are rendered with Python's tuple repr and are
        # not SQL-escaped; confirm multi_insert only receives trusted input.
        sql_texts = []
        for _, row in series.iterrows():
            values = row.values
            value = values[0]
            try:
                float(value)
            except (ValueError, TypeError):
                # ValueError: non-numeric text. TypeError: None and other
                # non-convertible objects, which previously aborted the
                # whole batch instead of being skipped.
                continue
            timestamp = values[1].strftime("%Y-%m-%d %H:%M:%S")
            sql_texts.append(
                str((project_id, source_item, value, timestamp, tag)))
        return sql_texts

    def _init_item_maps(self):
        """Fill the lookup from all rows returned by the item-map model.

        Each row is indexed positionally: the key is ``"<row[0]>.<row[2]>"``
        and the value is ``row[3]``.
        """
        for one in self._item_map_model.find_all():
            self._item_map['{}.{}'.format(one[0], one[2])] = one[3]