dcitemhistorydatamodel.go 12 KB


  1. package model
  2. import (
  3. "GtDataStore/app/cmd/organization/pb"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/stores/sqlc"
  8. "github.com/zeromicro/go-zero/core/stores/sqlx"
  9. "strings"
  10. "time"
  11. )
  12. var _ DcItemHistoryDataModel = (*customDcItemHistoryDataModel)(nil)
  13. type (
  14. // DcItemHistoryDataModel is an interface to be customized, add more methods here,
  15. // and implement the added methods in customDcItemHistoryDataModel.
  16. DcItemHistoryDataModel interface {
  17. dcItemHistoryDataModel
  18. MultiInsert(ctx context.Context, projectId int64, datas []*pb.ItemHistoryData) (int64, error)
  19. QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]*ItemHistoryData, error)
  20. QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error)
  21. QueryHistoryDataFirstByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*ItemHistoryData, error)
  22. QueryHistoryDataLastByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*ItemHistoryData, error)
  23. }
  24. customDcItemHistoryDataModel struct {
  25. *defaultDcItemHistoryDataModel
  26. }
  27. MaxMinData struct {
  28. MaxVal sql.NullFloat64 `db:"max_val"` // 最大值
  29. MinVal sql.NullFloat64 `db:"min_val"` // 最小值
  30. }
  31. ItemHistoryData struct {
  32. ItemName string `db:"item_name"` // 点位名
  33. Val float64 `db:"val"` // 值
  34. HTime time.Time `db:"h_time"` // 采集数据时间
  35. }
  36. )
  37. // NewDcItemHistoryDataModel returns a model for the database table.
  38. func NewDcItemHistoryDataModel(conn sqlx.SqlConn) DcItemHistoryDataModel {
  39. return &customDcItemHistoryDataModel{
  40. defaultDcItemHistoryDataModel: newDcItemHistoryDataModel(conn),
  41. }
  42. }
  43. func (m *defaultDcItemHistoryDataModel) MultiInsert(ctx context.Context, projectId int64, datas []*pb.ItemHistoryData) (int64, error) {
  44. query := fmt.Sprintf("insert ignore into %s (`item_name`,`val`,`h_time`) values (?, ?, ?)", m.getTableName(projectId))
  45. if bulk, err := sqlx.NewBulkInserter(m.conn, query); err == nil {
  46. for _, data := range datas {
  47. if err = bulk.Insert(data.ItemName, data.Val, data.HTime); err != nil {
  48. return 0, err
  49. }
  50. }
  51. bulk.Flush()
  52. return int64(len(datas)), nil
  53. } else {
  54. return 0, err
  55. }
  56. }
  57. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]*ItemHistoryData, error) {
  58. resp := make([]*ItemHistoryData, 0)
  59. var err error
  60. if strings.Index(in.ItemName, ",") > 0 {
  61. ItemNames := strings.Split(in.ItemName, ",")
  62. var ItemNameStrs []string
  63. for _, val := range ItemNames {
  64. ItemNameStrs = append(ItemNameStrs, "'"+val+"'")
  65. }
  66. inClause := strings.Join(ItemNameStrs, ",")
  67. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  68. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  69. " WHERE hd.item_name in (%s) AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId), inClause)
  70. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.Stime, in.Etime)
  71. } else {
  72. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  73. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  74. " WHERE hd.item_name in (?) AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId))
  75. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.ItemName, in.Stime, in.Etime)
  76. }
  77. switch err {
  78. case nil:
  79. return resp, nil
  80. case sqlc.ErrNotFound:
  81. return nil, ErrNotFound
  82. default:
  83. return nil, err
  84. }
  85. }
  86. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error) {
  87. resp := &MaxMinData{}
  88. var err error
  89. query := fmt.Sprintf("SELECT max(hd.val) as max_val, min(hd.val) as min_val FROM %s AS hd "+
  90. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  91. " WHERE hd.item_name = ? AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId))
  92. err = m.conn.QueryRowCtx(ctx, resp, query, in.ItemName, in.Stime, in.Etime)
  93. switch err {
  94. case nil:
  95. if !resp.MaxVal.Valid {
  96. resp.MaxVal.Float64 = 0
  97. }
  98. if !resp.MinVal.Valid {
  99. resp.MinVal.Float64 = 0
  100. }
  101. return resp, nil
  102. case sqlc.ErrNotFound:
  103. return nil, ErrNotFound
  104. default:
  105. return nil, err
  106. }
  107. }
  108. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataFirstByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*ItemHistoryData, error) {
  109. resp := &ItemHistoryData{}
  110. var err error
  111. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  112. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  113. " WHERE hd.item_name = ? AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))"+
  114. " Order by h_time asc limit 1", m.getTableName(in.ProjectId))
  115. err = m.conn.QueryRowCtx(ctx, resp, query, in.ItemName, in.Stime, in.Etime)
  116. switch err {
  117. case nil:
  118. return resp, nil
  119. case sqlc.ErrNotFound:
  120. return nil, ErrNotFound
  121. default:
  122. return nil, err
  123. }
  124. }
  125. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataLastByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*ItemHistoryData, error) {
  126. resp := &ItemHistoryData{}
  127. var err error
  128. if in.Stime != "" && in.Etime != "" {
  129. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  130. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  131. " WHERE hd.item_name = ? AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))"+
  132. " Order by h_time desc limit 1", m.getTableName(in.ProjectId))
  133. err = m.conn.QueryRowCtx(ctx, resp, query, in.ItemName, in.Stime, in.Etime)
  134. } else {
  135. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  136. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  137. " WHERE hd.item_name = ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))"+
  138. " Order by h_time desc limit 1", m.getTableName(in.ProjectId))
  139. err = m.conn.QueryRowCtx(ctx, resp, query, in.ItemName)
  140. }
  141. switch err {
  142. case nil:
  143. return resp, nil
  144. case sqlc.ErrNotFound:
  145. return nil, ErrNotFound
  146. default:
  147. return nil, err
  148. }
  149. }
  150. func (m *defaultDcItemHistoryDataModel) getTableName(projectId int64) string {
  151. return fmt.Sprintf("dc_item_history_data_%d", projectId)
  152. }
  153. /*
  154. func (m *defaultDcItemHistoryDataModel) QueryHistoryData(ctx context.Context, in *pb.ItemHistoryDataListReq) ([]DcItemHistoryData, error) {
  155. resp := make([]DcItemHistoryData, 0)
  156. var err error
  157. var query string
  158. if in.Aggregator == "realtime" {
  159. query = fmt.Sprintf("SELECT project_id,item_name,val,DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') as c_time FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') ORDER BY TIME_TO_SEC(c_time) ASC) AS row_num FROM %s WHERE project_id = ? AND item_name = ? AND c_time >= ? AND c_time < ? AND val <> 0) subquery WHERE row_num = 1;", m.table)
  160. } else if in.Aggregator == "new" {
  161. groupSql := ""
  162. timeSql := ""
  163. if in.Interval == "minute" {
  164. query = fmt.Sprintf(`
  165. SELECT val, DATE_ADD(DATE_FORMAT(min_c_time, '%%Y-%%m-%%d %%H:%%i'), INTERVAL (FLOOR(TIME_TO_SEC(min_c_time)/600)*%d) MINUTE) as c_time
  166. FROM (
  167. SELECT MAX(val) AS val,
  168. MIN(c_time) AS min_c_time
  169. FROM %s
  170. WHERE project_id = ? AND item_name = ?
  171. AND c_time >= ? AND c_time < ?
  172. GROUP BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:%%i'), FLOOR(TIME_TO_SEC(c_time) / 600)
  173. ) AS subquery
  174. ORDER BY min_c_time ASC;`, in.Size, m.table)
  175. } else if in.Interval == "h" {
  176. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", in.Size, in.Size, in.Size)
  177. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", in.Size)
  178. } else {
  179. // day
  180. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", in.Size, in.Size, in.Size)
  181. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", in.Size)
  182. }
  183. tableName := "ws_scada." + TableName(itemId)
  184. table := fmt.Sprintf(`(select *
  185. from %s
  186. where item_name = "%s" and project_id = %s
  187. and ts >= "%s" and ts < "%s"
  188. order by ts desc
  189. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  190. rows, err = DB.New().Table(table).
  191. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  192. Group(groupSql).
  193. Order("htime asc").
  194. Rows()
  195. }
  196. err = m.conn.QueryRowCtx(ctx, &resp, query, in.ProjectId, in.ItemName, in.Stime, in.Etime)
  197. switch err {
  198. case nil:
  199. return resp, nil
  200. case sqlc.ErrNotFound:
  201. return nil, ErrNotFound
  202. default:
  203. return nil, err
  204. }
  205. var rows *sql.Rows
  206. if aggregator == "realtime" {
  207. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  208. Select("item_value as val,CASE WHEN MINUTE(ts) >= 30 THEN DATE_FORMAT(DATE_ADD(ts, INTERVAL 1 HOUR), '%Y-%m-%d %H:00:00') ELSE DATE_FORMAT(ts, '%Y-%m-%d %H:00:00') END AS htime").
  209. Where("item_name = ? ", itemId).
  210. Where("ts >= ? and ts < ?", sTime, eTime).
  211. Where("project_id = ?", projectId).
  212. Group("htime").
  213. Rows()
  214. } else if aggregator == "new" {
  215. groupSql := ""
  216. timeSql := ""
  217. if interval == "minute" {
  218. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00'), INTERVAL %d minute)", sizeInt, sizeInt, sizeInt)
  219. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  220. } else if interval == "h" {
  221. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", sizeInt, sizeInt, sizeInt)
  222. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  223. } else {
  224. // day
  225. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", sizeInt, sizeInt, sizeInt)
  226. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  227. }
  228. tableName := "ws_scada." + TableName(itemId)
  229. table := fmt.Sprintf(`(select *
  230. from %s
  231. where item_name = "%s" and project_id = %s
  232. and ts >= "%s" and ts < "%s"
  233. order by ts desc
  234. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  235. rows, err = DB.New().Table(table).
  236. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  237. Group(groupSql).
  238. Order("htime asc").
  239. Rows()
  240. } else {
  241. groupSql := ""
  242. timeSql := ""
  243. if interval == "minute" {
  244. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00')", sizeInt, sizeInt)
  245. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  246. } else if interval == "h" {
  247. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-%%d ' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%H' )/%d)*%d,2,0),':00:00')", sizeInt, sizeInt)
  248. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  249. } else {
  250. //day
  251. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%d' )/%d)*%d,2,0),' 00:00:00')", sizeInt, sizeInt)
  252. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  253. }
  254. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  255. Select(fmt.Sprintf("%s(cast(item_value as DECIMAL(11,4))) as val, %s as htime", aggregator, timeSql)).
  256. Where("item_name = ? ", itemId).
  257. Where("ts >= ? and ts < ?", sTime, eTime).
  258. Where("project_id = ?", projectId).
  259. Group(groupSql).
  260. Rows()
  261. }
  262. defer rows.Close()
  263. if err != nil {
  264. log.Error(err)
  265. return nil, err
  266. }
  267. // 查询点位信息 导出表格需要
  268. configMap, err := GetDrdcByItemNameAndDeviceId(itemId, fmt.Sprintf("%d", deviceId))
  269. if err != nil {
  270. log.Error(err)
  271. return nil, err
  272. }
  273. for rows.Next() {
  274. di := postgres.JinkeHistoryData{}
  275. var val sql.NullString
  276. err := rows.Scan(&val, &di.Htime)
  277. di.Val = NullStringConverse(val)
  278. if err != nil { // 获得的都是字符串
  279. log.Error("Some amazing wrong happens in the process of queryAll.", err)
  280. return nil, err
  281. }
  282. di.Name = configMap.ItemAlias
  283. //判断小数点
  284. if configMap.IsBool == false {
  285. di.Val = processPrecise(di.Val, configMap.ItemPrecise)
  286. }
  287. resp = append(resp, di)
  288. }
  289. return resp, nil
  290. }
  291. */