// Package envitem reads and writes PLC point ("env item") values through the
// GT server HTTP API and keeps optional per-item adjustment offsets in a cache.
package envitem

import (
	"bytes"
	"compress/gzip"
	"context"
	"crypto/md5"
	"database/sql/driver"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	"metawant.greentech.com.cn/gaoyagang/gt-common/httplib"
)

var (
	// sendRequestClient is shared by every _sendRequest call. http.Client and
	// http.Transport are safe for concurrent use and pool keep-alive
	// connections, so building one per request defeats connection reuse.
	sendRequestClient = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   3 * time.Second,
				KeepAlive: 10 * time.Second,
			}).DialContext,
			MaxIdleConns:        10,
			MaxIdleConnsPerHost: 10,
			IdleConnTimeout:     10 * time.Second,
		},
		Timeout: 10 * time.Second,
	}

	// queryEscaper percent-encodes the characters that _sendRequest escapes
	// before parsing a raw query string: space, slash, parentheses, comma and
	// semicolon.
	queryEscaper = strings.NewReplacer(
		" ", "%20",
		"/", "%2F",
		"(", "%28",
		")", "%29",
		",", "%2C",
		";", "%3B",
	)
)

// SetOptions configures the package: it derives the snapshot, control and
// PLC-list endpoint URLs from options.GtServerIp, installs the cache when one
// is supplied (a nil Cache leaves the current cache untouched), and copies the
// FetchMultiItem, AdjustValue and PlcItemSecret settings.
func SetOptions(options Options) {
	snapUrl = fmt.Sprintf("http://%s/api/v1/plc-current", options.GtServerIp)
	ctlUrl = fmt.Sprintf("http://%s/api/v1/plc/set-var-values", options.GtServerIp)
	plcItemUrl = fmt.Sprintf("http://%s/api/v1/plc-list/", options.GtServerIp)
	if options.Cache != nil {
		cache = options.Cache
	}
	fetchMultiItem = options.FetchMultiItem
	adjustValue = options.AdjustValue
	plcItemSecret = options.PlcItemSecret
}

// GetProjectId returns the ProjectId of the first item encountered while
// iterating the map (iteration order is unspecified), or 0 when m is empty.
func (m MultiEnvItem) GetProjectId() int64 {
	for _, item := range m {
		return item.ProjectId
	}
	return 0
}

// getItemNames returns the Item name of every entry, in map iteration order.
func (m MultiEnvItem) getItemNames() []string {
	items := make([]string, 0, len(m))
	for _, item := range m {
		items = append(items, item.Item)
	}
	return items
}

// FillCurrentValue fetches the current value of every item and stores it (and
// its Htime) on the matching entry. Items are fetched in one batched request
// when fetchMultiItem is set, otherwise with one request per item. Responses
// whose ItemName matches no entry, and nil responses, are skipped.
func (m MultiEnvItem) FillCurrentValue() error {
	var (
		datas []*ItemValueResp
		err   error
	)
	if fetchMultiItem {
		datas, err = m.getCurrentData()
	} else {
		datas, err = m.getCurrentDataByOne()
	}
	if err != nil {
		return err
	}

	// Build an item-name -> map-key lookup first.
	ikm := make(map[string]string, len(m))
	for key, item := range m {
		ikm[item.Item] = key
	}

	for _, data := range datas {
		if data == nil {
			continue
		}
		key, ok := ikm[data.ItemName]
		if !ok {
			continue
		}
		m[key].Value = data.Val
		m[key].Htime = data.HTime
		_ = m[key].setPrevValue() // setPrevValue always returns nil
	}
	return nil
}

// GetItemFloat64Value returns the float64 value of the entry stored under key,
// or 0 when the key is absent.
func (m MultiEnvItem) GetItemFloat64Value(key string) float64 {
	if envItem, ok := m[key]; ok {
		return envItem.GetItemFloat64Val()
	}
	return 0
}

// GetItemInt64Value returns the int64 value of the entry stored under key, or
// 0 when the key is absent.
func (m MultiEnvItem) GetItemInt64Value(key string) int64 {
	if envItem, ok := m[key]; ok {
		return envItem.GetItemInt64Val()
	}
	return 0
}

// GetItemStringValue returns the string value of the entry stored under key,
// or "" when the key is absent.
func (m MultiEnvItem) GetItemStringValue(key string) string {
	if envItem, ok := m[key]; ok {
		return envItem.GetItemStringVal()
	}
	return ""
}

// GetItemHtime returns the parsed Htime of the entry stored under key, or nil
// when the key is absent or the timestamp cannot be parsed.
func (m MultiEnvItem) GetItemHtime(key string) *time.Time {
	if envItem, ok := m[key]; ok {
		return envItem.GetItemHtime()
	}
	return nil
}

// FindString returns a map from item name to the item's string value.
func (m MultiEnvItem) FindString() map[string]string {
	a := make(map[string]string, len(m))
	for _, item := range m {
		a[item.Item] = item.GetItemStringVal()
	}
	return a
}

// FindPrevString returns a map from item name to the value last recorded for
// that item name by setPrevValue ("" when none was recorded).
func (m MultiEnvItem) FindPrevString() map[string]string {
	a := make(map[string]string, len(m))
	for _, item := range m {
		a[item.Item] = item.GetItemPrevStringVal()
	}
	return a
}

// ClearValues resets Value and Htime on every entry.
func (m MultiEnvItem) ClearValues() {
	for _, item := range m {
		item.clearValue()
	}
}

// getCurrentDataByOne fetches every item concurrently, one request per item.
// The returned slice has len(m) slots; a slot is nil when that item's request
// failed. The returned error is always nil.
func (m MultiEnvItem) getCurrentDataByOne() ([]*ItemValueResp, error) {
	datas := make([]*ItemValueResp, len(m))

	var wg sync.WaitGroup
	wg.Add(len(m))
	i := 0
	for _, item := range m {
		go func(index int, one *EnvItem) {
			defer wg.Done()
			// Best effort: a failed fetch leaves a nil slot, which
			// FillCurrentValue skips. Each goroutine writes a distinct index.
			datas[index], _ = one.getCurrentData()
		}(i, item)
		i++
	}
	wg.Wait()
	return datas, nil
}

// getCurrentData fetches the current values of all items with a single
// batched snapshot request. No adjustment offsets are applied on this path.
func (m MultiEnvItem) getCurrentData() ([]*ItemValueResp, error) {
	return requestCurrentValues(strings.Join(m.getItemNames(), ","), m.GetProjectId())
}

// requestCurrentValues posts a snapshot query for deviceItems (one item name
// or a comma-joined list) of project projectId to snapUrl with a 2s timeout.
// It returns the decoded Data entries, or an error when the request fails,
// the status is not 200, the body cannot be decoded, or Data is empty.
func requestCurrentValues(deviceItems string, projectId int64) ([]*ItemValueResp, error) {
	req := httplib.Post(snapUrl)
	payload, err := json.Marshal([]*ItemValueReq{{
		DeviceItems: deviceItems,
		ProjectId:   projectId,
	}})
	if err != nil {
		return nil, err
	}
	req.Body(payload)
	req.SetTimeout(time.Millisecond * 2000)

	r, err := req.Response()
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	if r.StatusCode != 200 {
		return nil, fmt.Errorf("request statusCode: %d", r.StatusCode)
	}

	body, err := req.Bytes()
	if err != nil {
		return nil, err
	}
	res := &ItemValueResps{}
	if err := json.Unmarshal(body, res); err != nil {
		return nil, err
	}
	if len(res.Data) == 0 {
		return nil, errors.New("not found envitem's value")
	}
	return res.Data, nil
}

// getCurrentData fetches the current value of this single item. When value
// adjustment is enabled and a cache is configured, a numeric value has the
// cached adjustment added: integers are re-rendered with %d, other numbers
// with %f. A failed adjustment lookup counts as an adjustment of 0.
func (e *EnvItem) getCurrentData() (*ItemValueResp, error) {
	values, err := requestCurrentValues(e.Item, e.ProjectId)
	if err != nil {
		return nil, err
	}
	current := values[0]
	if current == nil {
		// A null array element would otherwise be dereferenced below or by
		// the caller.
		return nil, errors.New("not found envitem's value")
	}

	if adjustValue && cache != nil {
		if iv, err := strconv.ParseInt(current.Val, 10, 64); err == nil {
			adjust, _ := e.GetAdjustInt64Val() // missing/invalid adjustment => 0
			current.Val = strconv.FormatInt(iv+adjust, 10)
		} else if fv, err := strconv.ParseFloat(current.Val, 64); err == nil {
			adjust, _ := e.GetAdjustFloat64Val() // missing/invalid adjustment => 0
			current.Val = fmt.Sprintf("%f", fv+adjust)
		}
	}
	return current, nil
}

// getCurrentValue fetches the current value, stores it in e.Value / e.Htime,
// records it via setPrevValue, and returns the value and its timestamp.
func (e *EnvItem) getCurrentValue() (string, string, error) {
	resp, err := e.getCurrentData()
	if err != nil {
		return "", "", err
	}
	e.Value = resp.Val
	e.Htime = resp.HTime
	_ = e.setPrevValue() // setPrevValue always returns nil
	return resp.Val, resp.HTime, nil
}

// getPrevValue returns the value last recorded for e.Item by setPrevValue, or
// an error when nothing (or a non-string value) has been recorded.
func (e *EnvItem) getPrevValue() (string, error) {
	if v, ok := itemPrevValues.Load(e.Item); ok {
		if s, ok := v.(string); ok {
			return s, nil
		}
	}
	return "", errors.New("not found prev envitem's value")
}

// setPrevValue records e.Value in itemPrevValues under e.Item. It always
// returns nil.
func (e *EnvItem) setPrevValue() error {
	itemPrevValues.Store(e.Item, e.Value)
	return nil
}

// GetItemFloat64Val returns the item's value parsed as float64. When no value
// is loaded yet it is fetched first. It returns 0 when the fetch fails or the
// value is not a string holding a valid float.
func (e *EnvItem) GetItemFloat64Val() float64 {
	if e.Value == nil {
		// Best effort: on failure Value stays nil and 0 is returned below.
		_, _, _ = e.getCurrentValue()
	}
	if s, ok := e.Value.(string); ok {
		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return v
		}
	}
	return 0
}

// GetItemInt64Val returns the item's value parsed as a base-10 int64. When no
// value is loaded yet it is fetched first. It returns 0 when the fetch fails
// or the value is not a string holding a valid integer.
func (e *EnvItem) GetItemInt64Val() int64 {
	if e.Value == nil {
		// Best effort: on failure Value stays nil and 0 is returned below.
		_, _, _ = e.getCurrentValue()
	}
	if s, ok := e.Value.(string); ok {
		if v, err := strconv.ParseInt(s, 10, 64); err == nil {
			return v
		}
	}
	return 0
}

// GetItemStringVal returns the item's value as a string. When no value is
// loaded yet it is fetched first. It returns "" when the fetch fails or the
// value is not a string.
func (e *EnvItem) GetItemStringVal() string {
	if e.Value == nil {
		// Best effort: on failure Value stays nil and "" is returned below.
		_, _, _ = e.getCurrentValue()
	}
	if v, ok := e.Value.(string); ok {
		return v
	}
	return ""
}

// GetItemHtime returns e.Htime parsed with layout "2006-01-02 15:04:05" in
// the local time zone, fetching the current value first when none is loaded.
// It returns nil when the timestamp cannot be parsed.
func (e *EnvItem) GetItemHtime() *time.Time {
	if e.Value == nil {
		// Best effort: on failure Htime stays empty and nil is returned below.
		_, _, _ = e.getCurrentValue()
	}
	if ht, err := time.ParseInLocation("2006-01-02 15:04:05", e.Htime, time.Local); err == nil {
		return &ht
	}
	return nil
}

// GetItemPrevFloat64Val returns the recorded previous value parsed as
// float64, or 0 when none is recorded or it does not parse.
func (e *EnvItem) GetItemPrevFloat64Val() float64 {
	ov, err := e.getPrevValue()
	if err != nil {
		return 0
	}
	if v, err := strconv.ParseFloat(ov, 64); err == nil {
		return v
	}
	return 0
}

// GetItemPrevInt64Val returns the recorded previous value parsed as a base-10
// int64, or 0 when none is recorded or it does not parse.
func (e *EnvItem) GetItemPrevInt64Val() int64 {
	ov, err := e.getPrevValue()
	if err != nil {
		return 0
	}
	if v, err := strconv.ParseInt(ov, 10, 64); err == nil {
		return v
	}
	return 0
}

// GetItemPrevStringVal returns the recorded previous value, or "" when none
// is recorded.
func (e *EnvItem) GetItemPrevStringVal() string {
	if ov, err := e.getPrevValue(); err == nil {
		return ov
	}
	return ""
}

// GetAdjustInt64Val returns this item's cached adjustment as int64.
func (e *EnvItem) GetAdjustInt64Val() (int64, error) {
	return getAdjustInt64Val(e.ProjectId, e.Item)
}

// GetAdjustFloat64Val returns this item's cached adjustment as float64.
func (e *EnvItem) GetAdjustFloat64Val() (float64, error) {
	return getAdjustFloat64Val(e.ProjectId, e.Item)
}

// GetAdjustStringVal returns this item's cached adjustment as a string.
func (e *EnvItem) GetAdjustStringVal() (string, error) {
	return getAdjustStringVal(e.ProjectId, e.Item)
}

// SetAdjust stores value as this item's adjustment in the cache with the
// given expiry. It is a no-op returning nil when value adjustment is
// disabled, and fails when no cache is configured.
func (e *EnvItem) SetAdjust(value string, expire time.Duration) error {
	if !adjustValue {
		return nil
	}
	if cache == nil {
		return errors.New("not cache")
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
	return cache.Set(context.Background(), key, value, expire).Err()
}

// IncreAdjust increments this item's cached adjustment by one, refreshes its
// expiry, and returns the new value. It returns (0, nil) when value
// adjustment is disabled, and fails when no cache is configured.
func (e *EnvItem) IncreAdjust(expire time.Duration) (int64, error) {
	if !adjustValue {
		return 0, nil
	}
	if cache == nil {
		return 0, errors.New("not cache")
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
	intCmd := cache.Incr(context.Background(), key)
	if intCmd.Err() != nil {
		return 0, intCmd.Err()
	}
	// The result of refreshing the expiry is not checked; the increment has
	// already been applied at this point.
	cache.Expire(context.Background(), key, expire)
	return intCmd.Val(), nil
}

// SetValue asks the control endpoint to change this item's value from o to v.
// The request body is a one-element JSON array; its signature and the Unix
// timestamp used to compute it are sent as the "sign" and "timestamp" query
// parameters. An error is returned when the request fails, the response body
// is not valid JSON, or the response Code is not 200.
func (e *EnvItem) SetValue(o, v string) error {
	ts := time.Now().Unix()
	data := SetCurrentsReq{{
		ProjectId: e.ProjectId,
		Item:      e.Item,
		OldValue:  o,
		NewValue:  v,
	}}
	sign := data.Sign(plcItemSecret, ts)

	jsonByte, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal set-value request: %w", err)
	}

	headers := http.Header{}
	headers.Add("Content-Type", "application/json")

	// sign and timestamp must be two separate query parameters.
	query := fmt.Sprintf("sign=%s&timestamp=%d", sign, ts)
	resp, err := _sendRequest(ctlUrl, "POST", query, headers, bytes.NewBuffer(jsonByte))
	if err != nil {
		return fmt.Errorf("设置点位数据失败参数,%s,%w", string(jsonByte), err)
	}

	respData := &SetCurrentResp{}
	if err := json.Unmarshal(resp.Bytes(), respData); err != nil {
		return fmt.Errorf("设置点位数据失败参数,%s,%w,%s", string(jsonByte), err, resp)
	}
	if respData.Code != http.StatusOK {
		return fmt.Errorf("设置点位数据失败参数,%s,%s", string(jsonByte), resp)
	}
	return nil
}

// WaitValue polls the item until its value satisfies the condition and
// reports the outcome on the returned channel.
//
//	v:        the value to wait for
//	op:       expected result of strings.Compare(v, current), a lexicographic
//	          comparison: -1 less than, 0 equal, 1 greater than
//	interval: pause between polls
//	retry:    maximum number of polls; 0 means unlimited
//
// Exactly one result is delivered (true when the condition was met, false
// when retry polls were used up), after which the channel is closed and the
// polling goroutine exits. The channel is buffered so the goroutine does not
// block if the caller never reads. With retry == 0 polling continues until
// the condition is met.
func (e *EnvItem) WaitValue(v string, op int, interval time.Duration, retry int) <-chan bool {
	c := make(chan bool, 1)
	go func() {
		defer close(c)
		tries := 0
		for {
			val, _, err := e.getCurrentValue()
			if err == nil && strings.Compare(v, val) == op {
				c <- true
				return
			}
			if retry > 0 {
				tries++
				if tries >= retry {
					c <- false
					return
				}
			}
			time.Sleep(interval)
		}
	}()
	return c
}

// ClearAdjust deletes this item's adjustment from the cache. It does nothing
// when value adjustment is disabled or no cache is configured.
func (e *EnvItem) ClearAdjust() {
	if !adjustValue || cache == nil {
		return
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, e.ProjectId, e.Item)
	cache.Del(context.Background(), key)
}

// clearValue resets the loaded value and timestamp.
func (e *EnvItem) clearValue() {
	e.Value = nil
	e.Htime = ""
}

// Scan implements sql.Scanner by decoding a JSON column into the receiver.
// A nil input (SQL NULL) leaves the receiver unchanged; []byte and string
// inputs are decoded as JSON; any other type yields an error.
func (m *MultiEnvItem) Scan(input interface{}) error {
	var raw []byte
	switch v := input.(type) {
	case nil:
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("envitem: cannot scan %T into MultiEnvItem", input)
	}
	// Decode errors are deliberately not returned, preserving the existing
	// lenient behavior: a malformed column does not fail the row scan.
	_ = json.Unmarshal(raw, m)
	return nil
}

// Value implements driver.Valuer by encoding the receiver as JSON.
func (m MultiEnvItem) Value() (driver.Value, error) {
	return json.Marshal(m)
}

// getAdjustInt64Val reads the cached adjustment for (projectId, item) as
// int64. It returns (0, nil) when value adjustment is disabled, and fails
// when no cache is configured or the lookup fails.
func getAdjustInt64Val(projectId int64, item string) (int64, error) {
	if !adjustValue {
		return 0, nil
	}
	if cache == nil {
		return 0, errors.New("not cache")
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
	scmd := cache.Get(context.Background(), key)
	if scmd != nil && scmd.Err() != nil {
		return 0, scmd.Err()
	}
	return scmd.Int64()
}

// getAdjustFloat64Val reads the cached adjustment for (projectId, item) as
// float64. It returns (0, nil) when value adjustment is disabled, and fails
// when no cache is configured or the lookup fails.
func getAdjustFloat64Val(projectId int64, item string) (float64, error) {
	if !adjustValue {
		return 0, nil
	}
	if cache == nil {
		return 0, errors.New("not cache")
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
	scmd := cache.Get(context.Background(), key)
	if scmd != nil && scmd.Err() != nil {
		return 0, scmd.Err()
	}
	return scmd.Float64()
}

// getAdjustStringVal reads the cached adjustment for (projectId, item) as a
// string. It returns ("", nil) when value adjustment is disabled, and fails
// when no cache is configured or the lookup fails.
func getAdjustStringVal(projectId int64, item string) (string, error) {
	if !adjustValue {
		return "", nil
	}
	if cache == nil {
		return "", errors.New("not cache")
	}
	key := fmt.Sprintf(CACHE_ADJUST_VALUE_KEY, projectId, item)
	scmd := cache.Get(context.Background(), key)
	if scmd != nil && scmd.Err() != nil {
		return "", scmd.Err()
	}
	// NOTE(review): if cache is a go-redis client, StringCmd.String() renders
	// the whole command ("get <key>: <value>"), not just the value; Val() or
	// Result() would be the value accessor. Confirm against the cache type.
	return scmd.String(), nil
}

// Sign returns the upper-case hex MD5 digest of the JSON encoding of s
// followed by secret and the decimal timestamp ts.
func (s SetCurrentsReq) Sign(secret string, ts int64) string {
	// The signature has no error return; a marshal failure signs an empty
	// payload, as before.
	jsonByte, _ := json.Marshal(s)

	hasher := md5.New()
	hasher.Write(jsonByte)
	hasher.Write([]byte(secret))
	hasher.Write([]byte(strconv.FormatInt(ts, 10)))
	return strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
}

// GetVirtualPlcItems requests the PLC item list for v.ProjectId from
// plcItemUrl, passing project_id and pageSize as parameters, and returns the
// decoded response. A non-200 status is reported as an error.
func (v *VirtualPlcItem) GetVirtualPlcItems() (*VirtualPlcItemResp, error) {
	req := httplib.Get(fmt.Sprintf("%s%d", plcItemUrl, v.ProjectId))
	req.Param("project_id", fmt.Sprintf("%d", v.ProjectId))
	req.Param("pageSize", fmt.Sprintf("%d", v.PageSize))

	r, err := req.Response()
	if err != nil {
		return nil, err
	}
	defer r.Body.Close()

	if r.StatusCode != 200 {
		return nil, fmt.Errorf("request statusCode: %d", r.StatusCode)
	}

	body, err := req.Bytes()
	if err != nil {
		return nil, err
	}
	res := &VirtualPlcItemResp{}
	if err := json.Unmarshal(body, res); err != nil {
		return nil, err
	}
	return res, nil
}

// _sendRequest performs an HTTP request and returns the full response body.
//
// method defaults to "GET" when empty. A non-empty queryParam is a raw query
// string: spaces, slashes, parentheses, commas and semicolons are
// percent-escaped, then it is parsed, re-encoded and appended to urlStr. Only
// the first value of each entry in headers is sent. postData, when non-nil,
// is read fully and used as the request body. A gzip Content-Encoding on the
// response is decoded. The HTTP status code is not inspected.
func _sendRequest(urlStr, method, queryParam string, headers http.Header, postData io.Reader) (buffer *bytes.Buffer, err error) {
	if method == "" {
		method = "GET"
	}

	target := urlStr
	if queryParam != "" {
		parsed, perr := url.Parse("?" + queryEscaper.Replace(queryParam))
		if perr != nil {
			return nil, perr
		}
		target = fmt.Sprintf("%s?%s", urlStr, parsed.Query().Encode())
	}

	reqBuffer := &bytes.Buffer{}
	if postData != nil {
		if _, rerr := reqBuffer.ReadFrom(postData); rerr != nil {
			return nil, rerr
		}
	}

	req, err := http.NewRequest(method, target, reqBuffer)
	if err != nil {
		// Must be checked before req is used: req is nil on error.
		return nil, err
	}
	for name, values := range headers {
		if len(values) > 0 {
			req.Header.Add(name, values[0])
		}
	}

	resp, err := sendRequestClient.Do(req)
	if err != nil {
		return nil, err
	}
	// Registered right away so the body is closed on every return path,
	// including a gzip header failure.
	defer resp.Body.Close()

	var data io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		zr, zerr := gzip.NewReader(resp.Body)
		if zerr != nil {
			return nil, zerr
		}
		defer zr.Close()
		data = zr
	}

	buffer = &bytes.Buffer{}
	if _, err = buffer.ReadFrom(data); err != nil {
		return nil, err
	}
	return buffer, nil
}