cleansing.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. #!encoding:utf8
  2. import yaml
  3. import os
  4. from services.jobs.job import Job
  5. '''
  6. 该程序为数据清洗文件,描述了数据清洗的过程的方法
  7. 执行步骤:
  8. 1. 生成一个处理批次标签 T
  9. 2. 从数据源拉取数据,并为这些数据标识标签 T
  10. 3. 将数据按 project_id + item 分组
  11. 4. 将分级的 item 名称转换为配置的目标 t_item
  12. 5. 执行t_item 的清洗策略
  13. 5.1 分析出 t_item 所在分组的数据质量,并入库
  14. 5.2 读取绑定在 t_item 的清洗策略
  15. 5.3 依据策略设置 na 的范围(被视为 na 的值)
  16. 5.4 识别离群数据点, 并将其视为 na
  17. 5.5 依据配置的填充规则,fillna 数据
  18. 5.6 数据入库
  19. 6. pub 清洗完成事件,标签为 T
  20. '''
  21. env = os.getenv('GTDATA_STORE', 'test')
  22. def main():
  23. config = load_config(env)
  24. job = Job(config)
  25. job.set_project_id([92])
  26. job.start()
  27. def load_config(env):
  28. config = {}
  29. if env != 'online':
  30. env = 'test'
  31. with open('../../config/{}.yaml'.format(env)) as fn:
  32. config = yaml.load(fn,Loader=yaml.FullLoader)
  33. return config
  34. if __name__ == '__main__':
  35. main()