123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672 |
- package envitem
- import (
- "bytes"
- "compress/gzip"
- "context"
- "crypto/md5"
- "database/sql/driver"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "metawant.greentech.com.cn/gaoyagang/gt-common/httplib"
- "net"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- func SetOptions(options Options) {
- snapUrl = fmt.Sprintf("http://%s/api/v1/plc-current", options.GtServerIp)
- ctlUrl = fmt.Sprintf("http://%s/api/v1/plc/set-var-values", options.GtServerIp)
- plcItemUrl = fmt.Sprintf("http://%s/api/v1/plc-list/", options.GtServerIp)
- if options.Cache != nil {
- cache = options.Cache
- }
- fetchMultiItem = options.FetchMultiItem
- adjustValue = options.AdjustValue
- plcItemSecret = options.PlcItemSecret
- }
- func (m MultiEnvItem) GetProjectId() int64 {
- for _, item := range m {
- return item.ProjectId
- }
- return 0
- }
- func (m MultiEnvItem) getItemNames() []string {
- items := make([]string, 0)
- for _, item := range m {
- items = append(items, item.Item)
- }
- return items
- }
- func (m MultiEnvItem) FillCurrentValue() error {
- datas := make([]*ItemValueResp, 0)
- var err error
- if fetchMultiItem {
- datas, err = m.getCurrentData()
- } else {
- datas, err = m.getCurrentDataByOne()
- }
- if err != nil {
- return err
- }
- // 先做一个item -> key的映射
- ikm := make(map[string]string)
- for s, item := range m {
- ikm[item.Item] = s
- }
- for _, data := range datas {
- if data != nil {
- if k, ok := ikm[data.ItemName]; ok {
- m[k].Value = data.Val
- m[k].Htime = data.HTime
- _ = m[k].setPrevValue()
- }
- }
- }
- return nil
- }
- func (m MultiEnvItem) GetItemFloat64Value(key string) float64 {
- if envItem, ok := m[key]; ok {
- return envItem.GetItemFloat64Val()
- }
- return 0
- }
- func (m MultiEnvItem) GetItemInt64Value(key string) int64 {
- if envItem, ok := m[key]; ok {
- return envItem.GetItemInt64Val()
- }
- return 0
- }
- func (m MultiEnvItem) GetItemStringValue(key string) string {
- if envItem, ok := m[key]; ok {
- return envItem.GetItemStringVal()
- }
- return ""
- }
- func (m MultiEnvItem) GetItemHtime(key string) *time.Time {
- if envItem, ok := m[key]; ok {
- return envItem.GetItemHtime()
- }
- return nil
- }
- func (m MultiEnvItem) FindString() map[string]string {
- a := make(map[string]string)
- for _, item := range m {
- a[item.Item] = item.GetItemStringVal()
- }
- return a
- }
- func (m MultiEnvItem) FindPrevString() map[string]string {
- a := make(map[string]string)
- for _, item := range m {
- a[item.Item] = item.GetItemPrevStringVal()
- }
- return a
- }
- func (m MultiEnvItem) ClearValues() {
- for _, item := range m {
- item.clearValue()
- }
- }
- func (m MultiEnvItem) getCurrentDataByOne() ([]*ItemValueResp, error) {
- datas := make([]*ItemValueResp, len(m))
- i := 0
- var wg sync.WaitGroup
- wg.Add(len(m))
- for _, item := range m {
- go func(index int, one *EnvItem) {
- defer wg.Done()
- datas[index], _ = one.getCurrentData()
- }(i, item)
- i++
- }
- wg.Wait()
- return datas, nil
- }
- func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
- req := httplib.Post(snapUrl)
- data := make([]*ItemValueReq, 1)
- data[0] = &ItemValueReq{
- DeviceItems: strings.Join(m.getItemNames(), ","),
- ProjectId: m.GetProjectId(),
- }
- jsonBytes, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- req.Body(jsonBytes)
- req.SetTimeout(time.Millisecond * 2000)
- r, err := req.Response()
- if err != nil {
- return nil, err
- }
- defer r.Body.Close()
- if r.StatusCode == 200 {
- resp, err := req.Bytes()
- if err != nil {
- return nil, err
- }
- res := &ItemValueResps{}
- err = json.Unmarshal(resp, res)
- if err != nil {
- return nil, err
- }
- if len(res.Data) == 0 {
- return nil, errors.New("not found envitem's value")
- }
- return res.Data, nil
- }
- return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
- }
- func (e *EnvItem) getCurrentData() (*ItemValueResp, error) {
- req := httplib.Post(snapUrl)
- data := make([]*ItemValueReq, 1)
- data[0] = &ItemValueReq{
- DeviceItems: e.Item,
- ProjectId: e.ProjectId,
- }
- jsonBytes, err := json.Marshal(data)
- if err != nil {
- return nil, err
- }
- req.Body(jsonBytes)
- req.SetTimeout(time.Millisecond * 2000)
- r, err := req.Response()
- if err != nil {
- return nil, err
- }
- defer r.Body.Close()
- if r.StatusCode == 200 {
- resp, err := req.Bytes()
- if err != nil {
- return nil, err
- }
- res := &ItemValueResps{}
- err = json.Unmarshal(resp, res)
- if err != nil {
- return nil, err
- }
- if len(res.Data) == 0 {
- return nil, errors.New("not found envitem's value")
- }
- if adjustValue && cache != nil && len(res.Data) > 0 {
- if rv, err := strconv.ParseInt(res.Data[0].Val, 10, 64); err == nil {
- adjust, _ := e.GetAdjustInt64Val()
- res.Data[0].Val = fmt.Sprintf("%d", rv+adjust)
- } else if rv, err := strconv.ParseFloat(res.Data[0].Val, 64); err == nil {
- adjust, _ := e.GetAdjustFloat64Val()
- res.Data[0].Val = fmt.Sprintf("%f", rv+adjust)
- }
- }
- return res.Data[0], nil
- }
- return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
- }
- func (e *EnvItem) getCurrentValue() (string, string, error) {
- resp, err := e.getCurrentData()
- if err != nil {
- return "", "", err
- }
- e.Value = resp.Val
- e.Htime = resp.HTime
- _ = e.setPrevValue()
- return resp.Val, resp.HTime, nil
- }
- func (e *EnvItem) getPrevValue() (string, error) {
- if v, ok := itemPrevValues.Load(e.Item); ok {
- return v.(string), nil
- }
- return "", errors.New("not found prev envitem's value")
- }
- func (e *EnvItem) setPrevValue() error {
- itemPrevValues.Store(e.Item, e.Value)
- return nil
- }
- func (e *EnvItem) GetItemFloat64Val() float64 {
- if e.Value == nil {
- e.getCurrentValue()
- }
- switch e.Value.(type) {
- case string:
- if v, e := strconv.ParseFloat(e.Value.(string), 64); e == nil {
- return v
- }
- }
- return 0
- }
- func (e *EnvItem) GetItemInt64Val() int64 {
- if e.Value == nil {
- e.getCurrentValue()
- }
- switch e.Value.(type) {
- case string:
- if v, e := strconv.ParseInt(e.Value.(string), 10, 64); e == nil {
- return v
- }
- }
- return 0
- }
- func (e *EnvItem) GetItemStringVal() string {
- if e.Value == nil {
- e.getCurrentValue()
- }
- if v, ok := e.Value.(string); ok {
- return v
- }
- return ""
- }
- func (e *EnvItem) GetItemHtime() *time.Time {
- if e.Value == nil {
- e.getCurrentValue()
- }
- if ht, err := time.ParseInLocation("2006-01-02 15:04:05", e.Htime, time.Local); err == nil {
- return &ht
- }
- return nil
- }
- func (e *EnvItem) GetItemPrevFloat64Val() float64 {
- ov, err := e.getPrevValue()
- if err != nil {
- return 0
- }
- if v, err := strconv.ParseFloat(ov, 64); err == nil {
- return v
- }
- return 0
- }
- func (e *EnvItem) GetItemPrevInt64Val() int64 {
- ov, err := e.getPrevValue()
- if err != nil {
- return 0
- }
- if v, err := strconv.ParseInt(ov, 10, 64); err == nil {
- return v
- }
- return 0
- }
- func (e *EnvItem) GetItemPrevStringVal() string {
- if ov, err := e.getPrevValue(); err == nil {
- return ov
- }
- return ""
- }
- func (e *EnvItem) GetAdjustInt64Val() (int64, error) {
- return getAdjustInt64Val(e.ProjectId, e.Item)
- }
- func (e *EnvItem) GetAdjustFloat64Val() (float64, error) {
- return getAdjustFloat64Val(e.ProjectId, e.Item)
- }
- func (e *EnvItem) GetAdjustStringVal() (string, error) {
- return getAdjustStringVal(e.ProjectId, e.Item)
- }
- func (e *EnvItem) SetAdjust(value string, expire time.Duration) error {
- if adjustValue == false {
- return nil
- }
- if cache == nil {
- return errors.New("not cache")
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
- scmd := cache.Set(context.Background(), key, value, expire)
- return scmd.Err()
- }
- func (e *EnvItem) IncreAdjust(expire time.Duration) (int64, error) {
- if adjustValue == false {
- return 0, nil
- }
- if cache == nil {
- return 0, errors.New("not cache")
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
- intCmd := cache.Incr(context.Background(), key)
- if intCmd.Err() != nil {
- return 0, intCmd.Err()
- }
- cache.Expire(context.Background(), key, expire)
- return intCmd.Val(), nil
- }
- func (e *EnvItem) SetValue(o, v string) error {
- ts := time.Now().Unix()
- data := make(SetCurrentsReq, 1)
- data[0] = SetCurrentReq{
- ProjectId: e.ProjectId,
- Item: e.Item,
- OldValue: o,
- NewValue: v,
- }
- sign := data.Sign(plcItemSecret, ts)
- jsonByte, _ := json.Marshal(data)
- requestBuf := bytes.NewBuffer(jsonByte)
- headers := http.Header{}
- headers.Add("Content-Type", "application/json")
- resp, err := _sendRequest(ctlUrl, "POST", fmt.Sprintf("sign=%s×tamp=%d", sign, ts), headers, requestBuf)
- if err != nil {
- return fmt.Errorf("设置点位数据失败参数,%s,%s", string(jsonByte), err.Error())
- }
- respData := &SetCurrentResp{}
- err = json.Unmarshal(resp.Bytes(), respData)
- if err != nil {
- return fmt.Errorf("设置点位数据失败参数,%s,%s,%s", string(jsonByte), err.Error(), resp)
- }
- if respData.Code != http.StatusOK {
- return fmt.Errorf("设置点位数据失败参数,%s,%s", string(jsonByte), resp)
- }
- return nil
- }
- // WaitValue 等待点位的值, 变为指定值
- // v: 等待的值
- // op: -1: 小于, 0: 等于, 1: 大于
- // interval: 轮询间隔
- // retry: 最多尝试次数 0: 表示不限制
- func (e *EnvItem) WaitValue(v string, op int, interval time.Duration, retry int) <-chan bool {
- c := make(chan bool)
- go func() {
- t := 0
- for {
- val, _, err := e.getCurrentValue()
- if err == nil && strings.Compare(v, val) == op {
- c <- true
- }
- if retry > 0 {
- t += 1
- if t >= retry {
- c <- false
- }
- }
- time.Sleep(interval)
- }
- }()
- return c
- }
- func (e *EnvItem) ClearAdjust() {
- if adjustValue == false {
- return
- }
- if cache == nil {
- return
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
- cache.Del(context.Background(), key)
- }
- func (e *EnvItem) clearValue() {
- e.Value = nil
- e.Htime = ""
- }
- // Scan 实现方法
- func (d *MultiEnvItem) Scan(input interface{}) error {
- _ = json.Unmarshal(input.([]byte), &d)
- return nil
- }
- func (d MultiEnvItem) Value() (driver.Value, error) {
- return json.Marshal(d)
- }
- func getAdjustInt64Val(projectId int64, item string) (int64, error) {
- if adjustValue == false {
- return 0, nil
- }
- if cache == nil {
- return 0, errors.New("not cache")
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
- scmd := cache.Get(context.Background(), key)
- if scmd != nil && scmd.Err() != nil {
- return 0, scmd.Err()
- }
- return scmd.Int64()
- }
- func getAdjustFloat64Val(projectId int64, item string) (float64, error) {
- if adjustValue == false {
- return 0, nil
- }
- if cache == nil {
- return 0, errors.New("not cache")
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
- scmd := cache.Get(context.Background(), key)
- if scmd != nil && scmd.Err() != nil {
- return 0, scmd.Err()
- }
- return scmd.Float64()
- }
- func getAdjustStringVal(projectId int64, item string) (string, error) {
- if adjustValue == false {
- return "", nil
- }
- if cache == nil {
- return "", errors.New("not cache")
- }
- key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
- scmd := cache.Get(context.Background(), key)
- if scmd != nil && scmd.Err() != nil {
- return "", scmd.Err()
- }
- return scmd.String(), nil
- }
- func (s SetCurrentsReq) Sign(secret string, ts int64) string {
- jsonByte, _ := json.Marshal(s)
- requestBuf := bytes.NewBuffer(jsonByte)
- hasher := md5.New()
- hasher.Write([]byte(fmt.Sprintf("%s%s%d", requestBuf.Bytes(), secret, ts)))
- return strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
- }
- func (v *VirtualPlcItem) GetVirtualPlcItems() (*VirtualPlcItemResp, error) {
- tmpPlcItemUrl := fmt.Sprintf("%s%d", plcItemUrl, v.ProjectId)
- req := httplib.Get(tmpPlcItemUrl)
- req.Param("project_id", fmt.Sprintf("%d", v.ProjectId))
- req.Param("pageSize", fmt.Sprintf("%d", v.PageSize))
- r, err := req.Response()
- if err != nil {
- return nil, err
- }
- defer r.Body.Close()
- if r.StatusCode == 200 {
- resp, err := req.Bytes()
- if err != nil {
- return nil, err
- }
- res := &VirtualPlcItemResp{}
- err = json.Unmarshal(resp, res)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
- return nil, errors.New(fmt.Sprintf("request statusCode: %d", r.StatusCode))
- }
// sendRequestClient is shared by every _sendRequest call so that connections
// are pooled and reused instead of a new Transport being built per request.
var sendRequestClient = &http.Client{
	Transport: &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   3 * time.Second,
			KeepAlive: 10 * time.Second,
		}).DialContext,
		MaxIdleConns:        10,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     10 * time.Second,
	},
	Timeout: 10 * time.Second,
}

// queryParamEscaper pre-escapes spaces, slashes, parentheses, commas and
// semicolons in a raw query string before it is parsed.
var queryParamEscaper = strings.NewReplacer(
	" ", "%20",
	"/", "%2F",
	"(", "%28",
	")", "%29",
	",", "%2C",
	";", "%3B",
)

// _sendRequest performs an HTTP request and returns the whole response body.
//
// urlStr is the target URL. method defaults to "GET" when empty. queryParam
// is an optional raw query string ("a=1&b=2"); it is escaped, parsed and
// re-encoded before being appended to urlStr after a '?'. For every header
// name only the first value is sent. postData, when non-nil, is read fully
// and used as the request body.
//
// A response with Content-Encoding "gzip" is decompressed. The HTTP status
// code is not inspected; callers must interpret the body themselves. An
// error is returned when the query cannot be parsed, the request cannot be
// built or sent, or the body cannot be read.
func _sendRequest(urlStr, method, queryParam string, headers http.Header, postData io.Reader) (buffer *bytes.Buffer, err error) {
	if method == "" {
		method = "GET"
	}

	target := urlStr
	if queryParam != "" {
		parsed, err := url.Parse("?" + queryParamEscaper.Replace(queryParam))
		if err != nil {
			return nil, err
		}
		target = fmt.Sprintf("%s?%s", urlStr, parsed.Query().Encode())
	}

	reqBody := &bytes.Buffer{}
	if postData != nil {
		if _, err := reqBody.ReadFrom(postData); err != nil {
			return nil, err
		}
	}

	// Check the error before touching req: it is nil when construction
	// fails (for example on an invalid method).
	req, err := http.NewRequest(method, target, reqBody)
	if err != nil {
		return nil, err
	}
	for name, values := range headers {
		if len(values) > 0 {
			req.Header.Add(name, values[0])
		}
	}

	resp, err := sendRequestClient.Do(req)
	if err != nil {
		return nil, err
	}
	// Registered immediately so the body is closed on every return path.
	defer resp.Body.Close()

	var data io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(resp.Body)
		if err != nil {
			return nil, err
		}
		defer zr.Close()
		data = zr
	}

	buffer = &bytes.Buffer{}
	if _, err := buffer.ReadFrom(data); err != nil {
		return nil, err
	}
	return buffer, nil
}
|