metric_process_config.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package service
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/tidwall/gjson"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "newaterobot-process/entity"
  11. "newaterobot-process/model"
  12. "newaterobot-process/utils"
  13. "strings"
  14. "text/template"
  15. "time"
  16. )
  17. // MetricProcessConfigService 指标处理配置服务
  18. type MetricProcessConfigService struct {
  19. configModel *model.MetricProcessConfigModel
  20. httpClient *http.Client
  21. }
  22. // NewMetricProcessConfigService 创建指标处理配置服务实例
  23. func NewMetricProcessConfigService() *MetricProcessConfigService {
  24. return &MetricProcessConfigService{
  25. configModel: &model.MetricProcessConfigModel{},
  26. httpClient: &http.Client{
  27. Timeout: 30 * time.Second,
  28. },
  29. }
  30. }
  31. // ProcessMetric 处理指标
  32. func (s *MetricProcessConfigService) ProcessMetric(inputData entity.MetricProcessRequest) (entity.MetricProcessResponse, error) {
  33. ret := entity.MetricProcessResponse{}
  34. // 根据指标类型获取配置
  35. configs, err := s.configModel.GetByMetricType(inputData.MetricType, inputData.Intent)
  36. if err != nil {
  37. return ret, err
  38. }
  39. configsMap := make(map[string]entity.MetricProcessConfig)
  40. for _, config := range configs {
  41. key := fmt.Sprintf("%d_%d", config.Intent, config.MetricType)
  42. configsMap[key] = config
  43. }
  44. key := fmt.Sprintf("%d_%d", inputData.Intent, inputData.MetricType)
  45. var config entity.MetricProcessConfig
  46. var ok bool
  47. //未配置该处理分支直接返回,在dify中处理
  48. if config, ok = configsMap[key]; !ok {
  49. ret.Flag = entity.NoExecute
  50. return ret, nil
  51. }
  52. //直接回复分支
  53. templateVarMap := s.objToMap(inputData)
  54. if config.Type == 1 {
  55. contextTemplate := config.ContextTemplate
  56. if inputData.IsEnglish == 1 {
  57. contextTemplate = config.EngContextTemplate
  58. }
  59. replyContext, err := s.parseTemplate(contextTemplate, templateVarMap)
  60. if err != nil {
  61. return ret, err
  62. }
  63. ret.ReplyContext = replyContext
  64. return ret, nil
  65. }
  66. //发送http 请求
  67. responseData, err := s.sendHTTPRequest(&config, templateVarMap, inputData.JwtToken)
  68. if err != nil {
  69. return ret, err
  70. }
  71. //抽取字段
  72. responseExtract := make(map[string]string)
  73. err = json.Unmarshal([]byte(config.ResponseExtract), &responseExtract)
  74. if err != nil {
  75. return ret, err
  76. }
  77. extractMap := s.extractFields(responseExtract, responseData)
  78. for k, v := range extractMap {
  79. templateVarMap[k] = v
  80. }
  81. //解析返回模板
  82. replyContext, err := s.parseTemplate(config.ContextTemplate, templateVarMap)
  83. if err != nil {
  84. return ret, err
  85. }
  86. ret.Flag = entity.HttpReply
  87. ret.ReplyContext = replyContext
  88. return ret, nil
  89. }
  90. func (s *MetricProcessConfigService) objToMap(req entity.MetricProcessRequest) map[string]interface{} {
  91. ret := make(map[string]interface{})
  92. ret["name"] = req.Name
  93. ret["think_str"] = entity.ThinkStr
  94. ret["date_src"] = req.DateSrc
  95. ret["s_time"] = req.STime
  96. ret["e_time"] = req.ETime
  97. ret["local_url"] = req.LocalUrl
  98. ret["id"] = req.ID
  99. ret["datetime"] = req.DateTime
  100. return ret
  101. }
  102. // sendHTTPRequest 发送HTTP请求
  103. func (s *MetricProcessConfigService) sendHTTPRequest(config *entity.MetricProcessConfig, inputData map[string]interface{}, jwtToken string) (string, error) {
  104. // 解析URL模板
  105. reqUrl, err := s.parseTemplate(config.URLTemplate, inputData)
  106. if err != nil {
  107. return "", fmt.Errorf("解析URL模板失败: %v", err)
  108. }
  109. var req *http.Request
  110. switch config.HTTPMethod {
  111. case 0: // GET
  112. // 解析查询参数模板
  113. query, err := s.parseTemplate(config.QueryTemplate, inputData)
  114. if err != nil {
  115. return "", fmt.Errorf("解析查询参数模板失败: %v", err)
  116. }
  117. // 对查询参数进行urlencode处理
  118. if query != "" {
  119. pairs := strings.Split(query, "&")
  120. encodedQuery := ""
  121. for i, pair := range pairs {
  122. if i > 0 {
  123. encodedQuery += "&"
  124. }
  125. // 分割键值对
  126. kv := strings.Split(pair, "=")
  127. if len(kv) == 2 {
  128. // 对键和值都进行urlencode
  129. encodedKey := url.QueryEscape(kv[0])
  130. encodedValue := url.QueryEscape(kv[1])
  131. encodedQuery += encodedKey + "=" + encodedValue
  132. }
  133. }
  134. reqUrl += "?" + encodedQuery
  135. }
  136. req, err = http.NewRequest("GET", reqUrl, nil)
  137. if err != nil {
  138. return "", fmt.Errorf("创建GET请求失败: %v", err)
  139. }
  140. case 1: // POST
  141. // 解析请求体模板
  142. bodyStr, err := s.parseTemplate(config.QueryTemplate, inputData)
  143. if err != nil {
  144. return "", fmt.Errorf("解析请求体模板失败: %v", err)
  145. }
  146. body := strings.NewReader(bodyStr)
  147. req, err = http.NewRequest("POST", reqUrl, body)
  148. if err != nil {
  149. return "", fmt.Errorf("创建POST请求失败: %v", err)
  150. }
  151. // 设置默认内容类型
  152. req.Header.Set("Content-Type", "application/json")
  153. default:
  154. return "", fmt.Errorf("不支持的HTTP方法: %d", config.HTTPMethod)
  155. }
  156. req.Header.Set("JWT-TOKEN", jwtToken)
  157. // 发送请求
  158. resp, err := s.httpClient.Do(req)
  159. if err != nil {
  160. return "", fmt.Errorf("发送请求失败: %v", err)
  161. }
  162. defer resp.Body.Close()
  163. // 读取响应
  164. body, err := io.ReadAll(resp.Body)
  165. if err != nil {
  166. return "", fmt.Errorf("读取响应失败: %v", err)
  167. }
  168. responseString := string(body)
  169. if resp.StatusCode != http.StatusOK {
  170. return "", fmt.Errorf("请求失败,状态码: %d,响应: %s", resp.StatusCode, responseString)
  171. }
  172. return responseString, nil
  173. }
  174. // extractFields 提取指定字段
  175. func (s *MetricProcessConfigService) extractFields(rules map[string]string, responseBody string) map[string]interface{} {
  176. ret := make(map[string]interface{})
  177. for field, rule := range rules {
  178. result := gjson.Get(responseBody, rule)
  179. ret[field] = result.Value()
  180. }
  181. return ret
  182. }
  183. // parseTemplate 解析模板
  184. func (s *MetricProcessConfigService) parseTemplate(tpl string, data map[string]interface{}) (string, error) {
  185. if tpl == "" {
  186. return "", nil
  187. }
  188. funcMap := template.FuncMap{
  189. "SplitAndFetch": utils.SplitAndFetch,
  190. }
  191. // 创建模板
  192. t, err := template.New("").Funcs(funcMap).Parse(tpl)
  193. if err != nil {
  194. return "", fmt.Errorf("解析模板失败: %v", err)
  195. }
  196. // 执行模板
  197. var buf bytes.Buffer
  198. if err := t.Execute(&buf, data); err != nil {
  199. return "", fmt.Errorf("执行模板失败: %v", err)
  200. }
  201. return buf.String(), nil
  202. }