package rabbitMQ import "github.com/streadway/amqp" func NewConsumer(exchange Exchange) (*Consumer, error) { conn, err := NewConnect(exchange.Dns) if err != nil { return nil, err } channel, err := conn.Channel() if err != nil { return nil, err } consumer := &Consumer{ conn: conn, channel: channel, } return consumer, nil } func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) { return c.channel.Consume(queueName, "", true, false, false, false, nil) } func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error { ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil) if err != nil { return err } for delivery := range ch { delivery := delivery go func() { err := handler(delivery.Body) if err != nil { println(err.Error()) //return c.channel.Ack() } }() } return nil } func (c *Consumer) Close() { c.channel.Close() c.conn.Close() }