consumer.go 968 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package rabbitMQ
  2. import "github.com/streadway/amqp"
  3. func NewConsumer(exchange Exchange) (*Consumer, error) {
  4. conn, err := NewConnect(exchange.Dns)
  5. if err != nil {
  6. return nil, err
  7. }
  8. channel, err := conn.Channel()
  9. if err != nil {
  10. return nil, err
  11. }
  12. consumer := &Consumer{
  13. conn: conn,
  14. channel: channel,
  15. }
  16. return consumer, nil
  17. }
  18. func (c *Consumer) Subscribe(queueName string) (<-chan amqp.Delivery, error) {
  19. return c.channel.Consume(queueName, "", true, false, false, false, nil)
  20. }
  21. func (c *Consumer) Handler(queueName string, handler func(body []byte) error) error {
  22. ch, err := c.channel.Consume(queueName, "", true, false, false, false, nil)
  23. if err != nil {
  24. return err
  25. }
  26. for delivery := range ch {
  27. delivery := delivery
  28. go func() {
  29. err := handler(delivery.Body)
  30. if err != nil {
  31. println(err.Error())
  32. //return c.channel.Ack()
  33. }
  34. }()
  35. }
  36. return nil
  37. }
  38. func (c *Consumer) Close() {
  39. c.channel.Close()
  40. c.conn.Close()
  41. }