dcitemhistorydatamodel.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package model
  2. import (
  3. "GtDataStore/app/cmd/organization/pb"
  4. "context"
  5. "database/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/stores/sqlc"
  8. "github.com/zeromicro/go-zero/core/stores/sqlx"
  9. "strings"
  10. "time"
  11. )
  12. var _ DcItemHistoryDataModel = (*customDcItemHistoryDataModel)(nil)
  13. type (
  14. // DcItemHistoryDataModel is an interface to be customized, add more methods here,
  15. // and implement the added methods in customDcItemHistoryDataModel.
  16. DcItemHistoryDataModel interface {
  17. dcItemHistoryDataModel
  18. MultiInsert(ctx context.Context, projectId int64, datas []*pb.ItemHistoryData) (int64, error)
  19. QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]*ItemHistoryData, error)
  20. QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error)
  21. }
  22. customDcItemHistoryDataModel struct {
  23. *defaultDcItemHistoryDataModel
  24. }
  25. MaxMinData struct {
  26. MaxVal sql.NullFloat64 `db:"max_val"` // 最大值
  27. MinVal sql.NullFloat64 `db:"min_val"` // 最小值
  28. }
  29. ItemHistoryData struct {
  30. ItemName string `db:"item_name"` // 点位名
  31. Val float64 `db:"val"` // 值
  32. HTime time.Time `db:"h_time"` // 采集数据时间
  33. }
  34. )
  35. // NewDcItemHistoryDataModel returns a model for the database table.
  36. func NewDcItemHistoryDataModel(conn sqlx.SqlConn) DcItemHistoryDataModel {
  37. return &customDcItemHistoryDataModel{
  38. defaultDcItemHistoryDataModel: newDcItemHistoryDataModel(conn),
  39. }
  40. }
  41. func (m *defaultDcItemHistoryDataModel) MultiInsert(ctx context.Context, projectId int64, datas []*pb.ItemHistoryData) (int64, error) {
  42. query := fmt.Sprintf("insert ignore into %s (`item_name`,`val`,`h_time`) values (?, ?, ?)", m.getTableName(projectId))
  43. if bulk, err := sqlx.NewBulkInserter(m.conn, query); err == nil {
  44. for _, data := range datas {
  45. if err = bulk.Insert(data.ItemName, data.Val, data.HTime); err != nil {
  46. return 0, err
  47. }
  48. }
  49. bulk.Flush()
  50. return int64(len(datas)), nil
  51. } else {
  52. return 0, err
  53. }
  54. }
  55. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]*ItemHistoryData, error) {
  56. resp := make([]*ItemHistoryData, 0)
  57. var err error
  58. if strings.Index(in.ItemName, ",") > 0 {
  59. ItemNames := strings.Split(in.ItemName, ",")
  60. var ItemNameStrs []string
  61. for _, val := range ItemNames {
  62. ItemNameStrs = append(ItemNameStrs, "'"+val+"'")
  63. }
  64. inClause := strings.Join(ItemNameStrs, ",")
  65. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  66. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  67. " WHERE hd.item_name in (%s) AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId), inClause)
  68. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.Stime, in.Etime)
  69. } else {
  70. query := fmt.Sprintf("SELECT hd.* FROM %s AS hd "+
  71. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  72. " WHERE hd.item_name in (?) AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId))
  73. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.ItemName, in.Stime, in.Etime)
  74. }
  75. switch err {
  76. case nil:
  77. return resp, nil
  78. case sqlc.ErrNotFound:
  79. return nil, ErrNotFound
  80. default:
  81. return nil, err
  82. }
  83. }
  84. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error) {
  85. resp := &MaxMinData{}
  86. var err error
  87. query := fmt.Sprintf("SELECT max(hd.val) as max_val, min(hd.val) as min_val FROM %s AS hd "+
  88. " LEFT JOIN dc_item_config as ic ON hd.item_name = ic.item_name "+
  89. " WHERE hd.item_name = ? AND h_time BETWEEN ? AND ? AND (ic.id IS NULL OR (hd.val > ic.min_val AND hd.val < ic.max_val))", m.getTableName(in.ProjectId))
  90. err = m.conn.QueryRowCtx(ctx, resp, query, in.ItemName, in.Stime, in.Etime)
  91. switch err {
  92. case nil:
  93. if !resp.MaxVal.Valid {
  94. resp.MaxVal.Float64 = 0
  95. }
  96. if !resp.MinVal.Valid {
  97. resp.MinVal.Float64 = 0
  98. }
  99. return resp, nil
  100. case sqlc.ErrNotFound:
  101. return nil, ErrNotFound
  102. default:
  103. return nil, err
  104. }
  105. }
  106. func (m *defaultDcItemHistoryDataModel) getTableName(projectId int64) string {
  107. return fmt.Sprintf("dc_item_history_data_%d", projectId)
  108. }
  109. /*
  110. func (m *defaultDcItemHistoryDataModel) QueryHistoryData(ctx context.Context, in *pb.ItemHistoryDataListReq) ([]DcItemHistoryData, error) {
  111. resp := make([]DcItemHistoryData, 0)
  112. var err error
  113. var query string
  114. if in.Aggregator == "realtime" {
  115. query = fmt.Sprintf("SELECT project_id,item_name,val,DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') as c_time FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') ORDER BY TIME_TO_SEC(c_time) ASC) AS row_num FROM %s WHERE project_id = ? AND item_name = ? AND c_time >= ? AND c_time < ? AND val <> 0) subquery WHERE row_num = 1;", m.table)
  116. } else if in.Aggregator == "new" {
  117. groupSql := ""
  118. timeSql := ""
  119. if in.Interval == "minute" {
  120. query = fmt.Sprintf(`
  121. SELECT val, DATE_ADD(DATE_FORMAT(min_c_time, '%%Y-%%m-%%d %%H:%%i'), INTERVAL (FLOOR(TIME_TO_SEC(min_c_time)/600)*%d) MINUTE) as c_time
  122. FROM (
  123. SELECT MAX(val) AS val,
  124. MIN(c_time) AS min_c_time
  125. FROM %s
  126. WHERE project_id = ? AND item_name = ?
  127. AND c_time >= ? AND c_time < ?
  128. GROUP BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:%%i'), FLOOR(TIME_TO_SEC(c_time) / 600)
  129. ) AS subquery
  130. ORDER BY min_c_time ASC;`, in.Size, m.table)
  131. } else if in.Interval == "h" {
  132. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", in.Size, in.Size, in.Size)
  133. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", in.Size)
  134. } else {
  135. // day
  136. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", in.Size, in.Size, in.Size)
  137. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", in.Size)
  138. }
  139. tableName := "ws_scada." + TableName(itemId)
  140. table := fmt.Sprintf(`(select *
  141. from %s
  142. where item_name = "%s" and project_id = %s
  143. and ts >= "%s" and ts < "%s"
  144. order by ts desc
  145. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  146. rows, err = DB.New().Table(table).
  147. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  148. Group(groupSql).
  149. Order("htime asc").
  150. Rows()
  151. }
  152. err = m.conn.QueryRowCtx(ctx, &resp, query, in.ProjectId, in.ItemName, in.Stime, in.Etime)
  153. switch err {
  154. case nil:
  155. return resp, nil
  156. case sqlc.ErrNotFound:
  157. return nil, ErrNotFound
  158. default:
  159. return nil, err
  160. }
  161. var rows *sql.Rows
  162. if aggregator == "realtime" {
  163. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  164. Select("item_value as val,CASE WHEN MINUTE(ts) >= 30 THEN DATE_FORMAT(DATE_ADD(ts, INTERVAL 1 HOUR), '%Y-%m-%d %H:00:00') ELSE DATE_FORMAT(ts, '%Y-%m-%d %H:00:00') END AS htime").
  165. Where("item_name = ? ", itemId).
  166. Where("ts >= ? and ts < ?", sTime, eTime).
  167. Where("project_id = ?", projectId).
  168. Group("htime").
  169. Rows()
  170. } else if aggregator == "new" {
  171. groupSql := ""
  172. timeSql := ""
  173. if interval == "minute" {
  174. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00'), INTERVAL %d minute)", sizeInt, sizeInt, sizeInt)
  175. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  176. } else if interval == "h" {
  177. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", sizeInt, sizeInt, sizeInt)
  178. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  179. } else {
  180. // day
  181. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", sizeInt, sizeInt, sizeInt)
  182. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  183. }
  184. tableName := "ws_scada." + TableName(itemId)
  185. table := fmt.Sprintf(`(select *
  186. from %s
  187. where item_name = "%s" and project_id = %s
  188. and ts >= "%s" and ts < "%s"
  189. order by ts desc
  190. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  191. rows, err = DB.New().Table(table).
  192. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  193. Group(groupSql).
  194. Order("htime asc").
  195. Rows()
  196. } else {
  197. groupSql := ""
  198. timeSql := ""
  199. if interval == "minute" {
  200. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00')", sizeInt, sizeInt)
  201. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  202. } else if interval == "h" {
  203. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-%%d ' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%H' )/%d)*%d,2,0),':00:00')", sizeInt, sizeInt)
  204. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  205. } else {
  206. //day
  207. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%d' )/%d)*%d,2,0),' 00:00:00')", sizeInt, sizeInt)
  208. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  209. }
  210. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  211. Select(fmt.Sprintf("%s(cast(item_value as DECIMAL(11,4))) as val, %s as htime", aggregator, timeSql)).
  212. Where("item_name = ? ", itemId).
  213. Where("ts >= ? and ts < ?", sTime, eTime).
  214. Where("project_id = ?", projectId).
  215. Group(groupSql).
  216. Rows()
  217. }
  218. defer rows.Close()
  219. if err != nil {
  220. log.Error(err)
  221. return nil, err
  222. }
  223. // 查询点位信息 导出表格需要
  224. configMap, err := GetDrdcByItemNameAndDeviceId(itemId, fmt.Sprintf("%d", deviceId))
  225. if err != nil {
  226. log.Error(err)
  227. return nil, err
  228. }
  229. for rows.Next() {
  230. di := postgres.JinkeHistoryData{}
  231. var val sql.NullString
  232. err := rows.Scan(&val, &di.Htime)
  233. di.Val = NullStringConverse(val)
  234. if err != nil { // 获得的都是字符串
  235. log.Error("Some amazing wrong happens in the process of queryAll.", err)
  236. return nil, err
  237. }
  238. di.Name = configMap.ItemAlias
  239. //判断小数点
  240. if configMap.IsBool == false {
  241. di.Val = processPrecise(di.Val, configMap.ItemPrecise)
  242. }
  243. resp = append(resp, di)
  244. }
  245. return resp, nil
  246. }
  247. */