func.go 11 KB


  1. package envitem
  2. import (
  3. "context"
  4. "database/sql/driver"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "metawant.greentech.com.cn/gaoyagang/gt-common/httplib"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. func SetOptions(options Options) {
  15. snapUrl = fmt.Sprintf("http://%s/api/v1/plc-current", options.GtServerIp)
  16. ctlUrl = fmt.Sprintf("http://%s/api/v1/plc/set-var-values", options.GtServerIp)
  17. plcItemUrl = fmt.Sprintf("http://%s/api/v1/plc-list/", options.GtServerIp)
  18. if options.Cache != nil {
  19. cache = options.Cache
  20. }
  21. fetchMultiItem = options.FetchMultiItem
  22. adjustValue = options.AdjustValue
  23. }
  24. func (m MultiEnvItem) GetProjectId() int64 {
  25. for _, item := range m {
  26. return item.ProjectId
  27. }
  28. return 0
  29. }
  30. func (m MultiEnvItem) getItemNames() []string {
  31. items := make([]string, 0)
  32. for _, item := range m {
  33. items = append(items, item.Item)
  34. }
  35. return items
  36. }
  37. func (m MultiEnvItem) FillCurrentValue() error {
  38. datas := make([]*ItemValueResp, 0)
  39. var err error
  40. if fetchMultiItem {
  41. datas, err = m.getCurrentData()
  42. } else {
  43. datas, err = m.getCurrentDataByOne()
  44. }
  45. if err != nil {
  46. return err
  47. }
  48. // 先做一个item -> key的映射
  49. ikm := make(map[string]string)
  50. for s, item := range m {
  51. ikm[item.Item] = s
  52. }
  53. for _, data := range datas {
  54. if data != nil {
  55. if k, ok := ikm[data.ItemName]; ok {
  56. m[k].Value = data.Val
  57. m[k].Htime = data.HTime
  58. _ = m[k].setPrevValue()
  59. }
  60. }
  61. }
  62. return nil
  63. }
  64. func (m MultiEnvItem) GetItemFloat64Value(key string) float64 {
  65. if envItem, ok := m[key]; ok {
  66. return envItem.GetItemFloat64Val()
  67. }
  68. return 0
  69. }
  70. func (m MultiEnvItem) GetItemInt64Value(key string) int64 {
  71. if envItem, ok := m[key]; ok {
  72. return envItem.GetItemInt64Val()
  73. }
  74. return 0
  75. }
  76. func (m MultiEnvItem) GetItemStringValue(key string) string {
  77. if envItem, ok := m[key]; ok {
  78. return envItem.GetItemStringVal()
  79. }
  80. return ""
  81. }
  82. func (m MultiEnvItem) GetItemHtime(key string) *time.Time {
  83. if envItem, ok := m[key]; ok {
  84. return envItem.GetItemHtime()
  85. }
  86. return nil
  87. }
  88. func (m MultiEnvItem) FindString() map[string]string {
  89. a := make(map[string]string)
  90. for _, item := range m {
  91. a[item.Item] = item.GetItemStringVal()
  92. }
  93. return a
  94. }
  95. func (m MultiEnvItem) FindPrevString() map[string]string {
  96. a := make(map[string]string)
  97. for _, item := range m {
  98. a[item.Item] = item.GetItemPrevStringVal()
  99. }
  100. return a
  101. }
  102. func (m MultiEnvItem) ClearValues() {
  103. for _, item := range m {
  104. item.clearValue()
  105. }
  106. }
  107. func (m MultiEnvItem) getCurrentDataByOne() ([]*ItemValueResp, error) {
  108. datas := make([]*ItemValueResp, len(m))
  109. i := 0
  110. var wg sync.WaitGroup
  111. wg.Add(len(m))
  112. for _, item := range m {
  113. go func(index int, one *EnvItem) {
  114. defer wg.Done()
  115. datas[index], _ = one.getCurrentData()
  116. }(i, item)
  117. i++
  118. }
  119. wg.Wait()
  120. return datas, nil
  121. }
  122. func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
  123. req := httplib.Post(snapUrl)
  124. data := make([]*ItemValueReq, 1)
  125. data[0] = &ItemValueReq{
  126. DeviceItems: strings.Join(m.getItemNames(), ","),
  127. ProjectId: m.GetProjectId(),
  128. }
  129. jsonBytes, err := json.Marshal(data)
  130. if err != nil {
  131. return nil, err
  132. }
  133. req.Body(jsonBytes)
  134. req.SetTimeout(time.Millisecond * 2000)
  135. r, err := req.Response()
  136. if err != nil {
  137. return nil, err
  138. }
  139. defer r.Body.Close()
  140. if r.StatusCode == 200 {
  141. resp, err := req.Bytes()
  142. if err != nil {
  143. return nil, err
  144. }
  145. res := &ItemValueResps{}
  146. err = json.Unmarshal(resp, res)
  147. if err != nil {
  148. return nil, err
  149. }
  150. if len(res.Data) == 0 {
  151. return nil, errors.New("not found envitem's value")
  152. }
  153. return res.Data, nil
  154. }
  155. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  156. }
  157. func (e *EnvItem) getCurrentData() (*ItemValueResp, error) {
  158. req := httplib.Post(snapUrl)
  159. data := make([]*ItemValueReq, 1)
  160. data[0] = &ItemValueReq{
  161. DeviceItems: e.Item,
  162. ProjectId: e.ProjectId,
  163. }
  164. jsonBytes, err := json.Marshal(data)
  165. if err != nil {
  166. return nil, err
  167. }
  168. req.Body(jsonBytes)
  169. req.SetTimeout(time.Millisecond * 2000)
  170. r, err := req.Response()
  171. if err != nil {
  172. return nil, err
  173. }
  174. defer r.Body.Close()
  175. if r.StatusCode == 200 {
  176. resp, err := req.Bytes()
  177. if err != nil {
  178. return nil, err
  179. }
  180. res := &ItemValueResps{}
  181. err = json.Unmarshal(resp, res)
  182. if err != nil {
  183. return nil, err
  184. }
  185. if len(res.Data) == 0 {
  186. return nil, errors.New("not found envitem's value")
  187. }
  188. if adjustValue && cache != nil && len(res.Data) > 0 {
  189. if rv, err := strconv.ParseInt(res.Data[0].Val, 10, 64); err == nil {
  190. adjust, _ := e.GetAdjustInt64Val()
  191. res.Data[0].Val = fmt.Sprintf("%d", rv+adjust)
  192. } else if rv, err := strconv.ParseFloat(res.Data[0].Val, 64); err == nil {
  193. adjust, _ := e.GetAdjustFloat64Val()
  194. res.Data[0].Val = fmt.Sprintf("%f", rv+adjust)
  195. }
  196. }
  197. return res.Data[0], nil
  198. }
  199. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  200. }
  201. func (e *EnvItem) getCurrentValue() (string, string, error) {
  202. resp, err := e.getCurrentData()
  203. if err != nil {
  204. return "", "", err
  205. }
  206. e.Value = resp.Val
  207. e.Htime = resp.HTime
  208. _ = e.setPrevValue()
  209. return resp.Val, resp.HTime, nil
  210. }
  211. func (e *EnvItem) getPrevValue() (string, string, error) {
  212. if cache == nil {
  213. return "", "", errors.New("not cache")
  214. }
  215. key := fmt.Sprintf(CACHE_PREV_VALUE_KEY, e.ProjectId, e.Item)
  216. if cmd := cache.Get(context.Background(), key); cmd != nil {
  217. n := strings.Split(cmd.Val(), "|")
  218. if len(n) != 2 {
  219. return "", "", errors.New("getPrevValue cache length error")
  220. }
  221. return n[0], n[1], nil
  222. } else {
  223. return "", "", cmd.Err()
  224. }
  225. }
  226. func (e *EnvItem) setPrevValue() error {
  227. if cache == nil {
  228. return errors.New("not cache")
  229. }
  230. key := fmt.Sprintf(CACHE_PREV_VALUE_KEY, e.ProjectId, e.Item)
  231. value := fmt.Sprintf("%s|%s", e.Value, e.Htime)
  232. if statusCmd := cache.Set(context.Background(), key, value, CACHE_PREV_VALUE_KEY_EXPIRE); statusCmd != nil {
  233. return nil
  234. } else {
  235. return errors.New("setPrevValue error")
  236. }
  237. }
  238. func (e *EnvItem) GetItemFloat64Val() float64 {
  239. if e.Value == nil {
  240. e.getCurrentValue()
  241. }
  242. switch e.Value.(type) {
  243. case string:
  244. if v, e := strconv.ParseFloat(e.Value.(string), 64); e == nil {
  245. return v
  246. }
  247. }
  248. return 0
  249. }
  250. func (e *EnvItem) GetItemInt64Val() int64 {
  251. if e.Value == nil {
  252. e.getCurrentValue()
  253. }
  254. switch e.Value.(type) {
  255. case string:
  256. if v, e := strconv.ParseInt(e.Value.(string), 10, 64); e == nil {
  257. return v
  258. }
  259. }
  260. return 0
  261. }
  262. func (e *EnvItem) GetItemStringVal() string {
  263. if e.Value == nil {
  264. e.getCurrentValue()
  265. }
  266. if v, ok := e.Value.(string); ok {
  267. return v
  268. }
  269. return ""
  270. }
  271. func (e *EnvItem) GetItemHtime() *time.Time {
  272. if e.Value == nil {
  273. e.getCurrentValue()
  274. }
  275. if ht, err := time.ParseInLocation("2006-01-02 15:04:05", e.Htime, time.Local); err == nil {
  276. return &ht
  277. }
  278. return nil
  279. }
  280. func (e *EnvItem) GetItemPrevFloat64Val() float64 {
  281. ov, _, err := e.getPrevValue()
  282. if err != nil {
  283. return 0
  284. }
  285. if v, err := strconv.ParseFloat(ov, 64); err == nil {
  286. return v
  287. }
  288. return 0
  289. }
  290. func (e *EnvItem) GetItemPrevInt64Val() int64 {
  291. ov, _, err := e.getPrevValue()
  292. if err != nil {
  293. return 0
  294. }
  295. if v, err := strconv.ParseInt(ov, 10, 64); err == nil {
  296. return v
  297. }
  298. return 0
  299. }
  300. func (e *EnvItem) GetItemPrevStringVal() string {
  301. ov, _, _ := e.getPrevValue()
  302. return ov
  303. }
  304. func (e *EnvItem) GetItemPrevHtime() *time.Time {
  305. _, ht, _ := e.getPrevValue()
  306. if t, err := time.ParseInLocation("2006-01-02 15:04:05", ht, time.Local); err == nil {
  307. return &t
  308. }
  309. return nil
  310. }
  311. func (e *EnvItem) GetAdjustInt64Val() (int64, error) {
  312. return getAdjustInt64Val(e.ProjectId, e.Item)
  313. }
  314. func (e *EnvItem) GetAdjustFloat64Val() (float64, error) {
  315. return getAdjustFloat64Val(e.ProjectId, e.Item)
  316. }
  317. func (e *EnvItem) GetAdjustStringVal() (string, error) {
  318. return getAdjustStringVal(e.ProjectId, e.Item)
  319. }
  320. func (e *EnvItem) SetAdjust(value string, expire time.Duration) error {
  321. if adjustValue == false {
  322. return nil
  323. }
  324. if cache == nil {
  325. return errors.New("not cache")
  326. }
  327. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  328. scmd := cache.Set(context.Background(), key, value, expire)
  329. return scmd.Err()
  330. }
  331. func (e *EnvItem) IncreAdjust(expire time.Duration) (int64, error) {
  332. if adjustValue == false {
  333. return 0, nil
  334. }
  335. if cache == nil {
  336. return 0, errors.New("not cache")
  337. }
  338. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  339. intCmd := cache.Incr(context.Background(), key)
  340. if intCmd.Err() != nil {
  341. return 0, intCmd.Err()
  342. }
  343. cache.Expire(context.Background(), key, expire)
  344. return intCmd.Val(), nil
  345. }
  346. // WaitValue 等待点位的值, 变为指定值
  347. // v: 等待的值
  348. // op: -1: 小于, 0: 等于, 1: 大于
  349. // interval: 轮询间隔
  350. // retry: 最多尝试次数 0: 表示不限制
  351. func (e *EnvItem) WaitValue(v string, op int, interval time.Duration, retry int) <-chan bool {
  352. c := make(chan bool)
  353. go func() {
  354. t := 0
  355. for {
  356. val, _, err := e.getCurrentValue()
  357. if err == nil && strings.Compare(v, val) == op {
  358. c <- true
  359. }
  360. if retry > 0 {
  361. t += 1
  362. if t >= retry {
  363. c <- false
  364. }
  365. }
  366. time.Sleep(interval)
  367. }
  368. }()
  369. return c
  370. }
  371. func (e *EnvItem) ClearAdjust() {
  372. if adjustValue == false {
  373. return
  374. }
  375. if cache == nil {
  376. return
  377. }
  378. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  379. cache.Del(context.Background(), key)
  380. }
  381. func (e *EnvItem) clearValue() {
  382. e.Value = nil
  383. e.Htime = ""
  384. }
  385. // Scan 实现方法
  386. func (d *MultiEnvItem) Scan(input interface{}) error {
  387. _ = json.Unmarshal(input.([]byte), &d)
  388. return nil
  389. }
  390. func (d MultiEnvItem) Value() (driver.Value, error) {
  391. return json.Marshal(d)
  392. }
  393. func getAdjustInt64Val(projectId int64, item string) (int64, error) {
  394. if adjustValue == false {
  395. return 0, nil
  396. }
  397. if cache == nil {
  398. return 0, errors.New("not cache")
  399. }
  400. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  401. scmd := cache.Get(context.Background(), key)
  402. if scmd != nil && scmd.Err() != nil {
  403. return 0, scmd.Err()
  404. }
  405. return scmd.Int64()
  406. }
  407. func getAdjustFloat64Val(projectId int64, item string) (float64, error) {
  408. if adjustValue == false {
  409. return 0, nil
  410. }
  411. if cache == nil {
  412. return 0, errors.New("not cache")
  413. }
  414. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  415. scmd := cache.Get(context.Background(), key)
  416. if scmd != nil && scmd.Err() != nil {
  417. return 0, scmd.Err()
  418. }
  419. return scmd.Float64()
  420. }
  421. func getAdjustStringVal(projectId int64, item string) (string, error) {
  422. if adjustValue == false {
  423. return "", nil
  424. }
  425. if cache == nil {
  426. return "", errors.New("not cache")
  427. }
  428. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  429. scmd := cache.Get(context.Background(), key)
  430. if scmd != nil && scmd.Err() != nil {
  431. return "", scmd.Err()
  432. }
  433. return scmd.String(), nil
  434. }
  435. func (v *VirtualPlcItem) GetVirtualPlcItems() (*VirtualPlcItemResp, error) {
  436. tmpPlcItemUrl := fmt.Sprintf("%s%d", plcItemUrl, v.ProjectId)
  437. req := httplib.Get(tmpPlcItemUrl)
  438. req.Param("project_id", fmt.Sprintf("%d", v.ProjectId))
  439. req.Param("pageSize", fmt.Sprintf("%d", v.PageSize))
  440. r, err := req.Response()
  441. if err != nil {
  442. return nil, err
  443. }
  444. defer r.Body.Close()
  445. if r.StatusCode == 200 {
  446. resp, err := req.Bytes()
  447. if err != nil {
  448. return nil, err
  449. }
  450. res := &VirtualPlcItemResp{}
  451. err = json.Unmarshal(resp, res)
  452. if err != nil {
  453. return nil, err
  454. }
  455. return res, nil
  456. }
  457. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  458. }