metric_process_config.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package service
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "text/template"
  12. "time"
  13. jsoniter "github.com/json-iterator/go"
  14. "github.com/tidwall/gjson"
  15. "newaterobot-process/entity"
  16. "newaterobot-process/model"
  17. "newaterobot-process/utils"
  18. )
  19. // MetricProcessConfigService 指标处理配置服务
  20. type MetricProcessConfigService struct {
  21. configModel *model.MetricProcessConfigModel
  22. httpClient *http.Client
  23. }
  24. // NewMetricProcessConfigService 创建指标处理配置服务实例
  25. func NewMetricProcessConfigService() *MetricProcessConfigService {
  26. return &MetricProcessConfigService{
  27. configModel: &model.MetricProcessConfigModel{},
  28. httpClient: &http.Client{
  29. Timeout: 30 * time.Second,
  30. },
  31. }
  32. }
  33. // ProcessMetric 处理指标
  34. func (s *MetricProcessConfigService) ProcessMetric(inputData entity.MetricProcessRequest) (entity.MetricProcessResponse, error) {
  35. ret := entity.MetricProcessResponse{}
  36. // 根据指标类型获取配置
  37. configs, err := s.configModel.GetByMetricType(inputData.MetricType, inputData.Intent)
  38. if err != nil {
  39. return ret, err
  40. }
  41. configsMap := make(map[string]entity.MetricProcessConfig)
  42. for _, config := range configs {
  43. key := fmt.Sprintf("%d_%d", config.Intent, config.MetricType)
  44. configsMap[key] = config
  45. }
  46. key := fmt.Sprintf("%d_%d", inputData.Intent, inputData.MetricType)
  47. var config entity.MetricProcessConfig
  48. var ok bool
  49. // 未配置该处理分支直接返回,在dify中处理
  50. if config, ok = configsMap[key]; !ok {
  51. ret.Flag = entity.NoExecute
  52. return ret, nil
  53. }
  54. if config.MetricType == 24 {
  55. inputData.STime = time.Now().Format("2006-01-02") + " 00:00:00"
  56. inputData.ETime = time.Now().Format("2006-01-02") + " 23:59:59"
  57. } else if config.MetricType == 29 {
  58. inputData.STime = time.Now().Format("2006-01")
  59. } else if config.MetricType == 30 {
  60. inputData.STime = time.Now().Format("2006-01-02") + " 00:00:00"
  61. } else if config.MetricType == 33 {
  62. inputData.STime = time.Now().Format("2006-01-02")
  63. inputData.ETime = time.Now().AddDate(0, 0, 180).Format("2006-01-02")
  64. } else if config.MetricType == 59 {
  65. inputData.DateTime = strings.Split(inputData.DateTime, " ")[0]
  66. } else if config.MetricType == 42 {
  67. inputData.STime = strconv.Itoa(time.Now().Year())
  68. inputData.ETime = strconv.Itoa(int(time.Now().Month()))
  69. }
  70. // 直接回复分支
  71. templateVarMap := s.objToMap(inputData)
  72. if config.Type == 1 {
  73. contextTemplate := config.ContextTemplate
  74. if inputData.IsEnglish == 1 {
  75. contextTemplate = config.EngContextTemplate
  76. }
  77. replyContext, err := s.parseTemplate(contextTemplate, templateVarMap)
  78. if err != nil {
  79. return ret, err
  80. }
  81. ret.ReplyContext = replyContext
  82. return ret, nil
  83. }
  84. // 发送http 请求
  85. var responseData string
  86. if strings.Contains(config.URLTemplate, ";") {
  87. urlList := strings.Split(config.URLTemplate, ";")
  88. pamList := strings.Split(config.QueryTemplate, ";")
  89. for idx, url := range urlList {
  90. config.URLTemplate = url
  91. config.QueryTemplate = pamList[idx]
  92. if strings.HasPrefix(pamList[idx], "{") {
  93. config.HTTPMethod = 1
  94. } else {
  95. config.HTTPMethod = 0
  96. }
  97. response, err := s.sendHTTPRequest(&config, templateVarMap, inputData.JwtToken)
  98. if err != nil {
  99. return ret, err
  100. }
  101. responseData = responseData + response + "#@#@#@"
  102. }
  103. responseData = strings.TrimRight(responseData, "#@#@#@")
  104. } else {
  105. responseData, err = s.sendHTTPRequest(&config, templateVarMap, inputData.JwtToken)
  106. if err != nil {
  107. return ret, err
  108. }
  109. }
  110. // 抽取字段
  111. responseExtract := make(map[string]string)
  112. err = json.Unmarshal([]byte(config.ResponseExtract), &responseExtract)
  113. if err != nil {
  114. return ret, err
  115. }
  116. // 额外逻辑处理
  117. switch config.MetricType {
  118. case 2:
  119. responseData, _ = TransformType2(responseData)
  120. case 4:
  121. responseData, _ = TransformType4(responseData)
  122. case 5:
  123. responseData, _ = TransformType5(responseData)
  124. case 6:
  125. responseData, _ = TransformType6(responseData)
  126. case 8:
  127. responseData, _ = TransformType8(responseData)
  128. case 15:
  129. responseData, _ = TransformType15(responseData)
  130. case 17:
  131. responseData, _ = TransformType17(responseData)
  132. case 19:
  133. responseData, _ = TransformType19(responseData)
  134. case 21:
  135. responseData, _ = TransformType21(responseData)
  136. case 22:
  137. responseData, _ = TransformType22(responseData)
  138. case 23:
  139. responseData, _ = TransformType23(responseData)
  140. case 24:
  141. responseData, _ = TransformType24(responseData)
  142. case 25, 26, 27:
  143. responseData, _ = TransformType25(responseData, inputData.IsEnglish)
  144. case 28:
  145. responseData, _ = TransformType28(responseData, inputData.IsEnglish)
  146. case 29:
  147. responseData, _ = TransformType29(responseData)
  148. case 30:
  149. responseData, _ = TransformType30(responseData)
  150. case 35:
  151. responseData, _ = TransformType35(responseData)
  152. case 36:
  153. responseData, _ = TransformType36(responseData)
  154. case 38:
  155. responseData, _ = TransformType38(responseData)
  156. case 39:
  157. responseData, _ = TransformType39(responseData)
  158. case 42:
  159. responseData, _ = TransformType42(responseData)
  160. case 51:
  161. responseData, _ = TransformType51(responseData)
  162. case 59:
  163. responseData, _ = TransformType59(responseData, inputData.IsEnglish, inputData.DateTime)
  164. }
  165. extractMap := s.extractFields(responseExtract, responseData)
  166. for k, v := range extractMap {
  167. templateVarMap[k] = v
  168. }
  169. // 解析返回模板
  170. replyContext, err := s.parseTemplate(config.ContextTemplate, templateVarMap)
  171. if err != nil {
  172. return ret, err
  173. }
  174. ret.Flag = entity.HttpReply
  175. ret.ReplyContext = replyContext
  176. info := make(map[string]interface{})
  177. info["pid"] = inputData.ID
  178. info["metric_type"] = config.MetricType
  179. info["datetime"] = inputData.DateTime
  180. info["metric"] = replyContext
  181. info["is_english"] = inputData.IsEnglish
  182. info["type"] = config.Type
  183. k := fmt.Sprintf("deepseek2:%s:%s", inputData.ConversationId, inputData.DialogueCount)
  184. msg, _ := jsoniter.MarshalToString(info)
  185. GetRedisService().SetObject(k, msg, 3600*24*7)
  186. return ret, nil
  187. }
  188. func (s *MetricProcessConfigService) ProcessMate(cid string, mid string) (map[string]interface{}, error) {
  189. k := fmt.Sprintf("deepseek2:%s:%s", cid, mid)
  190. row, err := GetRedisService().GetString(k)
  191. if err != nil {
  192. return nil, err
  193. }
  194. info := make(map[string]interface{})
  195. err = json.Unmarshal([]byte(row), &info)
  196. if err != nil {
  197. return nil, err
  198. }
  199. return info, nil
  200. }
  201. func (s *MetricProcessConfigService) objToMap(req entity.MetricProcessRequest) map[string]interface{} {
  202. ret := make(map[string]interface{})
  203. ret["name"] = req.Name
  204. ret["think_str"] = entity.ThinkStr
  205. ret["date_src"] = req.DateSrc
  206. ret["s_time"] = req.STime
  207. ret["e_time"] = req.ETime
  208. ret["local_url"] = req.LocalUrl
  209. ret["id"] = req.ID
  210. ret["datetime"] = req.DateTime
  211. return ret
  212. }
  213. // sendHTTPRequest 发送HTTP请求
  214. func (s *MetricProcessConfigService) sendHTTPRequest(config *entity.MetricProcessConfig, inputData map[string]interface{}, jwtToken string) (string, error) {
  215. // 解析URL模板
  216. reqUrl, err := s.parseTemplate(config.URLTemplate, inputData)
  217. if err != nil {
  218. return "", fmt.Errorf("解析URL模板失败: %v", err)
  219. }
  220. var req *http.Request
  221. switch config.HTTPMethod {
  222. case 0: // GET
  223. // 解析查询参数模板
  224. query, err := s.parseTemplate(config.QueryTemplate, inputData)
  225. if err != nil {
  226. return "", fmt.Errorf("解析查询参数模板失败: %v", err)
  227. }
  228. // 对查询参数进行urlencode处理
  229. if query != "" {
  230. pairs := strings.Split(query, "&")
  231. encodedQuery := ""
  232. for i, pair := range pairs {
  233. if i > 0 {
  234. encodedQuery += "&"
  235. }
  236. // 分割键值对
  237. kv := strings.Split(pair, "=")
  238. if len(kv) == 2 {
  239. // 对键和值都进行urlencode
  240. encodedKey := url.QueryEscape(kv[0])
  241. encodedValue := url.QueryEscape(kv[1])
  242. encodedQuery += encodedKey + "=" + encodedValue
  243. }
  244. }
  245. reqUrl += "?" + encodedQuery
  246. }
  247. req, err = http.NewRequest("GET", reqUrl, nil)
  248. if err != nil {
  249. return "", fmt.Errorf("创建GET请求失败: %v", err)
  250. }
  251. case 1: // POST
  252. // 解析请求体模板
  253. bodyStr, err := s.parseTemplate(config.QueryTemplate, inputData)
  254. if err != nil {
  255. return "", fmt.Errorf("解析请求体模板失败: %v", err)
  256. }
  257. body := strings.NewReader(bodyStr)
  258. req, err = http.NewRequest("POST", reqUrl, body)
  259. if err != nil {
  260. return "", fmt.Errorf("创建POST请求失败: %v", err)
  261. }
  262. // 设置默认内容类型
  263. req.Header.Set("Content-Type", "application/json")
  264. default:
  265. return "", fmt.Errorf("不支持的HTTP方法: %d", config.HTTPMethod)
  266. }
  267. req.Header.Set("JWT-TOKEN", jwtToken)
  268. // 发送请求
  269. resp, err := s.httpClient.Do(req)
  270. if err != nil {
  271. return "", fmt.Errorf("发送请求失败: %v", err)
  272. }
  273. defer resp.Body.Close()
  274. // 读取响应
  275. body, err := io.ReadAll(resp.Body)
  276. if err != nil {
  277. return "", fmt.Errorf("读取响应失败: %v", err)
  278. }
  279. responseString := string(body)
  280. if resp.StatusCode != http.StatusOK {
  281. return "", fmt.Errorf("请求失败,状态码: %d,响应: %s", resp.StatusCode, responseString)
  282. }
  283. return responseString, nil
  284. }
  285. // extractFields 提取指定字段
  286. func (s *MetricProcessConfigService) extractFields(rules map[string]string, responseBody string) map[string]interface{} {
  287. ret := make(map[string]interface{})
  288. for field, rule := range rules {
  289. result := gjson.Get(responseBody, rule)
  290. ret[field] = result.Value()
  291. }
  292. return ret
  293. }
  294. // parseTemplate 解析模板
  295. func (s *MetricProcessConfigService) parseTemplate(tpl string, data map[string]interface{}) (string, error) {
  296. if tpl == "" {
  297. return "", nil
  298. }
  299. funcMap := template.FuncMap{
  300. "SplitAndFetch": utils.SplitAndFetch,
  301. }
  302. // 创建模板
  303. t, err := template.New("").Funcs(funcMap).Parse(tpl)
  304. if err != nil {
  305. return "", fmt.Errorf("解析模板失败: %v", err)
  306. }
  307. // 执行模板
  308. var buf bytes.Buffer
  309. if err := t.Execute(&buf, data); err != nil {
  310. return "", fmt.Errorf("执行模板失败: %v", err)
  311. }
  312. return buf.String(), nil
  313. }