dcitemhistorydatamodel.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package model
  import (
  	"context"
  	"database/sql"
  	"fmt"
  	"strings"

  	"GtDataStore/app/cmd/organization/pb"

  	"github.com/zeromicro/go-zero/core/stores/sqlc"
  	"github.com/zeromicro/go-zero/core/stores/sqlx"
  )
  10. var _ DcItemHistoryDataModel = (*customDcItemHistoryDataModel)(nil)
  11. type (
  12. // DcItemHistoryDataModel is an interface to be customized, add more methods here,
  13. // and implement the added methods in customDcItemHistoryDataModel.
  14. DcItemHistoryDataModel interface {
  15. dcItemHistoryDataModel
  16. MultiInsert(ctx context.Context, datas []DcItemHistoryData) (int64, error)
  17. QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]DcItemHistoryData, error)
  18. QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error)
  19. }
  20. customDcItemHistoryDataModel struct {
  21. *defaultDcItemHistoryDataModel
  22. }
  23. MaxMinData struct {
  24. MaxVal float64 `db:"max_val"` // 最大值
  25. MinVal float64 `db:"min_val"` // 最小值
  26. }
  27. )
  28. // NewDcItemHistoryDataModel returns a model for the database table.
  29. func NewDcItemHistoryDataModel(conn sqlx.SqlConn) DcItemHistoryDataModel {
  30. return &customDcItemHistoryDataModel{
  31. defaultDcItemHistoryDataModel: newDcItemHistoryDataModel(conn),
  32. }
  33. }
  34. func (m *defaultDcItemHistoryDataModel) MultiInsert(ctx context.Context, datas []DcItemHistoryData) (int64, error) {
  35. query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?)", m.table, dcItemHistoryDataRowsExpectAutoSet)
  36. if bulk, err := sqlx.NewBulkInserter(m.conn, query); err == nil {
  37. for _, data := range datas {
  38. if err = bulk.Insert(data.ProjectId, data.ItemName, data.Val, data.HTime, data.CTime); err != nil {
  39. return 0, err
  40. }
  41. }
  42. bulk.Flush()
  43. return int64(len(datas)), nil
  44. } else {
  45. return 0, err
  46. }
  47. }
  48. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) ([]DcItemHistoryData, error) {
  49. resp := make([]DcItemHistoryData, 0)
  50. var err error
  51. query := fmt.Sprintf("SELECT * FROM %s WHERE project_id = ? AND item_name in (?) AND h_time BETWEEN ? AND ? ORDER BY id desc", m.table)
  52. if strings.Index(in.ItemName, ",") > 0 {
  53. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.ProjectId, strings.Split(in.ItemName, ","), in.Stime, in.Etime)
  54. } else {
  55. err = m.conn.QueryRowsCtx(ctx, &resp, query, in.ProjectId, in.ItemName, in.Stime, in.Etime)
  56. }
  57. switch err {
  58. case nil:
  59. return resp, nil
  60. case sqlc.ErrNotFound:
  61. return nil, ErrNotFound
  62. default:
  63. return nil, err
  64. }
  65. }
  66. func (m *defaultDcItemHistoryDataModel) QueryHistoryDataMaxMinByTime(ctx context.Context, in *pb.ItemHistoryDataByTimeReq) (*MaxMinData, error) {
  67. resp := &MaxMinData{}
  68. var err error
  69. query := fmt.Sprintf("SELECT max(val) as max_val, min(val) as min_val FROM %s WHERE project_id = ? AND item_name = ? AND h_time BETWEEN ? AND ? ORDER BY id desc", m.table)
  70. err = m.conn.QueryRowCtx(ctx, resp, query, in.ProjectId, in.ItemName, in.Stime, in.Etime)
  71. switch err {
  72. case nil:
  73. return resp, nil
  74. case sqlc.ErrNotFound:
  75. return nil, ErrNotFound
  76. default:
  77. return nil, err
  78. }
  79. }
  80. /*
  81. func (m *defaultDcItemHistoryDataModel) QueryHistoryData(ctx context.Context, in *pb.ItemHistoryDataListReq) ([]DcItemHistoryData, error) {
  82. resp := make([]DcItemHistoryData, 0)
  83. var err error
  84. var query string
  85. if in.Aggregator == "realtime" {
  86. query = fmt.Sprintf("SELECT project_id,item_name,val,DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') as c_time FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:00:00') ORDER BY TIME_TO_SEC(c_time) ASC) AS row_num FROM %s WHERE project_id = ? AND item_name = ? AND c_time >= ? AND c_time < ? AND val <> 0) subquery WHERE row_num = 1;", m.table)
  87. } else if in.Aggregator == "new" {
  88. groupSql := ""
  89. timeSql := ""
  90. if in.Interval == "minute" {
  91. query = fmt.Sprintf(`
  92. SELECT val, DATE_ADD(DATE_FORMAT(min_c_time, '%%Y-%%m-%%d %%H:%%i'), INTERVAL (FLOOR(TIME_TO_SEC(min_c_time)/600)*%d) MINUTE) as c_time
  93. FROM (
  94. SELECT MAX(val) AS val,
  95. MIN(c_time) AS min_c_time
  96. FROM %s
  97. WHERE project_id = ? AND item_name = ?
  98. AND c_time >= ? AND c_time < ?
  99. GROUP BY DATE_FORMAT(c_time, '%%Y-%%m-%%d %%H:%%i'), FLOOR(TIME_TO_SEC(c_time) / 600)
  100. ) AS subquery
  101. ORDER BY min_c_time ASC;`, in.Size, m.table)
  102. } else if in.Interval == "h" {
  103. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", in.Size, in.Size, in.Size)
  104. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", in.Size)
  105. } else {
  106. // day
  107. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", in.Size, in.Size, in.Size)
  108. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", in.Size)
  109. }
  110. tableName := "ws_scada." + TableName(itemId)
  111. table := fmt.Sprintf(`(select *
  112. from %s
  113. where item_name = "%s" and project_id = %s
  114. and ts >= "%s" and ts < "%s"
  115. order by ts desc
  116. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  117. rows, err = DB.New().Table(table).
  118. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  119. Group(groupSql).
  120. Order("htime asc").
  121. Rows()
  122. }
  123. err = m.conn.QueryRowCtx(ctx, &resp, query, in.ProjectId, in.ItemName, in.Stime, in.Etime)
  124. switch err {
  125. case nil:
  126. return resp, nil
  127. case sqlc.ErrNotFound:
  128. return nil, ErrNotFound
  129. default:
  130. return nil, err
  131. }
  132. var rows *sql.Rows
  133. if aggregator == "realtime" {
  134. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  135. Select("item_value as val,CASE WHEN MINUTE(ts) >= 30 THEN DATE_FORMAT(DATE_ADD(ts, INTERVAL 1 HOUR), '%Y-%m-%d %H:00:00') ELSE DATE_FORMAT(ts, '%Y-%m-%d %H:00:00') END AS htime").
  136. Where("item_name = ? ", itemId).
  137. Where("ts >= ? and ts < ?", sTime, eTime).
  138. Where("project_id = ?", projectId).
  139. Group("htime").
  140. Rows()
  141. } else if aggregator == "new" {
  142. groupSql := ""
  143. timeSql := ""
  144. if interval == "minute" {
  145. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00'), INTERVAL %d minute)", sizeInt, sizeInt, sizeInt)
  146. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  147. } else if interval == "h" {
  148. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-%%d '),LPAD(FLOOR(DATE_FORMAT(ts, '%%H')/%d)*%d,2,0),':00:00'), INTERVAL %d hour)", sizeInt, sizeInt, sizeInt)
  149. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  150. } else {
  151. // day
  152. timeSql = fmt.Sprintf("DATE_ADD(concat(DATE_FORMAT(ts,'%%Y-%%m-'),LPAD(FLOOR(DATE_FORMAT(ts, '%%d')/%d)*%d,2,0),' 00:00:00'), INTERVAL %d day)", sizeInt, sizeInt, sizeInt)
  153. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  154. }
  155. tableName := "ws_scada." + TableName(itemId)
  156. table := fmt.Sprintf(`(select *
  157. from %s
  158. where item_name = "%s" and project_id = %s
  159. and ts >= "%s" and ts < "%s"
  160. order by ts desc
  161. limit 99999999) A `, tableName, itemId, projectId, sTime, eTime)
  162. rows, err = DB.New().Table(table).
  163. Select(fmt.Sprintf("cast(item_value as DECIMAL(11,4)) as val, %s as htime", timeSql)).
  164. Group(groupSql).
  165. Order("htime asc").
  166. Rows()
  167. } else {
  168. groupSql := ""
  169. timeSql := ""
  170. if interval == "minute" {
  171. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts,'%%Y-%%m-%%d %%H:'),LPAD(FLOOR(DATE_FORMAT(ts, '%%i')/%d)*%d,2,0),':00')", sizeInt, sizeInt)
  172. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d %%H' ),FLOOR(DATE_FORMAT(ts, '%%i' )/%d)", sizeInt)
  173. } else if interval == "h" {
  174. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-%%d ' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%H' )/%d)*%d,2,0),':00:00')", sizeInt, sizeInt)
  175. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m-%%d' ),FLOOR(DATE_FORMAT(ts, '%%H' )/%d)", sizeInt)
  176. } else {
  177. //day
  178. timeSql = fmt.Sprintf("concat(DATE_FORMAT(ts, '%%Y-%%m-' ),LPAD(FLOOR(DATE_FORMAT(ts, '%%d' )/%d)*%d,2,0),' 00:00:00')", sizeInt, sizeInt)
  179. groupSql = fmt.Sprintf("DATE_FORMAT(ts, '%%Y-%%m' ),FLOOR(DATE_FORMAT(ts, '%%d' )/%d)", sizeInt)
  180. }
  181. rows, err = DB.New().Table("ws_scada."+TableName(itemId)).
  182. Select(fmt.Sprintf("%s(cast(item_value as DECIMAL(11,4))) as val, %s as htime", aggregator, timeSql)).
  183. Where("item_name = ? ", itemId).
  184. Where("ts >= ? and ts < ?", sTime, eTime).
  185. Where("project_id = ?", projectId).
  186. Group(groupSql).
  187. Rows()
  188. }
  189. defer rows.Close()
  190. if err != nil {
  191. log.Error(err)
  192. return nil, err
  193. }
  194. // 查询点位信息 导出表格需要
  195. configMap, err := GetDrdcByItemNameAndDeviceId(itemId, fmt.Sprintf("%d", deviceId))
  196. if err != nil {
  197. log.Error(err)
  198. return nil, err
  199. }
  200. for rows.Next() {
  201. di := postgres.JinkeHistoryData{}
  202. var val sql.NullString
  203. err := rows.Scan(&val, &di.Htime)
  204. di.Val = NullStringConverse(val)
  205. if err != nil { // 获得的都是字符串
  206. log.Error("Some amazing wrong happens in the process of queryAll.", err)
  207. return nil, err
  208. }
  209. di.Name = configMap.ItemAlias
  210. //判断小数点
  211. if configMap.IsBool == false {
  212. di.Val = processPrecise(di.Val, configMap.ItemPrecise)
  213. }
  214. resp = append(resp, di)
  215. }
  216. return resp, nil
  217. }
  218. */