作者:孤火自燃 | 来源:互联网 | 2023-02-04 22:51
我尝试用spring boot连接到RabbitMQ.连接应始终重新启动/重试连接.在致命异常后重新连接时出现问题.应用程序永远不会丢失连接,也不会无限期地重试连接.
@Bean
public IntegrationFlow flow() {
final SimpleMessageListenerContainer listenerCOntainer= new SimpleMessageListenerContainer(getConnectionFactory());
listenerContainer.setQueues(getQueue());
listenerContainer.setDeclarationRetries(Integer.MAX_VALUE);
final AmqpInboundChannelAdapterSpec adapter = (AmqpInboundChannelAdapterSpec) Amqp.inboundAdapter(listenerContainer);
return IntegrationFlows
.from(adapter)
.filter(filter)
.transform(transformer)
.handle(processor)
.get();
}
我可能会遇到一个fatal
例外情况ApplicationListener
.这个异常并不总是一样,最后一次是'PossibleAuthenticationFailureException'
如果异常是致命的,我停止然后启动容器.我意识到这可能是错误的地方,因为看起来容器在调用事件后停止了.
@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
if (event.isFatal()) {
SimpleMessageListenerContainer simpleMessageListenerCOntainer= (SimpleMessageListenerContainer) event.getSource()
simpleMessageListenerContainer.stop();
simpleMessageListenerContainer.start();
LOG.info("started container");
}
}
导致以下输出并且与兔子没有连接.(以下输出后没有任何内容,它只是没有连接)
[rContainer#0-34] startConnectionOnFatalConnectionListener:已启动容器.
[rContainer#0-34] osarlSimpleMessageListenerContainer:从被中止的消费者中停止容器
版本:
RabbitMQ 3.6.8,Erlang 19.2
Spring Integration(spring-integration-amqp):4.3.9
我能够获得更多日志条目,在调用ListenerContainerConsumerFailedEvent之前记录以下行:
07:05.843 ERROR o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
07:05.843 ERROR o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
07:15.087 ERROR o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:476) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1280) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Caused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:309) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:547) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:472) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
... 2 common frames omitted
Caused by: com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:342) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:813) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:767) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:887) ~[amqp-client-3.6.3.jar!/:na]
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:300) ~[spring-rabbit-1.6.1.RELEASE.jar!/:na]
... 7 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:234) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:212) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:327) ~[amqp-client-3.6.3.jar!/:na]
... 11 common frames omitted
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.8.0_92]
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.6.3.jar!/:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:542) ~[amqp-client-3.6.3.jar!/:na]
... 1 common frames omitted
解决方法:实施运行状况检查.运行状况检查检查是否存在与rabbitMQ的连接.如果运行状况检查未成功,则重新启动应用程序.(实现为cron,调用bash脚本.bash脚本停止并启动容器).