#!encoding:utf8 import yaml import os from services.jobs.job import Job ''' 该程序为数据清洗文件,描述了数据清洗的过程的方法 执行步骤: 1. 生成一个处理批次标签 T 2. 从数据源拉取数据,并为这些数据标识标签 T 3. 将数据按 project_id + item 分组 4. 将分级的 item 名称转换为配置的目标 t_item 5. 执行t_item 的清洗策略 5.1 分析出 t_item 所在分组的数据质量,并入库 5.2 读取绑定在 t_item 的清洗策略 5.3 依据策略设置 na 的范围(被视为 na 的值) 5.4 识别离群数据点, 并将其视为 na 5.5 依据配置的填充规则,fillna 数据 5.6 数据入库 6. pub 清洗完成事件,标签为 T ''' env = os.getenv('GTDATA_STORE', 'test') def main(): config = load_config(env) job = Job(config) job.set_project_id([92]) job.start() def load_config(env): config = {} if env != 'online': env = 'test' with open('../../config/{}.yaml'.format(env)) as fn: config = yaml.load(fn,Loader=yaml.FullLoader) return config if __name__ == '__main__': main()