func.go 15 KB


  1. package envitem
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "crypto/md5"
  7. "database/sql/driver"
  8. "encoding/hex"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "metawant.greentech.com.cn/gaoyagang/gt-common/httplib"
  14. "net"
  15. "net/http"
  16. "net/url"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. func SetOptions(options Options) {
  23. snapUrl = fmt.Sprintf("http://%s/api/v1/plc-current", options.GtServerIp)
  24. ctlUrl = fmt.Sprintf("http://%s/api/v1/plc/set-var-values", options.GtServerIp)
  25. plcItemUrl = fmt.Sprintf("http://%s/api/v1/plc-list/", options.GtServerIp)
  26. if options.Cache != nil {
  27. cache = options.Cache
  28. }
  29. fetchMultiItem = options.FetchMultiItem
  30. adjustValue = options.AdjustValue
  31. plcItemSecret = options.PlcItemSecret
  32. }
  33. func (m MultiEnvItem) GetProjectId() int64 {
  34. for _, item := range m {
  35. return item.ProjectId
  36. }
  37. return 0
  38. }
  39. func (m MultiEnvItem) getItemNames() []string {
  40. items := make([]string, 0)
  41. for _, item := range m {
  42. items = append(items, item.Item)
  43. }
  44. return items
  45. }
  46. func (m MultiEnvItem) FillCurrentValue() error {
  47. datas := make([]*ItemValueResp, 0)
  48. var err error
  49. if fetchMultiItem {
  50. datas, err = m.getCurrentData()
  51. } else {
  52. datas, err = m.getCurrentDataByOne()
  53. }
  54. if err != nil {
  55. return err
  56. }
  57. // 先做一个item -> key的映射
  58. ikm := make(map[string]string)
  59. for s, item := range m {
  60. ikm[item.Item] = s
  61. }
  62. for _, data := range datas {
  63. if data != nil {
  64. if k, ok := ikm[data.ItemName]; ok {
  65. m[k].Value = data.Val
  66. m[k].Htime = data.HTime
  67. _ = m[k].setPrevValue()
  68. }
  69. }
  70. }
  71. return nil
  72. }
  73. func (m MultiEnvItem) GetItemFloat64Value(key string) float64 {
  74. if envItem, ok := m[key]; ok {
  75. return envItem.GetItemFloat64Val()
  76. }
  77. return 0
  78. }
  79. func (m MultiEnvItem) GetItemInt64Value(key string) int64 {
  80. if envItem, ok := m[key]; ok {
  81. return envItem.GetItemInt64Val()
  82. }
  83. return 0
  84. }
  85. func (m MultiEnvItem) GetItemStringValue(key string) string {
  86. if envItem, ok := m[key]; ok {
  87. return envItem.GetItemStringVal()
  88. }
  89. return ""
  90. }
  91. func (m MultiEnvItem) GetItemHtime(key string) *time.Time {
  92. if envItem, ok := m[key]; ok {
  93. return envItem.GetItemHtime()
  94. }
  95. return nil
  96. }
  97. func (m MultiEnvItem) FindString() map[string]string {
  98. a := make(map[string]string)
  99. for _, item := range m {
  100. a[item.Item] = item.GetItemStringVal()
  101. }
  102. return a
  103. }
  104. func (m MultiEnvItem) FindPrevString() map[string]string {
  105. a := make(map[string]string)
  106. for _, item := range m {
  107. a[item.Item] = item.GetItemPrevStringVal()
  108. }
  109. return a
  110. }
  111. func (m MultiEnvItem) ClearValues() {
  112. for _, item := range m {
  113. item.clearValue()
  114. }
  115. }
  116. func (m MultiEnvItem) getCurrentDataByOne() ([]*ItemValueResp, error) {
  117. datas := make([]*ItemValueResp, len(m))
  118. i := 0
  119. var wg sync.WaitGroup
  120. wg.Add(len(m))
  121. for _, item := range m {
  122. go func(index int, one *EnvItem) {
  123. defer wg.Done()
  124. datas[index], _ = one.getCurrentData()
  125. }(i, item)
  126. i++
  127. }
  128. wg.Wait()
  129. return datas, nil
  130. }
  131. func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
  132. req := httplib.Post(snapUrl)
  133. data := make([]*ItemValueReq, 1)
  134. data[0] = &ItemValueReq{
  135. DeviceItems: strings.Join(m.getItemNames(), ","),
  136. ProjectId: m.GetProjectId(),
  137. }
  138. jsonBytes, err := json.Marshal(data)
  139. if err != nil {
  140. return nil, err
  141. }
  142. req.Body(jsonBytes)
  143. req.SetTimeout(time.Millisecond * 2000)
  144. r, err := req.Response()
  145. if err != nil {
  146. return nil, err
  147. }
  148. defer r.Body.Close()
  149. if r.StatusCode == 200 {
  150. resp, err := req.Bytes()
  151. if err != nil {
  152. return nil, err
  153. }
  154. res := &ItemValueResps{}
  155. err = json.Unmarshal(resp, res)
  156. if err != nil {
  157. return nil, err
  158. }
  159. if len(res.Data) == 0 {
  160. return nil, errors.New("not found envitem's value")
  161. }
  162. return res.Data, nil
  163. }
  164. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  165. }
  166. func (e *EnvItem) getCurrentData() (*ItemValueResp, error) {
  167. req := httplib.Post(snapUrl)
  168. data := make([]*ItemValueReq, 1)
  169. data[0] = &ItemValueReq{
  170. DeviceItems: e.Item,
  171. ProjectId: e.ProjectId,
  172. }
  173. jsonBytes, err := json.Marshal(data)
  174. if err != nil {
  175. return nil, err
  176. }
  177. req.Body(jsonBytes)
  178. req.SetTimeout(time.Millisecond * 2000)
  179. r, err := req.Response()
  180. if err != nil {
  181. return nil, err
  182. }
  183. defer r.Body.Close()
  184. if r.StatusCode == 200 {
  185. resp, err := req.Bytes()
  186. if err != nil {
  187. return nil, err
  188. }
  189. res := &ItemValueResps{}
  190. err = json.Unmarshal(resp, res)
  191. if err != nil {
  192. return nil, err
  193. }
  194. if len(res.Data) == 0 {
  195. return nil, errors.New("not found envitem's value")
  196. }
  197. if adjustValue && cache != nil && len(res.Data) > 0 {
  198. if rv, err := strconv.ParseInt(res.Data[0].Val, 10, 64); err == nil {
  199. adjust, _ := e.GetAdjustInt64Val()
  200. res.Data[0].Val = fmt.Sprintf("%d", rv+adjust)
  201. } else if rv, err := strconv.ParseFloat(res.Data[0].Val, 64); err == nil {
  202. adjust, _ := e.GetAdjustFloat64Val()
  203. res.Data[0].Val = fmt.Sprintf("%f", rv+adjust)
  204. }
  205. }
  206. return res.Data[0], nil
  207. }
  208. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  209. }
  210. func (e *EnvItem) getCurrentValue() (string, string, error) {
  211. resp, err := e.getCurrentData()
  212. if err != nil {
  213. return "", "", err
  214. }
  215. e.Value = resp.Val
  216. e.Htime = resp.HTime
  217. _ = e.setPrevValue()
  218. return resp.Val, resp.HTime, nil
  219. }
  220. func (e *EnvItem) getPrevValue() (string, string, error) {
  221. if cache == nil {
  222. return "", "", errors.New("not cache")
  223. }
  224. key := fmt.Sprintf(CACHE_PREV_VALUE_KEY, e.ProjectId, e.Item)
  225. if cmd := cache.Get(context.Background(), key); cmd != nil {
  226. n := strings.Split(cmd.Val(), "|")
  227. if len(n) != 2 {
  228. return "", "", errors.New("getPrevValue cache length error")
  229. }
  230. return n[0], n[1], nil
  231. } else {
  232. return "", "", cmd.Err()
  233. }
  234. }
  235. func (e *EnvItem) setPrevValue() error {
  236. if cache == nil {
  237. return errors.New("not cache")
  238. }
  239. key := fmt.Sprintf(CACHE_PREV_VALUE_KEY, e.ProjectId, e.Item)
  240. value := fmt.Sprintf("%s|%s", e.Value, e.Htime)
  241. if statusCmd := cache.Set(context.Background(), key, value, CACHE_PREV_VALUE_KEY_EXPIRE); statusCmd != nil {
  242. return nil
  243. } else {
  244. return errors.New("setPrevValue error")
  245. }
  246. }
  247. func (e *EnvItem) GetItemFloat64Val() float64 {
  248. if e.Value == nil {
  249. e.getCurrentValue()
  250. }
  251. switch e.Value.(type) {
  252. case string:
  253. if v, e := strconv.ParseFloat(e.Value.(string), 64); e == nil {
  254. return v
  255. }
  256. }
  257. return 0
  258. }
  259. func (e *EnvItem) GetItemInt64Val() int64 {
  260. if e.Value == nil {
  261. e.getCurrentValue()
  262. }
  263. switch e.Value.(type) {
  264. case string:
  265. if v, e := strconv.ParseInt(e.Value.(string), 10, 64); e == nil {
  266. return v
  267. }
  268. }
  269. return 0
  270. }
  271. func (e *EnvItem) GetItemStringVal() string {
  272. if e.Value == nil {
  273. e.getCurrentValue()
  274. }
  275. if v, ok := e.Value.(string); ok {
  276. return v
  277. }
  278. return ""
  279. }
  280. func (e *EnvItem) GetItemHtime() *time.Time {
  281. if e.Value == nil {
  282. e.getCurrentValue()
  283. }
  284. if ht, err := time.ParseInLocation("2006-01-02 15:04:05", e.Htime, time.Local); err == nil {
  285. return &ht
  286. }
  287. return nil
  288. }
  289. func (e *EnvItem) GetItemPrevFloat64Val() float64 {
  290. ov, _, err := e.getPrevValue()
  291. if err != nil {
  292. return 0
  293. }
  294. if v, err := strconv.ParseFloat(ov, 64); err == nil {
  295. return v
  296. }
  297. return 0
  298. }
  299. func (e *EnvItem) GetItemPrevInt64Val() int64 {
  300. ov, _, err := e.getPrevValue()
  301. if err != nil {
  302. return 0
  303. }
  304. if v, err := strconv.ParseInt(ov, 10, 64); err == nil {
  305. return v
  306. }
  307. return 0
  308. }
  309. func (e *EnvItem) GetItemPrevStringVal() string {
  310. ov, _, _ := e.getPrevValue()
  311. return ov
  312. }
  313. func (e *EnvItem) GetItemPrevHtime() *time.Time {
  314. _, ht, _ := e.getPrevValue()
  315. if t, err := time.ParseInLocation("2006-01-02 15:04:05", ht, time.Local); err == nil {
  316. return &t
  317. }
  318. return nil
  319. }
  320. func (e *EnvItem) GetAdjustInt64Val() (int64, error) {
  321. return getAdjustInt64Val(e.ProjectId, e.Item)
  322. }
  323. func (e *EnvItem) GetAdjustFloat64Val() (float64, error) {
  324. return getAdjustFloat64Val(e.ProjectId, e.Item)
  325. }
  326. func (e *EnvItem) GetAdjustStringVal() (string, error) {
  327. return getAdjustStringVal(e.ProjectId, e.Item)
  328. }
  329. func (e *EnvItem) SetAdjust(value string, expire time.Duration) error {
  330. if adjustValue == false {
  331. return nil
  332. }
  333. if cache == nil {
  334. return errors.New("not cache")
  335. }
  336. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  337. scmd := cache.Set(context.Background(), key, value, expire)
  338. return scmd.Err()
  339. }
  340. func (e *EnvItem) IncreAdjust(expire time.Duration) (int64, error) {
  341. if adjustValue == false {
  342. return 0, nil
  343. }
  344. if cache == nil {
  345. return 0, errors.New("not cache")
  346. }
  347. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  348. intCmd := cache.Incr(context.Background(), key)
  349. if intCmd.Err() != nil {
  350. return 0, intCmd.Err()
  351. }
  352. cache.Expire(context.Background(), key, expire)
  353. return intCmd.Val(), nil
  354. }
  355. func (e *EnvItem) SetValue(o, v string) error {
  356. ts := time.Now().Unix()
  357. data := make(SetCurrentsReq, 1)
  358. data[0] = SetCurrentReq{
  359. ProjectId: e.ProjectId,
  360. Item: e.Item,
  361. OldValue: o,
  362. NewValue: v,
  363. }
  364. sign := data.Sign(plcItemSecret, ts)
  365. jsonByte, _ := json.Marshal(data)
  366. requestBuf := bytes.NewBuffer(jsonByte)
  367. headers := http.Header{}
  368. headers.Add("Content-Type", "application/json")
  369. resp, err := _sendRequest(ctlUrl, "POST", fmt.Sprintf("sign=%s&timestamp=%d", sign, ts), headers, requestBuf)
  370. if err != nil {
  371. return fmt.Errorf("设置点位数据失败参数,%s,%s", string(jsonByte), err.Error())
  372. }
  373. respData := &SetCurrentResp{}
  374. err = json.Unmarshal(resp.Bytes(), respData)
  375. if err != nil {
  376. return fmt.Errorf("设置点位数据失败参数,%s,%s,%s", string(jsonByte), err.Error(), resp)
  377. }
  378. if respData.Code != http.StatusOK {
  379. return fmt.Errorf("设置点位数据失败参数,%s,%s", string(jsonByte), resp)
  380. }
  381. return nil
  382. }
  383. // WaitValue 等待点位的值, 变为指定值
  384. // v: 等待的值
  385. // op: -1: 小于, 0: 等于, 1: 大于
  386. // interval: 轮询间隔
  387. // retry: 最多尝试次数 0: 表示不限制
  388. func (e *EnvItem) WaitValue(v string, op int, interval time.Duration, retry int) <-chan bool {
  389. c := make(chan bool)
  390. go func() {
  391. t := 0
  392. for {
  393. val, _, err := e.getCurrentValue()
  394. if err == nil && strings.Compare(v, val) == op {
  395. c <- true
  396. }
  397. if retry > 0 {
  398. t += 1
  399. if t >= retry {
  400. c <- false
  401. }
  402. }
  403. time.Sleep(interval)
  404. }
  405. }()
  406. return c
  407. }
  408. func (e *EnvItem) ClearAdjust() {
  409. if adjustValue == false {
  410. return
  411. }
  412. if cache == nil {
  413. return
  414. }
  415. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
  416. cache.Del(context.Background(), key)
  417. }
  418. func (e *EnvItem) clearValue() {
  419. e.Value = nil
  420. e.Htime = ""
  421. }
  422. // Scan 实现方法
  423. func (d *MultiEnvItem) Scan(input interface{}) error {
  424. _ = json.Unmarshal(input.([]byte), &d)
  425. return nil
  426. }
  427. func (d MultiEnvItem) Value() (driver.Value, error) {
  428. return json.Marshal(d)
  429. }
  430. func getAdjustInt64Val(projectId int64, item string) (int64, error) {
  431. if adjustValue == false {
  432. return 0, nil
  433. }
  434. if cache == nil {
  435. return 0, errors.New("not cache")
  436. }
  437. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  438. scmd := cache.Get(context.Background(), key)
  439. if scmd != nil && scmd.Err() != nil {
  440. return 0, scmd.Err()
  441. }
  442. return scmd.Int64()
  443. }
  444. func getAdjustFloat64Val(projectId int64, item string) (float64, error) {
  445. if adjustValue == false {
  446. return 0, nil
  447. }
  448. if cache == nil {
  449. return 0, errors.New("not cache")
  450. }
  451. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  452. scmd := cache.Get(context.Background(), key)
  453. if scmd != nil && scmd.Err() != nil {
  454. return 0, scmd.Err()
  455. }
  456. return scmd.Float64()
  457. }
  458. func getAdjustStringVal(projectId int64, item string) (string, error) {
  459. if adjustValue == false {
  460. return "", nil
  461. }
  462. if cache == nil {
  463. return "", errors.New("not cache")
  464. }
  465. key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
  466. scmd := cache.Get(context.Background(), key)
  467. if scmd != nil && scmd.Err() != nil {
  468. return "", scmd.Err()
  469. }
  470. return scmd.String(), nil
  471. }
  472. func (s SetCurrentsReq) Sign(secret string, ts int64) string {
  473. jsonByte, _ := json.Marshal(s)
  474. requestBuf := bytes.NewBuffer(jsonByte)
  475. hasher := md5.New()
  476. hasher.Write([]byte(fmt.Sprintf("%s%s%d", requestBuf.Bytes(), secret, ts)))
  477. return strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
  478. }
  479. func (v *VirtualPlcItem) GetVirtualPlcItems() (*VirtualPlcItemResp, error) {
  480. tmpPlcItemUrl := fmt.Sprintf("%s%d", plcItemUrl, v.ProjectId)
  481. req := httplib.Get(tmpPlcItemUrl)
  482. req.Param("project_id", fmt.Sprintf("%d", v.ProjectId))
  483. req.Param("pageSize", fmt.Sprintf("%d", v.PageSize))
  484. r, err := req.Response()
  485. if err != nil {
  486. return nil, err
  487. }
  488. defer r.Body.Close()
  489. if r.StatusCode == 200 {
  490. resp, err := req.Bytes()
  491. if err != nil {
  492. return nil, err
  493. }
  494. res := &VirtualPlcItemResp{}
  495. err = json.Unmarshal(resp, res)
  496. if err != nil {
  497. return nil, err
  498. }
  499. return res, nil
  500. }
  501. return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
  502. }
  // sendRequestClient is the HTTP client shared by every _sendRequest call so
  // that connections are pooled instead of a new transport being built per
  // request.
  var sendRequestClient = &http.Client{
  	Transport: &http.Transport{
  		Proxy: http.ProxyFromEnvironment,
  		DialContext: (&net.Dialer{
  			Timeout:   3 * time.Second,
  			KeepAlive: 10 * time.Second,
  		}).DialContext,
  		MaxIdleConns:        10,
  		MaxIdleConnsPerHost: 10,
  		IdleConnTimeout:     10 * time.Second,
  	},
  	Timeout: 10 * time.Second,
  }

  // sendRequestQueryEscaper percent-encodes the characters that must not reach
  // the query parser raw: space, slash, parentheses, comma and semicolon.
  var sendRequestQueryEscaper = strings.NewReplacer(
  	" ", "%20",
  	"/", "%2F",
  	"(", "%28",
  	")", "%29",
  	",", "%2C",
  	";", "%3B",
  )

  // _sendRequest performs one HTTP request and returns the full response body.
  //
  // urlStr:     target URL without the query string.
  // method:     HTTP method; empty means GET.
  // queryParam: raw "k=v&k2=v2" query text; it is escaped, parsed and
  //             re-encoded before being appended to urlStr after a "?".
  // headers:    request headers; every value of every key is sent.
  // postData:   optional request body; it is read completely before sending.
  //
  // The body is gunzipped when the response carries "Content-Encoding: gzip".
  // The HTTP status code is not inspected; callers must judge success from the
  // body. An error is returned when the query or request cannot be built, the
  // body cannot be read, or the round trip fails.
  func _sendRequest(urlStr, method, queryParam string, headers http.Header, postData io.Reader) (buffer *bytes.Buffer, err error) {
  	if method == "" {
  		method = "GET"
  	}

  	target := urlStr
  	if queryParam != "" {
  		parsed, err := url.Parse("?" + sendRequestQueryEscaper.Replace(queryParam))
  		if err != nil {
  			return nil, err
  		}
  		target = fmt.Sprintf("%s?%s", urlStr, parsed.Query().Encode())
  	}

  	reqBody := &bytes.Buffer{}
  	if postData != nil {
  		if _, err := reqBody.ReadFrom(postData); err != nil {
  			return nil, err
  		}
  	}

  	// Check the construction error before touching req: it is nil on failure.
  	req, err := http.NewRequest(method, target, reqBody)
  	if err != nil {
  		return nil, err
  	}
  	for name, values := range headers {
  		for _, value := range values {
  			req.Header.Add(name, value)
  		}
  	}

  	resp, err := sendRequestClient.Do(req)
  	if err != nil {
  		return nil, err
  	}
  	// Registered immediately so every later return path releases the body.
  	defer resp.Body.Close()

  	var body io.Reader = resp.Body
  	if resp.Header.Get("Content-Encoding") == "gzip" {
  		zr, err := gzip.NewReader(resp.Body)
  		if err != nil {
  			return nil, err
  		}
  		defer zr.Close()
  		body = zr
  	}

  	buffer = &bytes.Buffer{}
  	if _, err := buffer.ReadFrom(body); err != nil {
  		return nil, err
  	}
  	return buffer, nil
  }