My situation is that the RabbitMQ server has run out of disk space, as shown below:

    Filesystem                          1K-blocks    Used Available Use% Mounted on
    /dev/mapper/ramonubuntu--vg-root      6299376 5956336         0 100% /

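The producer publishes a message to the server (the message must be persistent) and then blocks forever, waiting for the response to the publish. Of course we should avoid letting the server run out of space in the first place, but is there any timeout mechanism that lets the producer stop waiting?
I have tried heartbeats and SO_TIMEOUT; neither helps, because the network itself is working fine. Here is my producer: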

    protected void publish(byte[] message) throws Exception {
        // ConnectionFactory can be reused between threads.
        ConnectionFactory factory = new SoTimeoutConnectionFactory();
        factory.setHost(this.getHost());
        factory.setVirtualHost("te");
        factory.setPort(5672);
        factory.setUsername("amqp");
        factory.setPassword("amqp");
        factory.setConnectionTimeout(10 * 1000);
        // doesn't help if server got out of space
        factory.setRequestedHeartbeat(1);
        final Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // declare a 'topic' type of exchange
        channel.exchangeDeclare(this.exchangeName, "topic", true);
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                logger.warn("[X]Returned message(replyCode:" + replyCode
                        + ",replyText:" + replyText
                        + ",exchange:" + exchange
                        + ",routingKey:" + routingKey
                        + ",body:" + new String(body));
            }
        });
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                logger.info("Ack: " + deliveryTag);
                // RabbitMessagePublishMain.this.release(connection);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                logger.info("Nack: " + deliveryTag);
                // RabbitMessagePublishMain.this.release(connection);
            }
        });
        channel.basicPublish(this.exchangeName,
                RabbitMessageConsumerMain.EXCHANGE_NAME + ".-1",
                true,
                MessageProperties.PERSISTENT_BASIC,
                message);
        channel.waitForConfirmsOrDie(10*1000);
        // now we can close connection
        connection.close();
    }

It blocks at 'channel.waitForConfirmsOrDie(10*1000);'. Here is SoTimeoutConnectionFactory:

    public class SoTimeoutConnectionFactory extends ConnectionFactory {
        @Override
        protected void configureSocket(Socket socket) throws IOException {
            super.configureSocket(socket);
            socket.setSoTimeout(10 * 1000);
        }
    }

I also captured the network traffic between the producer and RabbitMQ.
Please help.
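
One possible client-side way to bound the wait, as a minimal sketch rather than a confirmed fix: run the blocking call on a worker thread and give up after a deadline with `Future.get(timeout, unit)`. It uses only `java.util.concurrent`. `BoundedCall` and `callWithTimeout` are hypothetical names introduced here, not part of the RabbitMQ client, and the sleeping task in `main` merely stands in for a publish that never gets confirmed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /**
     * Runs a possibly blocking task on a daemon worker thread and waits
     * at most timeoutMillis for it to finish.
     * Throws TimeoutException if the deadline passes first.
     */
    public static <T> T callWithTimeout(Callable<T> task, long timeoutMillis) throws Exception {
        // Daemon thread: a worker that stays stuck cannot keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread worker = new Thread(runnable, "bounded-call");
            worker.setDaemon(true);
            return worker;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Best effort: interrupt the worker, then report the timeout to the caller.
            future.cancel(true);
            throw e;
        } catch (ExecutionException e) {
            // Surface the task's own exception where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A task that completes in time returns its result normally.
        String result = callWithTimeout(() -> "confirmed", 1000);
        System.out.println(result);

        // Stand-in for a publish that blocks indefinitely (assumption for illustration).
        try {
            callWithTimeout(() -> {
                Thread.sleep(60_000);
                return "never";
            }, 200);
        } catch (TimeoutException e) {
            System.out.println("gave up waiting");
        }
    }
}
```

In the producer above, the call to `publish` could be passed in as the task, for example `callWithTimeout(() -> { publish(message); return null; }, 15_000)`. A thread blocked in socket I/O may not react to the interrupt, which is why the worker is a daemon thread; the connection would probably still have to be torn down separately, for instance with `Connection.abort()`.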