1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- package rabbitMQ
- import "github.com/streadway/amqp"
- func NewConsumer(exchange Exchange) (*Consumer, error) {
- conn, err := NewConnect(exchange.Dns)
- if err != nil {
- return nil, err
- }
- channel, err := conn.Channel()
- if err != nil {
- return nil, err
- }
- consumer := &Consumer{
- conn: conn,
- channel: channel,
- }
- return consumer, nil
- }
- func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) {
- return c.channel.Consume(queueName, "", true, false, false, false, nil)
- }
- func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error {
- ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil)
- if err != nil {
- return err
- }
- for delivery := range ch {
- delivery := delivery
- go func() {
- err := handler(delivery.Body)
- if err != nil {
- println(err.Error())
- //return c.channel.Ack()
- }
- }()
- }
- return nil
- }
- func (c *Consumer) Close() {
- c.channel.Close()
- c.conn.Close()
- }
|