热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang并发操作RabbitMQ

<svg


send.go

通过amqp连接RabbitMQ,在通过协程发送信息

package main
import (
"github.com/streadway/amqp"
"log"
"rabbitmqTest/utils"
"sync"
)
func main() {
//TODO 连接地址改为自己主机地址
conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
utils.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
utils.FailOnError(err, "Failed to open a channel")
defer ch.Close()
bodyMap := make(map[string]string)
bodyMap["test1"] = "a"
bodyMap["test2"] = "b"
bodyMap["test3"] = "c"
var wg sync.WaitGroup
errList := make(chan error, 2 * len(bodyMap))
for name, body := range bodyMap {
wg.Add(1)
go func(name, body string) {
defer wg.Done()
q, err := ch.QueueDeclare(
name, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
errList <- err
return
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
errList <- err
return
}
log.Printf(" [x] Sent %s", body)
}(name, body)
}
wg.Wait()
close(errList)
if len(errList) > 0{
for err := range errList {
utils.FailOnError(err, "Failed send message")
}
}
}

recv.go


package main
import (
"log"
"github.com/streadway/amqp"
"rabbitmqTest/utils"
"sync"
)
func main() {
//TODO 连接地址改为自己主机地址
conn, err := amqp.Dial("amqp://guest:guest@192.168.100.101:5672/")
utils.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
utils.FailOnError(err, "Failed to open a channel")
defer ch.Close()
nameList := []string{
"test1",
"test2",
"test3",
}
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
var wg sync.WaitGroup
errList := make(chan error, 2 * len(nameList))
for _, name := range nameList {
go func(name string) {
q, err := ch.QueueDeclare(
name, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
utils.FailOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
utils.FailOnError(err, "Failed to register a consumer")
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}(name)
}
wg.Wait()
close(errList)
if len(errList) > 0{
for err := range errList {
utils.FailOnError(err, "Failed send message")
}
}
forever := make(chan bool)
<-forever
}

log.go


package utils
import (
"log"
)
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}


推荐阅读
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了在多平台下进行条件编译的必要性,以及具体的实现方法。通过示例代码展示了如何使用条件编译来实现不同平台的功能。最后总结了只要接口相同,不同平台下的编译运行结果也会相同。 ... [详细]
  • 3.223.28周学习总结中的贪心作业收获及困惑
    本文是对3.223.28周学习总结中的贪心作业进行总结,作者在解题过程中参考了他人的代码,但前提是要先理解题目并有解题思路。作者分享了自己在贪心作业中的收获,同时提到了一道让他困惑的题目,即input details部分引发的疑惑。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 消息中间件RabbitMQ 高级特性之消费端ACK与重回队列
    什么是消费端的ACK和重回队列?消费端的手工ACK和NACK消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿如果由于服务器宕机等严重问题 ... [详细]
author-avatar
天涯老许_137
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有