package rabbitMQ import ( "github.com/streadway/amqp" "strings" ) func NewConsumer(exchange Exchange) (*Consumer, error) { conn, err := NewConnect(exchange.Dns) if err != nil { return nil, err } channel, err := conn.Channel() if err != nil { return nil, err } consumer := &Consumer{ conn: conn, channel: channel, } return consumer, nil } func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) { return c.channel.Consume(queueName, "", true, false, false, false, nil) } func (c *Consumer) SubscribeMulti(queueNames ...string) error { exchange := "topic_logs" err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil) if err != nil { println(err.Error()) return err } for _, queueName := range queueNames { q, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil) if err != nil { println(err.Error()) return err } bindingKey := strings.Join([]string{queueName, "*"}, ".") err = c.channel.QueueBind(q.Name, bindingKey, exchange, false, nil) if err != nil { println(err.Error()) return err } } c.queues = queueNames return nil } func (c *Consumer) HandlerMulti(process func(body []byte)) error { for _, queue := range c.queues { msgs, err := c.channel.Consume(queue, "", true, false, false, false, nil) if err != nil { println(err.Error()) return err } go func() { for msg := range msgs { func(msg amqp.Delivery) { process(msg.Body) }(msg) } }() } return nil } func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error { ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil) if err != nil { return err } for delivery := range ch { delivery := delivery go func() { err := handler(delivery.Body) if err != nil { println(err.Error()) //return c.channel.Ack() } }() } return nil } func (c *Consumer) Close() { c.channel.Close() c.conn.Close() }