我正在使用SpringBoot启动连接到RabbitMQ队列的SpringAMQP应用程序.我希望能够从生产者发送消息,指定回复队列,以便消费者只需要发送而无需调查目的地(因此不必在消息本身中传递回复数据).
这是我的配置(生产者和消费者之间共享)
private static final String QUEUE_NAME = "testQueue"; private static final String ROUTING_KEY = QUEUE_NAME; public static final String REPLY_QUEUE = "replyQueue"; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; private static final String IP = "localhost"; private static final String VHOST = "/"; private static final int PORT = 5672; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); amqpAdmin().declareQueue(new Queue(QUEUE_NAME)); amqpAdmin().declareQueue(new Queue(REPLY_QUEUE)); return template; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP); connectionFactory.setUsername(USERNAME); connectionFactory.setPassword(PASSWORD); connectionFactory.setVirtualHost(VHOST); connectionFactory.setPort(PORT); return connectionFactory; }
我发的消息如下:
public Object sendAndReply(String queue, String content){ return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE); return message; } }); }
等待回复如下:
public void replyToQueue(String queue){ template.receiveAndReply(queue, new ReceiveAndReplyCallback() { @Override public Data handle(Data payload) { System.out.println("Received: "+payload.toString()); return new Data("This is a reply for: "+payload.toString()); } }); }
但是,当发送时,我得到以下异常:
Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66) at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705) at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697) at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673) at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663) at prodsend.Prod.sendAndReply(ReplyTester.java:137) at prodsend.ReplyTester.sendMessages(ReplyTester.java:49) at prodsend.ReplyTester.main(ReplyTester.java:102) Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. at org.springframework.util.Assert.isNull(Assert.java:89) at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711) at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705) at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) ... 8 more
ReplyTest.137行指向上面方法中的return
行sendAndReply
.
编辑:这是上面提到的数据类:)
class Data{ public String d; public Data(String s){ d = s; } public String toString() { return d; } }
Boris the Sp.. 6
从文档:
基本的RPC模式.使用特定路由密钥将消息发送到默认交换,并尝试接收响应.实现通常会将reply-to标头设置为独占队列,并等待一段时间限制超时.
因此该方法convertSendAndReceive
处理设置replyTo
标题并返回Messaage
- 响应.这是一个同步模式 - RPC.
如果你想异步这样做- 你似乎 - 不要使用这种方法.使用适当的convertAndSend
方法并使用相应的方法MessagePostProcessor
添加replyTo
标题.
由于这是异步的,您需要注册一个单独的处理程序来接收回复.这需要在将消息发送给另一方之前完成.然后在发送消息后的某个时刻调用此处理程序 - 何时未知.阅读部分3.5.2异步消费者的的春天AQMP文档.
所以,异步流程:
sender在replyTo
queueue 上注册一个处理程序
发件人发送带有replyTo
set的消息
客户端调用receiveAndReply
,处理消息,并发送回复replyTo
发送回调方法被触发
同步流程是:
发件人使用sendAndReceive
和阻止发送消息
客户端调用receiveAndReply
,处理消息,并发送回复replyTo
发送者收到回复,唤醒并处理它
所以后一种情况要求发送者等待.当您使用receiveXXX
而不是注册异步处理程序时,如果客户端需要一段时间来调用,则发送方可能会等待很长时间receiveXXX
.
顺便说一句,如果您想使用同步方法但使用特定的方法,replyTo
您可以随时调用setReplyQueue
.还有一个setReplyTimeout
案例我提到客户端要么懒得阅读邮件或忘记回复.