• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

RabbitMQ 入门 (Go) - 2. 发布和接收消息

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文我将使用 Go 语言在 RabbitMQ 上发布和接收消息。 

Go 的标准库本身并没有 RabbitMQ 的原生绑定,但是有一个第三方库确能够支持 RabbitMQ,它的源码在 https://github.com/streadway/amqp ,其文档在 https://pkg.go.dev/github.com/streadway/amqp 

 

建立一个 Go 的项目,并使用 go mod init 进行初始化: 

 

使用 go get -u github.com/streadway/amqp 命令来安装这个库: 

 

获取 Queue 

代码如下: 

  1. 首先导入需要的包,主要是 streadway/amqp 

  2.  12 行,编写处理错误的函数 failOnError 

  3.  19 行,编写可以获得 AMQP ConnectionChannelQueue 的帮助函数 getQueue() 

我们知道我们需要将消息发布到 Exchange 上面,但是如果使用默认 Exchange 的话,就可以使用一个捷径:我们可以将消息直接发送到 Queue 的名称上(但实际并不是直接发送到 Queue 上面) 

  1. getQueue() 函数不用任何参数,它返回三个对象: 

    1. *amqp.Connection 表示应用和 RabbitMQ 之间的网络连接 

    2. *amqp.Channel 位于 Connection 之上,它提供了用于双方通信的通道。通过把 Connection  Channel 分开,客户应用中就可以在同一个 Connection 上拥有多个 Channel 用来通信,这样就减少了对资源的需求。 

    3. *amqp.Queue 也就是队列 

  2.  20 行,使用 amqp  Dial 函数可以返回一个 ConnectionDial 函数的参数是 RabbitMQ  URLURL 里面需要包含用户凭证。 

  3.  22 行,通过调用 Connection 对象上的 Channel 方法,创建一个 Channel 

  4.  24 行,通过调用 Channel 对象上的 QueueDeclare 方法,返回一个 Queue。注意:这个 Queue 不一定是被创建的,如果不存在指定名称的 Queue,那么 RabbitMQ 就会创建一个;如果存在指定名称的 Queue,但是和指定的配置不同,那么 RabbitMQ 就会拒绝这个请求,并抛出错误。 

  5. QueueDeclare 方法参数: 

    1. 第一个参数是 Queue 的名称:我们就写死一个 hello 

    2. 第二个参数是 durable bool,表示是否将添加到 Queue 的消息存储在硬盘上。如果这个参数值为 true,那么 RabbitMQ 服务器重启之后消息依然会存在。但是它会导致处理消息的能力明显下降。这里我把它设为 false 

    3. 第三个参数 autoDelete bool,它会告诉 RabbitMQ 如果消息没有消费者应该怎么做: 

      1. true:消息就会从 Queue 中删除 

      2. false:将消息保留直到某个消费者前来获取该消息。这里我把它设为 false 

    4. 第四个参数 exclusive bool,它允许我们把这个 Queue 设置为只能从请求它的那个 Connection 上进行访问。 

      1. 如果它为 true,但想创建一个来自其它 Connection 的同名 Queue,那么就会报错 

      2. 如果它是 false,那么想创建一个来自其它 Connection 的同名 Queue 的结果就是:两个 Connection 都连接到同一个 Queue,两个 Connection 会共享它。这里我把它设为 false 

    5. 第五个参数 noWait bool 

      1. 如果为 true,这个 Queue 就被认为已经在服务器上存在了,将它返回即可,如果它不存在,那么就会报错 

      2. 所以这里设置为 false,因为我要创建的 Queue 在服务器上不存在。 

    6. 第六个参数 args amqp.Table,这个参数用于某些特定场景,例如声明一些要被这个 Queue 匹配的 Headers,如果这个 Queue 被绑定到 Header Exchange 的话。这里我传的是 nil 

  6. 如果第 24 行的 QueueDeclare 方法调用成功,那么就会得到一个绑定到 Default Exchange  Queue 

  7. 注意:Default Exchange 的类型是 Direct,也就是说任何没有路由 Key(和 Queue 的名称相同,在这里就是 hello)的消息传进来,将会被直接通过 exchange 送往输出的 Queue 

  8.  31 行,将 3 个对象返回即可,注意 q 我们返回的是指针 

 

发布消息 

  1. 16 行,编写了一个 server 函数 

  2. 17 行,通过 getQueue 来获得 ConnectionChannel  Queue 

  3. 1819 行,按顺序 defer 关闭 Connection  Channel 

  4. 21 行,创建一个消息 amqp.Publishing 结构体,很多参数都是可选的,这里我设置两个: 

    1. ContentType 会指明消息的类型。RabbitMQ 会把数据变成字节流来传输,所以它其实并不关心消息的类型。但是如果你往同一个 Queue 发送不同类型的消息,那么还是设置一下这个字段比较好,便于区分消息的类型。 

    2. Body 可能是该结构体中最重要的字段:它的类型是 Byte Slice,里面包含着要传送的数据。 

  5.  26 行,将消息发布到消息代理上。这里我们使用 Channel 上的 Publish 方法,其参数有: 

    1. 第一个参数 Exchange:“”表示使用 Default Exchange,它没有名称 

    2. 第二个参数是路由 Key:本例中,需要把它设置为 Queue 的名称 

    3. 第三、四各参数 mandatory boolimmediate bool:用于发生者需要确认消息是否被传递成功,以及什么时候传递成功的。 

    4. 第五个参数就是消息本身:也就是 msg 

  6.  main 函数中调用 server 函数。这里循环调用是为了看看 RabbitMQ 的性能,你可以只调用一次。 

 

运行程序 

运行 go run . 命令: 

 

打开管理控制台: 

 

可以看到目前有 8  Exchange10  Queue,有 9636 个消息 

 

切换到 Exchange 画面: 

可以看到 Default Exchange 的消息速率。 

 

切换到 Queues 画面: 

可以看到 hello 这个 Queue 的运行信息。 

 

打开 hello Queue 

可以看到 hello Queue 的运行信息。 

 

移动到下面, 

点击 Get Messages 按钮 

 

就可以看到 Queue 里面的消息。 

 

接收消息

 RabbitMQ 接收消息与向 RabbitMQ 发布消息很类似。 接收消息仍然需要 ConnectionChannelQueue,但是交互方式略有不同。 

 

我将之前的程序代码更新一下,以便可以在同一个 Queue 里同时发送和接收消息: 

  1. 首先在 18 行建立 client 函数,用于处理接收消息的逻辑 

  2. 19 行,通过 getQueue() 获得 ConnectionChannelQueue 

  3. 23 行,调用 Channel 上的 Consume 方法。这个方法返回一个 Go  Channel,每从服务器接收到消息,就可以通过这个 Go Channel 获得。 

  4. Consume 方法的参数


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
GO语言练习:channel缓冲机制发布时间:2022-07-10
下一篇:
go语言中go+select的理解发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap