Преглед на файлове

scada 报警支持订阅多个主题

zhangqian преди 11 месеца
родител
ревизия
7233ae5cbb
променени са 1 файла, в които са добавени 26 реда и са изтрити 1 реда
  1. 26 1
      rabbitMQ/consumer.go

+ 26 - 1
rabbitMQ/consumer.go

@@ -1,6 +1,9 @@
 package rabbitMQ
 
-import "github.com/streadway/amqp"
+import (
+	"github.com/streadway/amqp"
+	"strings"
+)
 
 func NewConsumer(exchange Exchange) (*Consumer, error) {
 	conn, err := NewConnect(exchange.Dns)
@@ -24,7 +27,29 @@ func NewConsumer(exchange Exchange) (*Consumer, error) {
 func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) {
 	return c.channel.Consume(queueName, "", true, false, false, false, nil)
 }
+func (c *Consumer) SubscribeMulti(queueNames ...string) error {
+	exchange := "topic_logs"
+	err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
+	if err != nil {
+		println(err.Error())
+		return err
+	}
+	for _, queueName := range queueNames {
+		q, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
+		if err != nil {
+			println(err.Error())
+			return err
+		}
+		bindingKey := strings.Join([]string{queueName, "*"}, ".")
+		err = c.channel.QueueBind(q.Name, bindingKey, exchange, false, nil)
+		if err != nil {
+			println(err.Error())
+			return err
+		}
 
+	}
+	return nil
+}
 func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error {
 	ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil)
 	if err != nil {