
fit: init project

gaoyagang 1 year ago
parent
commit
f5212e2256

+ 28 - 8
README.md

@@ -1,8 +1,28 @@
-# data_center
-
-a. Collect and store water plant data in one place 
-b. Data processing and cleansing are done in the data center, keeping the data complete and consistent 
-c. Provide efficient data query services to meet each business's data needs 
-d. Provide a data presentation layer that renders data as charts 
-e. Abstract data into events and provide a data event subscription service 
-f. Data alert events, with an event subscription service 
+a. Collect and store water plant data in one place
+b. Data processing and cleansing are done in the data center, keeping the data complete and consistent
+c. Provide efficient data query services to meet each business's data needs
+d. Provide a data presentation layer that renders data as charts
+e. Abstract data into events and provide a data event subscription service
+f. Data alert events, with an event subscription service
+
+
+
+Directory structure:
+    .
+    ├── app                                        application code
+    │   ├── alarms                                 alarms
+    │   ├── api                                    HTTP data API service
+    │   ├── cleansing                              data cleansing
+    │   ├── events                                 events
+    │   └── organization                           data dimension organization
+    ├── bin                                        external scripts
+    ├── common                                     shared utility classes
+    ├── config                                     common configuration files
+    └── deploy                                     deployment files
+
+
+
+
+
+Python dependency installation:
+    pip install psycopg2 schedule pandas sshtunnel sqlalchemy pymysql redis

+ 48 - 0
app/cleansing/cleansing.py

@@ -0,0 +1,48 @@
+#!encoding:utf8
+
+import yaml
+import os
+from services.jobs.job import Job
+
+'''
+This file implements the data cleansing job and describes how the cleansing process runs.
+
+Execution steps:
+    1. Generate a processing batch tag T
+    2. Pull data from the sources and label those rows with tag T
+    3. Group the data by project_id + item
+    4. Map the grouped item names to the configured target t_item
+    5. Run the cleansing strategy for each t_item
+        5.1 Analyse the data quality of the group t_item belongs to and store the result
+        5.2 Read the cleansing strategy bound to t_item
+        5.3 Apply the configured na range (values to be treated as na)
+        5.4 Detect outliers and treat them as na
+        5.5 Fill na values according to the configured fill rule
+        5.6 Write the cleaned data to the database
+    6. Publish a "cleansing finished" event tagged with T
+'''
+
+env = os.getenv('GTDATA_STORE', 'test')
+
+def main():
+    config = load_config(env)
+
+    job = Job(config)
+
+    job.set_project_id([92])
+
+    job.start()
+
+def load_config(env):
+    config = {}
+
+    if env != 'online':
+        env = 'test'
+
+    with open('../../config/{}.yaml'.format(env)) as fn:
+        config = yaml.load(fn,Loader=yaml.FullLoader)
+
+    return config
+
+if __name__ == '__main__':
+    main()
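
Note: a minimal usage sketch for this entry point, assuming it is run from app/cleansing/ so the relative config path resolves, and that config/online.yaml (empty in this commit) has been filled in with the same keys as config/test.yaml; the module name `cleansing` is inferred from the file name.

```python
# Minimal usage sketch (assumptions: executed from app/cleansing/, online config filled in).
import os

os.environ['GTDATA_STORE'] = 'online'    # any other value falls back to test.yaml

from cleansing import load_config        # module name assumed from the file above
from services.jobs.job import Job

config = load_config(os.getenv('GTDATA_STORE'))
job = Job(config)
job.set_project_id([92])                 # restrict cleansing to project 92, as in main()
job.start()                              # blocks: runs the cleansing pass on a schedule
```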

+ 0 - 0
app/cleansing/models/__init__.py


+ 23 - 0
app/cleansing/models/data_center/Base.py

@@ -0,0 +1,23 @@
+
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy import create_engine,text
+
+class Base():
+
+    __engine = None
+
+    def __init__(self, db):
+        if Base.__engine is None:
+            Base.__engine = create_engine('mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**db))
+
+    def execute(self, query):
+        with Base.__engine.connect() as conn:
+            conn.execute(text(query))
+            conn.commit()
+
+    @staticmethod
+    def get_session():
+        DBSession = sessionmaker(bind=Base.__engine)
+        session = DBSession()
+
+        return session

+ 80 - 0
app/cleansing/models/data_center/models.py

@@ -0,0 +1,80 @@
+from models.data_center.Base import Base
+from sqlalchemy import text
+
+class ItemData(Base):
+    _table_name = 'dc_item_data'
+    
+    def multi_insert(self, datas):
+        if len(datas) == 0:
+            return
+        query = "INSERT INTO `dc_item_data` (`project_id`, `item`, `val`, `htime`, `tag`)VALUES {}".format(','.join(datas))
+        print(query, "\n")
+        super().execute(query)
+
+    def __init__(self, db):
+        super().__init__(db)
+
+class CleanBind(Base):
+    _table_name = 'dc_clean_bind'
+    
+    def get(self, project_id, target_item):
+        session = Base.get_session()
+
+        query = 'select * from {} where project_id={} and `item` = \'{}\' limit 1'.format(self._table_name, project_id, target_item)
+
+        try:
+            data = session.execute(text(query)).one()
+        except Exception:
+            return None
+        finally:
+            session.close()
+
+        return data
+
+    def __init__(self, db):
+        super().__init__(db)
+
+class CleanQuality(Base):
+    _table_name = 'dc_clean_quality'
+    
+    def insert(self, data):
+        # query = "INSERT INTO `{}` (`project_id`, `tag`, `item`, `count`, `unique`, `min`, `max`, `mean`, `std`, `p25`, `p50`, `p75`, `top`, `freq`) VALUES({project_id},{tag},{item},{count},{unique},'{min}','{max}','{mean}',{std},{25%},{50%},{75%},'{top}','{freq}')".format(self._table_name, **data)
+        query = "INSERT INTO `{}` (`project_id`, `tag`, `item`, `describe`) VALUES({project_id},'{tag}','{item}','{describe}')".format(self._table_name, **data)
+        super().execute(query)
+
+    def __init__(self, db):
+        super().__init__(db)
+
+class ItemMap(Base):
+
+    engine = None
+    _session = None
+    _table_name = 'dc_item_map'
+
+    def get(self, project_id, source_item, fields=['project_id','device_code','source_item','target_item']):
+        session = Base.get_session()
+
+        query = 'select {} from {} where project_id={} and `item_name` = \'{}\' limit 1'.format(','.join(fields), self._table_name, project_id, source_item)
+
+        try:
+            data = session.execute(text(query)).one()
+        except Exception:
+            return None
+        finally:
+            session.close()
+
+        return data
+
+    def find_all(self):
+        session = Base.get_session()
+
+        query = 'select project_id,device_code,source_item,target_item from {}'.format(self._table_name)
+        try:
+            data = session.execute(text(query)).all()
+        finally:
+            session.close()
+
+        return data
+
+    def __init__(self, db):
+        super().__init__(db)

+ 67 - 0
app/cleansing/models/source/mgj.py

@@ -0,0 +1,67 @@
+#!coding:utf-8
+
+import psycopg2
+
+'''
+select id from public.dataitem where name='Group_N_Reflux_Tank_Level';
+
+
+select val,htime from his.itemdata_20230925 where itemid='25';
+'''
+
+class MGJ():
+    conn = None
+    item_id_map = {}
+
+    def get_item_id(self, devid, name):
+        key = "{}_{}".format(devid, name)
+        if key in self.item_id_map:
+            return self.item_id_map[key]
+        query = "select id from public.dataitem where devid={} and name='{}' limit 1".format(devid, name)
+        print(query)
+        cur = self.conn.cursor()
+
+        cur.execute(query)
+
+        row = cur.fetchone()
+
+        cur.close()
+
+        if row is not None:
+            self.item_id_map[key] = row[0]
+            return row[0]
+
+        return 0    # not found: get()/find() check for 0
+    
+    def get(self, day, devid, name, et):
+        itemid = self.get_item_id(devid, name)
+        if itemid == 0:
+            return None
+
+        query = "select val,htime from his.itemdata_{} where itemid='{}' and createtime <= '{}' order by createtime desc limit 1".format(day, itemid, et)
+        print(query)
+        cur = self.conn.cursor()
+        cur.execute(query)
+        row = cur.fetchone()
+        cur.close()
+
+        if row is not None:
+            return row[0]
+        
+        return '-'
+
+    def find(self, ymd, devid, name, start=0, limit=1000):
+        itemid = self.get_item_id(devid, name)
+        if itemid == 0:
+            return None
+
+        query = "select val,htime from his.itemdata_{} where itemid='{}' order by createtime desc offset {} limit {}".format(ymd, itemid, start, limit)
+        print(query)
+        cur = self.conn.cursor()
+        cur.execute(query)
+        rows = cur.fetchall()
+        cur.close()
+
+        return rows
+
+    def __init__(self, mgj_db):
+        self.conn = psycopg2.connect(database=mgj_db['dbname'], user=mgj_db['user'], password=mgj_db['password'], host=mgj_db['host'], port=mgj_db['port'])

+ 123 - 0
app/cleansing/models/source/opc.py

@@ -0,0 +1,123 @@
+#!coding:utf-8
+
+
+from sshtunnel import SSHTunnelForwarder
+from sqlalchemy import create_engine,text
+from sqlalchemy.orm import sessionmaker
+
+
+class Opc():
+
+    engine = None
+    _session = None
+
+    def get(self, project_id, item_name, fields=['item_value','c_time'], st='', et=''):
+        DBSession = sessionmaker(bind=self.engine)
+
+        # create a session object
+        session = DBSession()
+        table_name = self.routeTableName(item_name)
+        ts_cond = self.tsCond(st, et)
+
+        query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit 1'.format(','.join(fields), table_name, item_name, ts_cond)
+
+        print(query)
+        try:
+            data = session.execute(text(query)).one()
+        except Exception:
+            return None
+        finally:
+            session.close()
+
+        return data
+
+    def find(self, project_id, item_name, fields=['item_value','c_time'], st='', et='', start=0, limit=10):
+        DBSession = sessionmaker(bind=self.engine)
+
+        # create a session object
+        session = DBSession()
+        table_name = self.routeTableName(item_name)
+        ts_cond = self.tsCond(st, et)
+
+        query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit {},{}'.format(','.join(fields), table_name, item_name, ts_cond, start, limit)
+        try:
+            data = session.execute(text(query)).all()
+        finally:
+            session.close()
+
+        return data
+
+    def find_increment(self, table_name, fields=['id','project_id','item_name','item_value','ts'], last_id=0, limit=10000):
+        DBSession = sessionmaker(bind=self.engine)
+
+        # create a session object
+        self._session = DBSession()
+
+        query = 'select {} from {} where id>{} limit {}'.format(','.join(fields), table_name, last_id, limit)
+        try:
+            data = self._session.execute(text(query)).all()
+        finally:
+            self._session.close()
+
+        return data
+
+    def multi_items_find(self, project_id, item_names=[], fields=['item_value','c_time'], st='', et='', start=0, limit=10):
+        DBSession = sessionmaker(bind=self.engine)
+
+        # create a session object
+        session = DBSession()
+        ts_cond = self.tsCond(st, et)
+        result = []
+
+        try:
+            for name in item_names:
+                table_name = self.routeTableName(name)
+                query = 'select {} from {} where `item_name` = \'{}\' {} order by ts desc limit {},{}'.format(','.join(fields), table_name, name, ts_cond, start, limit)
+                data = session.execute(text(query)).all()
+
+                result.append(data)
+        finally:
+            session.close()
+
+        return result
+
+    def tsCond(self, st, et):
+        ts_cond = ''
+        if len(st) > 0 and len(et) > 0:
+            ts_cond = ' and ts>=\'{}\' and ts<\'{}\' '.format(st, et)
+        elif len(st) > 0:
+            ts_cond = ' and ts>=\'{}\' '.format(st)
+        elif len(et) > 0:
+            ts_cond = ' and ts<=\'{}\' '.format(et)
+
+        return ts_cond
+
+    def routeTableName(self, item_name):
+        idx = self.bkdrhash(item_name)
+
+        return 'scada_data_{}'.format(idx)
+
+    def bkdrhash(self, s):
+        seed = 131
+        hash = 0
+        for i in range(0, len(s)):
+            hash = (hash * seed) + ord(s[i])
+
+        return (hash & 0x7FFFFFFF) % 40
+
+    def __init__(self, scada_db):
+        
+        tunnel = SSHTunnelForwarder(
+            (scada_db['ssh-host'], scada_db['ssh-port']),
+            ssh_username=scada_db['ssh-username'],
+            ssh_password=scada_db['ssh-password'],
+            remote_bind_address=(scada_db['host'], scada_db['port'])
+        )
+
+        tunnel.start()
+
+        scada_db['port'] = str(tunnel.local_bind_port)
+        # print('----------------------------------------------------mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**scada_db))
+        self.engine = create_engine('mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}'.format(**scada_db))
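
The `routeTableName`/`bkdrhash` pair above shards point data across 40 `scada_data_*` tables by item name. Below is a standalone re-implementation of the same hash for illustration only (not imported from the module), using the item name that appears in mgj.py's example query:

```python
# Standalone reproduction of Opc.bkdrhash / Opc.routeTableName, for illustration.
def bkdrhash(s: str, buckets: int = 40) -> int:
    seed, h = 131, 0
    for ch in s:
        h = h * seed + ord(ch)        # classic BKDR string hash
    return (h & 0x7FFFFFFF) % buckets

def route_table_name(item_name: str) -> str:
    return 'scada_data_{}'.format(bkdrhash(item_name))

print(route_table_name('Group_N_Reflux_Tank_Level'))   # one of scada_data_0..39
```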

+ 0 - 0
app/cleansing/services/__init__.py


+ 0 - 0
app/cleansing/services/clean_items/__init__.py


+ 81 - 0
app/cleansing/services/clean_items/dispatcher.py

@@ -0,0 +1,81 @@
+
+from threading import Thread
+from services.clean_items.items import Base, DCTmp
+from numpy import float64
+
+dispatcher = {
+    'tmp': lambda: DCTmp()
+}
+
+class Dispatcher(Thread):
+    _project_id = 0
+    _tag = ''
+    _s_item = ''
+    _t_item = None
+    _data = []
+    _job = None
+
+    def __init__(self, group_name, tag, t_item, data, job):
+        super().__init__()
+        self._tag = tag
+        self._t_item = t_item
+        self._data = data
+        self._job = job
+
+        self._project_id, self._s_item = group_name.split('.', 1)
+
+    def run(self) -> None:
+        if self._t_item not in dispatcher:
+            return None
+
+        cleaner = dispatcher[self._t_item]()
+
+        # read the cleansing config
+        config = self._job.get_clean_config(self._project_id, self._s_item, self._t_item)
+
+        dataframe = Base.to_dataframe(self._data, self._t_item, float64)
+
+        # analyse the data quality
+        describe = Base.describe(dataframe)
+        # store the data quality record
+        self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
+
+        # run the cleansing
+        cleaner.clean(config, describe, dataframe)
+        # drop na values
+        dataframe.dropna(inplace=True)
+
+        if len(dataframe) > 0:
+            # store the cleaned data
+            self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, dataframe)
+
+class UnDispatcher(Thread):
+    _project_id = 0
+    _tag = ''
+    _s_item = ''
+    _t_item = None
+    _data = []
+    _job = None
+
+    def __init__(self, group_name, tag, t_item, data, job):
+        super().__init__()
+        self._tag = tag
+        self._t_item = t_item
+        self._data = data
+        self._job = job
+
+        self._project_id, self._s_item = group_name.split('.', 1)
+
+    def run(self):
+        series = Base.to_dataframe(self._data, self._t_item, str)
+
+        # analyse the data quality
+        describe = Base.describe(series)
+        # store the data quality record
+        self._job.insert_describe(self._project_id, self._tag, self._s_item, describe[self._t_item])
+
+        # drop na values
+        series.dropna(inplace=True)
+        if len(series) > 0:
+            # store the cleaned data
+            self._job.insert_cleaned_data(self._project_id, self._s_item, self._tag, series)

+ 124 - 0
app/cleansing/services/clean_items/items.py

@@ -0,0 +1,124 @@
+#!encoding:utf8
+import pandas as pd
+from decimal import Decimal,InvalidOperation
+
+class Base():
+    @staticmethod
+    def describe(series):
+        return series.describe(include='all')
+
+    @staticmethod
+    def to_dataframe(data, target_item, dtype):
+        s1v = []
+        htimev = []
+
+        for one in data:
+            if dtype == str:
+                s1v.append(one[0])
+                htimev.append(one[1])
+            else:
+                try:
+                    v = Decimal(one[0])
+                except InvalidOperation:
+                    v = None
+                finally:
+                    s1v.append(v)
+                    htimev.append(one[1])
+
+        s1 = pd.Series(s1v, dtype=dtype)
+        s2 = pd.Series(htimev)
+        return pd.DataFrame({target_item: s1, 'htime': s2})
+
+    @staticmethod
+    def outlier_for_std(dataframe, source_item, bit=2) -> pd.DataFrame:
+        '''
+        Detect outliers using the standard deviation rule
+        '''
+        mean, std = dataframe[source_item].mean(), dataframe[source_item].std()
+        upb, lob = mean + std * bit, mean - std * bit
+
+        outliers = dataframe[(dataframe[source_item] < lob) | (dataframe[source_item] > upb)]
+
+        return outliers
+
+    @staticmethod
+    def outlier_for_iqr(dataframe, source_item, bit=1.5) -> pd.DataFrame:
+        '''
+        Detect outliers using the interquartile range (IQR) rule
+        '''
+        q1, q3 = dataframe[source_item].quantile(0.25), dataframe[source_item].quantile(0.75)
+        iqr = q3 - q1
+        upb, lob = q3 + iqr * bit, q1 - iqr * bit
+
+        outliers = dataframe[(dataframe[source_item] < lob) | (dataframe[source_item] > upb)]
+
+        return outliers
+
+    @staticmethod
+    def fill_value_by_na(dataframe, source_item, fill_way):
+        if fill_way == 'mean':
+            return dataframe[source_item].mean()
+        
+        if fill_way == 'median':
+            return dataframe[source_item].median()
+
+        if fill_way == 'mode':
+            if len(dataframe[source_item].mode()) >= 1:
+                return dataframe[source_item].mode()[0]
+
+        if fill_way == 'min':
+            return dataframe[source_item].min()
+
+        if fill_way == 'max':
+            return dataframe[source_item].max()
+
+        return None
+
+
+class DCTmp(Base):
+    '''
+        1. Load range_na and mark values inside that range as na
+        2. Check whether dropna is True
+            2.1 True: drop the na rows
+            2.2 False: go to step 3
+        3. Whether to drop outliers
+            3.1 True: outliers must be removed
+                3.1.1 detect the outliers
+                3.1.2 drop the outlier rows, end
+            3.2 False: keep the outliers
+        4. Whether to fill missing data
+            4.1 False: skip filling and return the data as-is, end
+            4.2 True: fill the data
+                4.2.1 read the fill strategy and compute the fill value
+                4.2.2 fill na values with fillna, end
+    '''
+
+    def clean(self, config, describe, dataframe) -> None:
+        # step 1
+
+        # step 2
+        if config.dropna:
+            # step 2.1
+            dataframe.dropna(subset=config.source_item, inplace=True)
+        # step 4
+        elif config.fillna:
+            # step 4.2.1
+            fill_value = Base.fill_value_by_na(dataframe, config.source_item, config.fillna_way)
+            if fill_value is not None:
+                # step 4.2.2
+                dataframe.fillna(value={config.source_item:fill_value}, inplace=True)
+
+        # step 3
+        if config.drop_solitude:
+            # step 3.1.1
+            outliers = Base.outlier_for_std(dataframe, config.source_item)
+            if len(outliers) > 0:
+                # step 3.1.2
+                dataframe.drop(index=outliers.index, inplace=True)
+        else:
+            # 3.2
+            pass
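
To make the outlier handling concrete, here is a small self-contained sketch of the IQR rule used by `outlier_for_iqr`, re-derived inline rather than imported; the readings and column name are made up for illustration:

```python
import pandas as pd

# Five readings for a hypothetical item; 100.0 is the obvious outlier.
df = pd.DataFrame({'tmp': [1.0, 2.0, 3.0, 4.0, 100.0]})

q1, q3 = df['tmp'].quantile(0.25), df['tmp'].quantile(0.75)
iqr = q3 - q1
upb, lob = q3 + 1.5 * iqr, q1 - 1.5 * iqr            # bit=1.5, as in outlier_for_iqr

outliers = df[(df['tmp'] < lob) | (df['tmp'] > upb)]
print(outliers)                                       # the 100.0 row
print(df.drop(index=outliers.index))                  # rows kept after dropping outliers
```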

+ 0 - 0
app/cleansing/services/jobs/__init__.py


+ 104 - 0
app/cleansing/services/jobs/data_center_set.py

@@ -0,0 +1,104 @@
+from models.data_center.models import ItemMap, CleanQuality, CleanBind, ItemData
+
+class CleanConfig():
+    source_item: str = ''
+    target_item: str = ''
+    range_na: list = []
+    dropna: bool = False
+    fillna: bool = False
+    fillna_way: str = None
+    drop_solitude: bool = False
+    duplicate: bool = False
+
+    def __init__(self, source_item, target_item, record):
+        self.source_item = source_item
+        self.target_item = target_item
+        if record is None:
+            # no strategy bound to this item: keep the class-level defaults
+            return
+        if 'range_na' in record:
+            self.range_na = record['range_na'].split(',')
+
+        if 'dropna' in record:
+            self.dropna = record['dropna'] != 0
+
+        if 'fillna' in record:
+            self.fillna = record['fillna'] != 0
+
+        if 'fillna_way' in record:
+            self.fillna_way = record['fillna_way']
+
+        if 'drop_solitude' in record:
+            self.drop_solitude = record['drop_solitude'] != 0
+
+        if 'duplicate' in record:
+            self.duplicate = record['duplicate'] != 0
+
+class DataCenterSet():
+    _item_data_model = None
+    _item_map_model = None
+    _clean_quality_model = None
+    _clean_bind_model = None
+    _item_map = {}
+
+    def get_target_item(self, k):
+        if k in self._item_map:
+            return self._item_map[k]
+
+        return None
+
+
+    def insert_describe(self, project_id, tag, item, describe):
+        record = {
+            'project_id': project_id,
+            'tag': "{}".format(tag),
+            'item': "{}".format(item),
+            'describe': describe.to_csv(),
+            # 'unique': 'unique' in describe and describe['unique'] or 'null',
+            # 'freq': 'freq' in describe and describe['freq'] or 'null',
+            # 'top': 'top' in describe and describe['top'] or 'null',
+            # 'min': 'min' in describe and describe['min'] or 'null',
+            # 'max': 'max' in describe and describe['max'] or 'null',
+            # 'mean': 'mean' in describe and describe['mean'] or 'null',
+            # 'std': 'std' in describe and describe['std'] or 'null',
+            # '25%': '25%' in describe and describe['25%'] or 'null',
+            # '50%': '50%' in describe and describe['50%'] or 'null',
+            # '75%': '75%' in describe and describe['75%'] or 'null',
+        }
+
+        self._clean_quality_model.insert(record)
+
+    def get_clean_config(self, project_id, source_item, target_item):
+        record = self._clean_bind_model.get(project_id, target_item)
+
+        return CleanConfig(source_item, target_item, record)
+
+    def insert_cleaned_data(self, project_id, source_item, tag, series):
+        datas = self.__to_sql_values(project_id, source_item, tag, series)
+        self._item_data_model.multi_insert(datas)
+
+    def __to_sql_values(self, project_id, source_item, tag, series):
+        sql_texts = []
+        for index, row in series.iterrows():
+            try:
+                float(row.values[0])
+            except ValueError:
+                continue
+            sql_texts.append(str((project_id, source_item, row.values[0], row.values[1].strftime("%Y-%m-%d %H:%M:%S"), tag)))     
+        return sql_texts
+
+
+    def __init__(self, config):
+        self.config = config
+        self._item_map_model = ItemMap(self.config['data_store_db'])
+        self._clean_quality_model = CleanQuality(self.config['data_store_db'])
+        self._clean_bind_model = CleanBind(self.config['data_store_db'])
+        self._item_data_model = ItemData(self.config['data_store_db'])
+
+        self._init_item_maps()
+
+    def _init_item_maps(self):
+        data = self._item_map_model.find_all()
+
+        for one in data:
+            self._item_map['{}.{}'.format(one[0], one[2])] = one[3]
+
+
+    
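
For reference, a hedged sketch of how a `dc_clean_bind` record maps onto `CleanConfig`. The record is shown as a plain dict and its values are invented for illustration; the import path assumes the script is run from app/cleansing/ like cleansing.py:

```python
from services.jobs.data_center_set import CleanConfig   # path assumed, as in cleansing.py

# Hypothetical dc_clean_bind record, shown as a plain dict for illustration.
record = {
    'range_na': '-9999,9999',    # values treated as na, comma separated
    'dropna': 0,                 # keep na rows ...
    'fillna': 1,                 # ... and fill them instead
    'fillna_way': 'median',
    'drop_solitude': 1,          # drop outliers
    'duplicate': 0,
}

config = CleanConfig('1#_tank_level', 'tmp', record)     # item names are made up
print(config.fillna, config.fillna_way, config.drop_solitude)   # True median True
```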

+ 74 - 0
app/cleansing/services/jobs/job.py

@@ -0,0 +1,74 @@
+
+from time import strftime, sleep
+from services.jobs.source_data_set import SourceDataSet
+from services.jobs.data_center_set import DataCenterSet
+import schedule
+from services.clean_items.dispatcher import Dispatcher,UnDispatcher
+import redis
+
+
+class Job():
+    _sourceDataSet = None
+    _dataCenterSet = None
+    _project_ids = []
+    _item_maps = {}
+    _redis = None
+    
+    def __init__(self, config):
+        self.config = config
+        self._sourceDataSet = SourceDataSet(self.config)
+        self._dataCenterSet = DataCenterSet(self.config)
+        self._init_redis(self.config['redis'])
+
+    def set_project_id(self, project_ids=[]):
+        self._project_ids = project_ids
+
+    def _init_item_map(self):
+        self._item_maps = {}
+
+    def run(self):
+        tag = Job.generate_tag()
+
+        # fetch all newly added data
+        data = self._sourceDataSet.fetch_increment(10000)
+
+        # group the data by project_id + item
+        group_data = self._sourceDataSet.group_by_items(data, self._project_ids)
+
+        # map each group to its target_item
+        for group_name in group_data:
+            target_item = self._dataCenterSet.get_target_item(group_name)
+            if target_item is not None:
+                # run the cleansing strategy for target_item
+                Dispatcher(group_name, tag, target_item, group_data[group_name], self).start()
+            else:
+                # points that need no cleansing are stored directly
+                UnDispatcher(group_name, tag, target_item, group_data[group_name], self).start()
+
+        # push a "cleansing finished" message onto the queues
+        self._redis.rpush('data-center:mq:event', tag)
+        self._redis.rpush('data-center:mq:alarm', tag)
+        self._redis.rpush('data-center:mq:isomerism', tag)
+
+    def get_clean_config(self, project_id, source_item, target_item):
+        return self._dataCenterSet.get_clean_config(project_id, source_item, target_item)
+
+    def insert_describe(self, project_id, tag, item, describe):
+        self._dataCenterSet.insert_describe(project_id, tag, item, describe)
+
+    def insert_cleaned_data(self, project_id, source_item, tag, series):
+        self._dataCenterSet.insert_cleaned_data(project_id, source_item, tag, series)
+
+    def _init_redis(self, config):
+        self._redis = redis.Redis(host=config['host'], db=config['db'], port=config['port'], password=config['password'])
+
+    @staticmethod
+    def generate_tag():
+        return strftime('%y%m%d%H%M%S')
+
+    def start(self):
+        schedule.every(1).seconds.do(self.run)
+
+        while True:
+            schedule.run_pending()
+            sleep(5)
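
run() finishes a pass by pushing the batch tag onto three Redis lists. The event/alarm workers are not part of this commit, so the consumer below is only an assumption about how the tag could be picked up on the other side (connection values mirror the `redis` block in config/test.yaml):

```python
import redis

# Connection values copied from the `redis` block in config/test.yaml.
r = redis.Redis(host='47.96.12.136', port=6379, db=0, password=None)

while True:
    # blpop blocks until a cleansing pass publishes a tag such as b'230925120000'
    msg = r.blpop('data-center:mq:event', timeout=30)
    if msg is None:
        continue                      # nothing published within the timeout
    _, tag = msg
    print('cleansed batch', tag.decode())
    # a real consumer would now query dc_item_data / dc_clean_quality by this tag
```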

+ 41 - 0
app/cleansing/services/jobs/source_data_set.py

@@ -0,0 +1,41 @@
+from models.source.opc import Opc
+from models.source.mgj import MGJ
+
+class SourceDataSet():
+    _opc_model = None
+    _mgj_model = None
+
+    _last_id_dict = {}
+
+    def __init__(self, config):
+        self.config = config
+        self._opc_model = Opc(self.config['scada_db'])
+
+    def fetch_increment(self,limit):
+        result = []
+        for i in range(0, 40):
+            table_name = 'scada_data_{}'.format(i)
+
+            last_id = 0
+            if table_name in self._last_id_dict:
+                last_id = self._last_id_dict[table_name]
+
+            data = self._opc_model.find_increment(table_name, last_id=last_id,limit=limit)
+            if len(data):
+                self._last_id_dict[table_name] = data[len(data) - 1][0]
+                result.extend(data)
+
+        return result
+
+    def group_by_items(self, data, filter_project=[]):
+        result = {}
+        for one in data:
+            if len(filter_project) > 0 and one[1] not in filter_project:
+                continue
+            key = '{}.{}'.format(one[1], one[2])
+            if key not in result:
+                result[key] = []
+
+            result[key].append((one[3], one[4]))
+
+        return result
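
`group_by_items` keys each row as `project_id.item_name` and keeps only `(item_value, ts)` pairs, matching the field order used by `find_increment`. A small illustration with made-up rows, repeating the grouping logic inline rather than importing it:

```python
# Rows in find_increment's field order: (id, project_id, item_name, item_value, ts).
rows = [
    (101, 92, '1#_tank_level', '3.52', '2023-09-25 10:00:00'),
    (102, 92, '1#_tank_level', '3.55', '2023-09-25 10:00:05'),
    (103, 93, 'pump_speed',    '1450', '2023-09-25 10:00:05'),
]

grouped = {}
for one in rows:
    key = '{}.{}'.format(one[1], one[2])               # project_id.item_name
    grouped.setdefault(key, []).append((one[3], one[4]))

print(grouped)
# {'92.1#_tank_level': [...two pairs...], '93.pump_speed': [...one pair...]}
```

With `filter_project=[92]`, as set by `main()` in cleansing.py, the project 93 row would be skipped.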

+ 0 - 0
config/online.yaml


+ 31 - 0
config/test.yaml

@@ -0,0 +1,31 @@
+scada_db:
+  ssh-host: "44.4"
+  ssh-port: 22
+  ssh-username: gaoyagang
+  ssh-password: kyodbb596322
+  host: 127.0.0.1
+  port: 3306
+  user: root
+  password: Greentech20200508**2
+  dbname: ws_scada
+
+mgj_db:
+  host: 121.43.178.151
+  port: 5432
+  user: postgres
+  password": Greentech@017
+  dbname": sdm
+
+
+data_store_db:
+  host: 192.168.60.201
+  port: 4000
+  user: ws_data
+  password: c712f89fc4f8edaf30e41b828f4e3d26
+  dbname: ws_data
+
+redis:
+  host: 47.96.12.136
+  port: 6379
+  password: 
+  db: 0

+ 3 - 0
go.mod

@@ -0,0 +1,3 @@
+module GtDataStore
+
+go 1.19