// metric_process_config.go
  1. package service
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "text/template"
  11. "time"
  12. "github.com/tidwall/gjson"
  13. "newaterobot-process/entity"
  14. "newaterobot-process/model"
  15. "newaterobot-process/utils"
  16. )
  17. // MetricProcessConfigService 指标处理配置服务
  18. type MetricProcessConfigService struct {
  19. configModel *model.MetricProcessConfigModel
  20. httpClient *http.Client
  21. }
  22. // NewMetricProcessConfigService 创建指标处理配置服务实例
  23. func NewMetricProcessConfigService() *MetricProcessConfigService {
  24. return &MetricProcessConfigService{
  25. configModel: &model.MetricProcessConfigModel{},
  26. httpClient: &http.Client{
  27. Timeout: 30 * time.Second,
  28. },
  29. }
  30. }
  31. // ProcessMetric 处理指标
  32. func (s *MetricProcessConfigService) ProcessMetric(inputData entity.MetricProcessRequest) (entity.MetricProcessResponse, error) {
  33. ret := entity.MetricProcessResponse{}
  34. // 根据指标类型获取配置
  35. configs, err := s.configModel.GetByMetricType(inputData.MetricType, inputData.Intent)
  36. if err != nil {
  37. return ret, err
  38. }
  39. configsMap := make(map[string]entity.MetricProcessConfig)
  40. for _, config := range configs {
  41. key := fmt.Sprintf("%d_%d", config.Intent, config.MetricType)
  42. configsMap[key] = config
  43. }
  44. key := fmt.Sprintf("%d_%d", inputData.Intent, inputData.MetricType)
  45. var config entity.MetricProcessConfig
  46. var ok bool
  47. // 未配置该处理分支直接返回,在dify中处理
  48. if config, ok = configsMap[key]; !ok {
  49. ret.Flag = entity.NoExecute
  50. return ret, nil
  51. }
  52. // 直接回复分支
  53. templateVarMap := s.objToMap(inputData)
  54. if config.Type == 1 {
  55. contextTemplate := config.ContextTemplate
  56. if inputData.IsEnglish == 1 {
  57. contextTemplate = config.EngContextTemplate
  58. }
  59. replyContext, err := s.parseTemplate(contextTemplate, templateVarMap)
  60. if err != nil {
  61. return ret, err
  62. }
  63. ret.ReplyContext = replyContext
  64. return ret, nil
  65. }
  66. // 发送http 请求
  67. responseData, err := s.sendHTTPRequest(&config, templateVarMap, inputData.JwtToken)
  68. if err != nil {
  69. return ret, err
  70. }
  71. // 抽取字段
  72. responseExtract := make(map[string]string)
  73. err = json.Unmarshal([]byte(config.ResponseExtract), &responseExtract)
  74. if err != nil {
  75. return ret, err
  76. }
  77. // 额外逻辑处理
  78. switch config.MetricType {
  79. case 2:
  80. responseData, _ = TransformType2(responseData)
  81. case 4:
  82. responseData, _ = TransformType4(responseData)
  83. }
  84. extractMap := s.extractFields(responseExtract, responseData)
  85. for k, v := range extractMap {
  86. templateVarMap[k] = v
  87. }
  88. // 解析返回模板
  89. replyContext, err := s.parseTemplate(config.ContextTemplate, templateVarMap)
  90. if err != nil {
  91. return ret, err
  92. }
  93. ret.Flag = entity.HttpReply
  94. ret.ReplyContext = replyContext
  95. return ret, nil
  96. }
  97. func (s *MetricProcessConfigService) objToMap(req entity.MetricProcessRequest) map[string]interface{} {
  98. ret := make(map[string]interface{})
  99. ret["name"] = req.Name
  100. ret["think_str"] = entity.ThinkStr
  101. ret["date_src"] = req.DateSrc
  102. ret["s_time"] = req.STime
  103. ret["e_time"] = req.ETime
  104. ret["local_url"] = req.LocalUrl
  105. ret["id"] = req.ID
  106. ret["datetime"] = req.DateTime
  107. return ret
  108. }
  109. // sendHTTPRequest 发送HTTP请求
  110. func (s *MetricProcessConfigService) sendHTTPRequest(config *entity.MetricProcessConfig, inputData map[string]interface{}, jwtToken string) (string, error) {
  111. // 解析URL模板
  112. reqUrl, err := s.parseTemplate(config.URLTemplate, inputData)
  113. if err != nil {
  114. return "", fmt.Errorf("解析URL模板失败: %v", err)
  115. }
  116. var req *http.Request
  117. switch config.HTTPMethod {
  118. case 0: // GET
  119. // 解析查询参数模板
  120. query, err := s.parseTemplate(config.QueryTemplate, inputData)
  121. if err != nil {
  122. return "", fmt.Errorf("解析查询参数模板失败: %v", err)
  123. }
  124. // 对查询参数进行urlencode处理
  125. if query != "" {
  126. pairs := strings.Split(query, "&")
  127. encodedQuery := ""
  128. for i, pair := range pairs {
  129. if i > 0 {
  130. encodedQuery += "&"
  131. }
  132. // 分割键值对
  133. kv := strings.Split(pair, "=")
  134. if len(kv) == 2 {
  135. // 对键和值都进行urlencode
  136. encodedKey := url.QueryEscape(kv[0])
  137. encodedValue := url.QueryEscape(kv[1])
  138. encodedQuery += encodedKey + "=" + encodedValue
  139. }
  140. }
  141. reqUrl += "?" + encodedQuery
  142. }
  143. req, err = http.NewRequest("GET", reqUrl, nil)
  144. if err != nil {
  145. return "", fmt.Errorf("创建GET请求失败: %v", err)
  146. }
  147. case 1: // POST
  148. // 解析请求体模板
  149. bodyStr, err := s.parseTemplate(config.QueryTemplate, inputData)
  150. if err != nil {
  151. return "", fmt.Errorf("解析请求体模板失败: %v", err)
  152. }
  153. body := strings.NewReader(bodyStr)
  154. req, err = http.NewRequest("POST", reqUrl, body)
  155. if err != nil {
  156. return "", fmt.Errorf("创建POST请求失败: %v", err)
  157. }
  158. // 设置默认内容类型
  159. req.Header.Set("Content-Type", "application/json")
  160. default:
  161. return "", fmt.Errorf("不支持的HTTP方法: %d", config.HTTPMethod)
  162. }
  163. req.Header.Set("JWT-TOKEN", jwtToken)
  164. // 发送请求
  165. resp, err := s.httpClient.Do(req)
  166. if err != nil {
  167. return "", fmt.Errorf("发送请求失败: %v", err)
  168. }
  169. defer resp.Body.Close()
  170. // 读取响应
  171. body, err := io.ReadAll(resp.Body)
  172. if err != nil {
  173. return "", fmt.Errorf("读取响应失败: %v", err)
  174. }
  175. responseString := string(body)
  176. if resp.StatusCode != http.StatusOK {
  177. return "", fmt.Errorf("请求失败,状态码: %d,响应: %s", resp.StatusCode, responseString)
  178. }
  179. return responseString, nil
  180. }
  181. // extractFields 提取指定字段
  182. func (s *MetricProcessConfigService) extractFields(rules map[string]string, responseBody string) map[string]interface{} {
  183. ret := make(map[string]interface{})
  184. for field, rule := range rules {
  185. result := gjson.Get(responseBody, rule)
  186. ret[field] = result.Value()
  187. }
  188. return ret
  189. }
  190. // parseTemplate 解析模板
  191. func (s *MetricProcessConfigService) parseTemplate(tpl string, data map[string]interface{}) (string, error) {
  192. if tpl == "" {
  193. return "", nil
  194. }
  195. funcMap := template.FuncMap{
  196. "SplitAndFetch": utils.SplitAndFetch,
  197. }
  198. // 创建模板
  199. t, err := template.New("").Funcs(funcMap).Parse(tpl)
  200. if err != nil {
  201. return "", fmt.Errorf("解析模板失败: %v", err)
  202. }
  203. // 执行模板
  204. var buf bytes.Buffer
  205. if err := t.Execute(&buf, data); err != nil {
  206. return "", fmt.Errorf("执行模板失败: %v", err)
  207. }
  208. return buf.String(), nil
  209. }