events.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package main
  2. import (
  3. "GtDataStore/app/cmd/events/internal/config"
  4. "GtDataStore/app/cmd/events/internal/logic/handler"
  5. "GtDataStore/app/cmd/events/internal/logic/job"
  6. "GtDataStore/app/cmd/events/internal/server"
  7. "GtDataStore/app/cmd/events/internal/svc"
  8. "GtDataStore/app/cmd/events/pb"
  9. "flag"
  10. "fmt"
  11. "github.com/zeromicro/go-zero/core/conf"
  12. "github.com/zeromicro/go-zero/core/service"
  13. "github.com/zeromicro/go-zero/zrpc"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/reflection"
  16. "metawant.greentech.com.cn/gaoyagang/gt-common/envitem"
  17. )
  18. var configFile = flag.String("f", "etc/events.yaml", "the config file")
  19. func main() {
  20. flag.Parse()
  21. var c config.Config
  22. conf.MustLoad(*configFile, &c)
  23. ctx := svc.NewServiceContext(c)
  24. s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
  25. pb.RegisterEventsServer(grpcServer, server.NewEventsServer(ctx))
  26. if c.Mode == service.DevMode || c.Mode == service.TestMode {
  27. reflection.Register(grpcServer)
  28. }
  29. })
  30. defer s.Stop()
  31. fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
  32. envitem.SetOptions(envitem.Options{GtServerIp: c.GtServerIp, Cache: ctx.Cache})
  33. if c.Mode != "dev" {
  34. go func() {
  35. defer func() {
  36. fmt.Print("async events job stop.....\n")
  37. }()
  38. // 启动事件识别
  39. fmt.Print("start new job for events.....\n")
  40. j := job.NewJob(1, "events-"+c.Mode, ctx, handler.EventHandlerTable, handler.EventIntervalTable)
  41. j.Run()
  42. }()
  43. }
  44. // 接收识别到的事件, 处理, 目前只有写入队列(暂时只有redis), 写入数据库
  45. go handler.HandlerEventNotify(ctx)
  46. // 处理事件
  47. go func() {
  48. // 启动事件监听
  49. eh, _ := handler.NewEventHandler(ctx)
  50. eh.Start()
  51. }()
  52. s.Start()
  53. }