123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package rabbitMQ
- import (
- "github.com/streadway/amqp"
- "strings"
- )
- func NewConsumer(exchange Exchange) (*Consumer, error) {
- conn, err := NewConnect(exchange.Dns)
- if err != nil {
- return nil, err
- }
- channel, err := conn.Channel()
- if err != nil {
- return nil, err
- }
- consumer := &Consumer{
- conn: conn,
- channel: channel,
- }
- return consumer, nil
- }
- func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) {
- return c.channel.Consume(queueName, "", true, false, false, false, nil)
- }
- func (c *Consumer) SubscribeMulti(queueNames ...string) error {
- exchange := "topic_logs"
- err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
- if err != nil {
- println(err.Error())
- return err
- }
- for _, queueName := range queueNames {
- q, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
- if err != nil {
- println(err.Error())
- return err
- }
- bindingKey := strings.Join([]string{queueName, "*"}, ".")
- err = c.channel.QueueBind(q.Name, bindingKey, exchange, false, nil)
- if err != nil {
- println(err.Error())
- return err
- }
- }
- c.queues = queueNames
- return nil
- }
- func (c *Consumer) HandlerMulti(process func(body []byte)) error {
- for _, queue := range c.queues {
- msgs, err := c.channel.Consume(queue, "", true, false, false, false, nil)
- if err != nil {
- println(err.Error())
- return err
- }
- go func() {
- for msg := range msgs {
- func(msg amqp.Delivery) {
- process(msg.Body)
- }(msg)
- }
- }()
- }
- return nil
- }
- func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error {
- ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil)
- if err != nil {
- return err
- }
- for delivery := range ch {
- delivery := delivery
- go func() {
- err := handler(delivery.Body)
- if err != nil {
- println(err.Error())
- //return c.channel.Ack()
- }
- }()
- }
- return nil
- }
- func (c *Consumer) Close() {
- c.channel.Close()
- c.conn.Close()
- }
|