123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- #!encoding:utf8
- import yaml
- import os
- from services.jobs.job import Job
- '''
- 该程序为数据清洗文件,描述了数据清洗的过程的方法
- 执行步骤:
- 1. 生成一个处理批次标签 T
- 2. 从数据源拉取数据,并为这些数据标识标签 T
- 3. 将数据按 project_id + item 分组
- 4. 将分级的 item 名称转换为配置的目标 t_item
- 5. 执行t_item 的清洗策略
- 5.1 分析出 t_item 所在分组的数据质量,并入库
- 5.2 读取绑定在 t_item 的清洗策略
- 5.3 依据策略设置 na 的范围(被视为 na 的值)
- 5.4 识别离群数据点, 并将其视为 na
- 5.5 依据配置的填充规则,fillna 数据
- 5.6 数据入库
- 6. pub 清洗完成事件,标签为 T
- '''
- env = os.getenv('GTDATA_STORE', 'test')
- def main():
- config = load_config(env)
- job = Job(config)
- job.set_project_id([92])
- job.start()
- def load_config(env):
- config = {}
- if env != 'online':
- env = 'test'
- with open('../../config/{}.yaml'.format(env)) as fn:
- config = yaml.load(fn,Loader=yaml.FullLoader)
- return config
- if __name__ == '__main__':
- main()
|