热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Rabbitmq的原生javaAPI讲解

说明实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Ra

说明

实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Rabbitmq的特性和对springboot封装的组件的使用原理有一定了解。


关键的类


ConnectionFactory

从字面上看是连接工厂的意思,实际上该对象就是一个连接工厂对象,主要用于创建一个连接到Rabbitmq服务器的tcp连接。

基本的方法:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //设置连接的服务器的ip
factory.setPort(5672); //设置端口,如果使用AMQP协议进行通信的话,这里就写该协议的通信端口。
factory.setUsername("guest"); //用户名
factory.setPassword("guest"); //密码
//设置这个uri可以代替上面的设置值,协议://用户名:密码@ip:port
//factory.setUri("amqp://guest:guest@192.168.18.140:5672");
factory.setVirtualHost("/"); //设置连接的虚拟主机
//设置连接超时时间
factory.setConnectionTimeout(10000);Connection conn = factory.newConnection(); //创建一个TCP连接。

Connection

一个与rabbitmq服务器的TCP连接对象,因为Rabbitmq为了节省连接频繁创建和销毁的资源消耗,使用了channel来进行对Rabbitmq服务器的操作。所以Connection对象不会进行对Rabbitmq服务器的操作,该操作要交给Channel对象,而Connection对象的主要目的是为了创建一个Channel对象。

Connection conn = factory.newConnection();
Channel channel = conn.createChannel(); //创建一个channel对象,用于操作服务器。
Channel channel = conn.createChannel(1); //创建一个channel对象,并手动指定一个通道号码。上面的方法是自动为我们分配号码。范围1-2047
Optional<Channel> channel &#61; conn.openChannel(); //创建一个channel对象&#xff0c;把他封装在Optional对象中。
Optional<Channel> channel &#61; conn.openChannel(1);//创建一个channel对象&#xff0c;把他封装在Optional对象中。并手动指定通道号码。范围1-2047conn.close(); //关闭连接

Channel

对Rabbitmq服务器的操作基本都在这里了。

//交换器相关操作

//对交换器的操作&#xff0c;挑出来的都是重载方法中最全参数的重载方法&#xff0c;其他都是这个方法的删减用于使用一些默认值。
//声明一个交换器&#xff0c;这个是重载方法中参数最全的方法了&#xff0c;其他重载方法是对该方法的一些删减以使用某个默认值。DeclareOk 返回值主要用于查看一些创建信息。如果声明失败返回null。
//如果交换器已经存在&#xff0c;那么声明的参数一定要与已存在的交换器的参数一致&#xff0c;否则会报错。
Exchange.DeclareOk exchangeDeclare(String exchange, //要声明的交换器名称BuiltinExchangeType type, //要声明的交换器类型&#xff0c;BuiltinExchangeType 是一个枚举类&#xff0c;对交换器四种类型进行一个枚举。boolean durable, //是否持久化队列boolean autoDelete, //是否自动删除&#xff0c;具体含义可以看博主的另一篇文章有讲解boolean internal, //是否内部交换器Map<String, Object> arguments) throws IOException; //其他属性参数。//异步创建交换器&#xff0c;上面的是同步等待创建成功&#xff0c;这个异步&#xff0c;参数与上面一致。
void exchangeDeclareNoWait(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;//对交换机进行绑定交换器
Exchange.BindOk exchangeBind(String destination, //设置目标交换器名称。String source, //设置源交换器名称&#xff0c;也就是消息先到该交换器&#xff0c;再路由到目标交换器。String routingKey, //绑定键&#xff0c;到时与消息的路由键进行匹配看是否路由到绑定的交换机或队列中去。Map<String, Object> arguments) throws IOException; //其他参数//异步对交换机绑定交换机&#xff0c;参数与上面一致。
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//删除一个交换器
Exchange.DeleteOk exchangeDelete(String exchange, //要删除的交换器名称boolean ifUnused //如果为true&#xff0c;则交换器没有被生产者使用时才可以删除&#xff0c;否则删除失败&#xff0c;另一个重载方法使用ifUnused &#61;false为默认值) throws IOException;//异步删除一个交换器&#xff0c;参数与上面方法一致。
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;//对交换器与交换器进行接触绑定
Exchange.UnbindOk exchangeUnbind(String destination, //目标交换器名称String source, //源交换器名称&#xff0c;在这边来解绑。String routingKey, //绑定键&#xff0c;destination、source、routingKey三者才能确定一个绑定关系。Map<String, Object> arguments //其他参数) throws IOException;//异步对交换器与交换器进行接触绑定&#xff0c;参数与上面一致
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//查看交换器是否存在&#xff0c;存在就正常返回&#xff0c;不存在就抛出异常
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

//队列相关操作

//队列相关操作
//声明创建一个队列。
Queue.DeclareOk queueDeclare(String queue, //要创建的队列名boolean durable, //是否持久化队列boolean exclusive, //是否排他队列&#xff0c;如果true&#xff0c;那么该队列只能在声明该队列的Connection下的Channel中使用&#xff0c;其他Connection创建的Channel不能使用&#xff0c;连接断开时队列自动删除boolean autoDelete,//是否自动删除Map<String, Object> arguments //其他参数) throws IOException;//异步声明创建一个队列
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;//交换器绑定队列
Queue.BindOk queueBind(String queue, //要绑定的队列名String exchange, //要绑定的交换器名String routingKey, //绑定键Map<String, Object> arguments //其他参数) throws IOException;//异步进行交换器绑定队列
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;//删除队列
Queue.DeleteOk queueDelete(String queue, //要删除的队列名boolean ifUnused, //如果为true&#xff0c;则只有队列没有被消费者使用时才删除成功&#xff0c;如果为false&#xff0c;就不管有没有&#xff0c;直接删除。boolean ifEmpty //如果为true&#xff0c;则只有队列为空队列时才能删除成功) throws IOException;//异步删除队列
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;//接触交换器与队列的绑定
Queue.UnbindOk queueUnbind(String queue, //要解绑的队列名String exchange, //要解绑的交换器名String routingKey, //绑定键 三者才能唯一确定一个绑定Map<String, Object> arguments //其他参数) throws IOException;//清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;//查看队列是否存在&#xff0c;存在就返回一个DeclareOk &#xff0c;不存在或者该队列时排他队列并且创建者不是这个连接&#xff0c;就抛出异常。Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

Message属性参数配置&#xff1a;

//原生API的消息属性参数配置主要由AMQP.BasicProperties类配置Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");AMQP.BasicProperties properties &#61; new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化.contentEncoding("UTF-8") // 编码.expiration("10000") // TTL&#xff0c;过期时间.headers(headers) // 自定义属性.priority(5) // 优先级&#xff0c;默认为5&#xff0c;配合队列的 x-max-priority 属性使用.messageId(String.valueOf(UUID.randomUUID())) //消息的Id.appId("order-service") //消息推送的应用程序.contentType("application/json") //消息的内容类型.replyTo("replyToQueue") //RPC模式下的回调.timestamp(new Date()) //消息发送的时间.userId("15355153") //一般生产者的id.build();//然后在消息发布时&#xff0c;指定properties
channel.basicPublish("", QUEUE_NAME, properties, "aaaa".getBytes());

这里暂时就讲那么多&#xff0c;关于消息的生产消费有很多细节&#xff0c;比如事务&#xff0c;消息确认、回调等等等等。另外开个新章节专门讲这些问题。

下面附上一个普通的消息收发例子&#xff1a;

public class MyProducer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 连接端口factory.setPort(5672);// 用户factory.setUsername("guest");factory.setPassword("guest");//设置这个uri可以代替上面的设置值&#xff0c;协议://用户名:密码&#64;ip:port//factory.setUri("amqp://guest:guest&#64;192.168.18.140:5672");// 虚拟机factory.setVirtualHost("/");//设置连接超时时间factory.setConnectionTimeout(10000);// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel(1000);// 要发送消息String msg &#61; "Hello world, Rabbit MQ";// String exchange 要发送到的交换器名称, String routingKey 消息的路由键, BasicProperties props 消息的额外属性, byte[] body 消息的具体内容channel.basicPublish(EXCHANGE_NAME, "testdemo", null, msg.getBytes());channel.close();conn.close();}
}public class MyConsumer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";private final static String QUEUE_NAME &#61; "SIMPLE_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 默认监听端口factory.setPort(5672);// 虚拟机factory.setVirtualHost("/");// 设置访问的用户factory.setUsername("guest");factory.setPassword("guest");// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel();// 声明交换机// String exchange 交换机名称, String type, boolean durable, boolean autoDelete, Map argumentsAMQP.Exchange.DeclareOk direct &#61; channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);// 声明队列// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" Waiting for message....");// 绑定队列和交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"testdemo");// 创建消费者Consumer consumer &#61; new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg &#61; new String(body, "UTF-8");System.out.println("Received message : &#39;" &#43; msg &#43; "&#39;");System.out.println("consumerTag : " &#43; consumerTag );System.out.println("deliveryTag : " &#43; envelope.getDeliveryTag() );}};// 开始获取消息// String queue, boolean autoAck, Consumer callbackchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

如果第一次&#xff0c;服务器并没有队列和交换器&#xff0c;要先启动消费者再启动生产者&#xff0c;因为队列和交换器的声明都在消费者中完成&#xff0c;如果有了队列和交换器&#xff0c;哪个先启动都没关系。


推荐阅读
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
author-avatar
山海
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有