在SpringAMQP中预先设置回复?

 瑶瑶bao呗 发布于 2022-12-21 19:12

我正在使用SpringBoot启动连接到RabbitMQ队列的SpringAMQP应用程序.我希望能够从生产者发送消息,指定回复队列,以便消费者只需要发送而无需调查目的地(因此不必在消息本身中传递回复数据).

这是我的配置(生产者和消费者之间共享)

private static final String QUEUE_NAME = "testQueue";
private static final String ROUTING_KEY = QUEUE_NAME;
public static final String REPLY_QUEUE = "replyQueue";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String IP = "localhost";
private static final String VHOST = "/";
private static final int PORT = 5672;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME));
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE));
    return template;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setVirtualHost(VHOST);
    connectionFactory.setPort(PORT);
    return connectionFactory;
}

我发的消息如下:

public Object sendAndReply(String queue, String content){
        return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE);
                return message;
            }
        });
    }

等待回复如下:

public void replyToQueue(String queue){
    template.receiveAndReply(queue, new ReceiveAndReplyCallback() {
        @Override
        public Data handle(Data payload) {
            System.out.println("Received: "+payload.toString());
            return new Data("This is a reply for: "+payload.toString());
        }
    });
}

但是,当发送时,我得到以下异常:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663)
    at prodsend.Prod.sendAndReply(ReplyTester.java:137)
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49)
    at prodsend.ReplyTester.main(ReplyTester.java:102)
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.util.Assert.isNull(Assert.java:89)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835)
    ... 8 more

ReplyTest.137行指向上面方法中的returnsendAndReply.


编辑:这是上面提到的数据类:)

class Data{
    public String d;
    public Data(String s){ d = s; }
    public String toString() { return d; }
}

Boris the Sp.. 6

从文档:

基本的RPC模式.使用特定路由密钥将消息发送到默认交换,并尝试接收响应.实现通常会将reply-to标头设置为独占队列,并等待一段时间限制超时.

因此该方法convertSendAndReceive处理设置replyTo标题并返回Messaage- 响应.这是一个同步模式 - RPC.

如果你想异步这样做- 你似乎 - 不要使用这种方法.使用适当的convertAndSend方法并使用相应的方法MessagePostProcessor添加replyTo标题.

由于这是异步的,您需要注册一个单独的处理程序来接收回复.这需要将消息发送给另一方之前完成.然后在发送消息后的某个时刻调用此处理程序 - 何时未知.阅读部分3.5.2异步消费者的的春天AQMP文档.

所以,异步流程:

    sender在replyToqueueue 上注册一个处理程序

    发件人发送带有replyToset的消息

    客户端调用receiveAndReply,处理消息,并发送回复replyTo

    发送回调方法被触发

同步流程是:

    发件人使用sendAndReceive和阻止发送消息

    客户端调用receiveAndReply,处理消息,并发送回复replyTo

    发送者收到回复,唤醒并处理它

所以后一种情况要求发送者等待.当您使用receiveXXX而不是注册异步处理程序时,如果客户端需要一段时间来调用,则发送方可能会等待很长时间receiveXXX.

顺便说一句,如果您想使用同步方法但使用特定的方法,replyTo您可以随时调用setReplyQueue.还有一个setReplyTimeout案例我提到客户端要么懒得阅读邮件或忘记回复.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有