Parcourir la source

Merge remote-tracking branch 'origin/master'

gaoyagang il y a 11 mois
Parent
commit
abd0699014
2 fichiers modifiés avec 45 ajouts et 1 suppressions
  1. 44 1
      rabbitMQ/consumer.go
  2. 1 0
      rabbitMQ/vars.go

+ 44 - 1
rabbitMQ/consumer.go

@@ -1,6 +1,9 @@
 package rabbitMQ
 
-import "github.com/streadway/amqp"
+import (
+	"github.com/streadway/amqp"
+	"strings"
+)
 
 func NewConsumer(exchange Exchange) (*Consumer, error) {
 	conn, err := NewConnect(exchange.Dns)
@@ -24,7 +27,47 @@ func NewConsumer(exchange Exchange) (*Consumer, error) {
 func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) {
 	return c.channel.Consume(queueName, "", true, false, false, false, nil)
 }
+func (c *Consumer) SubscribeMulti(queueNames ...string) error {
+	exchange := "topic_logs"
+	err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
+	if err != nil {
+		println(err.Error())
+		return err
+	}
+	for _, queueName := range queueNames {
+		q, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
+		if err != nil {
+			println(err.Error())
+			return err
+		}
+		bindingKey := strings.Join([]string{queueName, "*"}, ".")
+		err = c.channel.QueueBind(q.Name, bindingKey, exchange, false, nil)
+		if err != nil {
+			println(err.Error())
+			return err
+		}
 
+	}
+	c.queues = queueNames
+	return nil
+}
+func (c *Consumer) HandlerMulti(process func(body []byte)) error {
+	for _, queue := range c.queues {
+		msgs, err := c.channel.Consume(queue, "", true, false, false, false, nil)
+		if err != nil {
+			println(err.Error())
+			return err
+		}
+		go func() {
+			for msg := range msgs {
+				func(msg amqp.Delivery) {
+					process(msg.Body)
+				}(msg)
+			}
+		}()
+	}
+	return nil
+}
 func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error {
 	ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil)
 	if err != nil {

+ 1 - 0
rabbitMQ/vars.go

@@ -22,6 +22,7 @@ type (
 	Consumer struct {
 		conn    *amqp.Connection
 		channel *amqp.Channel
+		queues  []string
 	}
 )