producter.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package rabbitMQ
  2. import (
  3. "github.com/streadway/amqp"
  4. "time"
  5. )
  6. func NewProducter(exchange Exchange) (*Producter, error) {
  7. conn, err := NewConnect(exchange.Dns)
  8. if err != nil {
  9. return nil, err
  10. }
  11. channel, err := conn.Channel()
  12. if err != nil {
  13. _ = conn.Close()
  14. return nil, err
  15. }
  16. producter := &Producter{
  17. conn: conn,
  18. channel: channel,
  19. Exchange: exchange,
  20. }
  21. _ = producter.channel.ExchangeDeclare(producter.Exchange.Name, producter.Exchange.Type, true, false, false, false, nil)
  22. return producter, nil
  23. }
  24. // Publisher 发布
  25. func (p *Producter) Publisher(queueName string, msg []byte) error {
  26. _, _ = p.channel.QueueDeclare(queueName, true, false, false, false, nil)
  27. return p.channel.Publish("", queueName, false, false, amqp.Publishing{
  28. ContentType: "text/plain",
  29. DeliveryMode: 2,
  30. Timestamp: time.Now(),
  31. Body: msg,
  32. })
  33. }
  34. // DelayPublisher 延时发布
  35. func (p *Producter) DelayPublisher() error {
  36. return nil
  37. }
  38. func (p *Producter) Close() {
  39. p.channel.Close()
  40. p.conn.Close()
  41. }