consumer_test.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package rabbitMQ
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func TestNewProducter1(t *testing.T) {
  7. exchange := Exchange{
  8. Name: "gt.dc.event",
  9. Type: "fanout", // 广播
  10. QueueName: "step_change",
  11. Key: "UF-4001A",
  12. Dns: "amqp://admin:devmq*1120@47.96.12.136:5672/",
  13. }
  14. got, err := NewProducter(exchange)
  15. t.Log(got, err)
  16. for {
  17. send := got.Publisher("step_change", []byte(`{"device_code":"UF_4001A"}`))
  18. _ = got.Publisher("step_change", []byte(`{"device_code":"UF_4001B"}`))
  19. _ = got.Publisher("step_change_a", []byte(`{"device_code":"UF_4001C"}`))
  20. _ = got.Publisher("step_change_b", []byte(`{"device_code":"UF_4001D"}`))
  21. t.Log(send)
  22. time.Sleep(2 * time.Second)
  23. }
  24. if got != nil {
  25. got.Close()
  26. }
  27. }
  28. func TestConsumer_Subscribe(t *testing.T) {
  29. exchange := Exchange{
  30. Name: "gt.dc.event",
  31. Type: "fanout", // 广播
  32. QueueName: "step_change",
  33. Key: "UF-4001A",
  34. Dns: "amqp://admin:devmq*1120@47.96.12.136:5672/",
  35. }
  36. c, _ := NewConsumer(exchange)
  37. go TestNewProducter1(t)
  38. go c.Handler("step_change", func(body []byte) error {
  39. t.Logf("new message from A step_change: %s", body)
  40. return nil
  41. })
  42. go c.Handler("step_change", func(body []byte) error {
  43. t.Logf("new message from B step_change: %s", body)
  44. return nil
  45. })
  46. c.Handler("step_change_b", func(body []byte) error {
  47. t.Logf("new message from step_change_b: %s", body)
  48. return nil
  49. })
  50. }