服务器超出范围时basic-publish的超时

 叫我小小小火枪的天空_603 发布于 2022-12-21 06:50

我的情况是rabbitmq服务器空间不足,如下所示

Filesystem                       1K-blocks    Used Available Use% Mounted on
/dev/mapper/ramonubuntu--vg-root   6299376 5956336         0 100% /

生产者向服务器发布消息(消息需要持久化),然后将永久阻止,它将等待发布的响应.当然我们应该避免服务器空间不足的情况,但有没有任何超时机制让生产者退出等待?

我尝试过心跳和SO_TIMEOUT,它们都不起作用,因为网络工作正常.以下是我的制片人.

 protected void publish(byte[] message) throws Exception {
    // ConnectionFactory can be reused between threads.
    ConnectionFactory factory = new SoTimeoutConnectionFactory();
    factory.setHost(this.getHost());
    factory.setVirtualHost("te");
    factory.setPort(5672);
    factory.setUsername("amqp");
    factory.setPassword("amqp");
    factory.setConnectionTimeout(10 * 1000);
    // doesn't help if server got out of space
    factory.setRequestedHeartbeat(1);
    final Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // declare a 'topic' type of exchange
    channel.exchangeDeclare(this.exchangeName, "topic", true);

    channel.addReturnListener(new ReturnListener() {

        @Override
        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            logger.warn("[X]Returned message(replyCode:" + replyCode + ",replyText:" + replyText
                    + ",exchange:" + exchange + ",routingKey:" + routingKey + ",body:" + new String(body));
        }

    });

    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {

        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            logger.info("Ack: " + deliveryTag);
            // RabbitMessagePublishMain.this.release(connection);
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            logger.info("Nack: " + deliveryTag);
            // RabbitMessagePublishMain.this.release(connection);
        }

    });

    channel.basicPublish(this.exchangeName, RabbitMessageConsumerMain.EXCHANGE_NAME + ".-1", true,
            MessageProperties.PERSISTENT_BASIC, message);
    channel.waitForConfirmsOrDie(10*1000);
    // now we can close connection
    connection.close();
}

它将阻止'channel.waitForConfirmsOrDie(10*1000);'和SotimeoutConnectionFactory,

public class SoTimeoutConnectionFactory extends ConnectionFactory {

    @Override
    protected void configureSocket(Socket socket) throws IOException {
        super.configureSocket(socket);
        socket.setSoTimeout(10 * 1000);
    }
}

我还捕获了制作人和rabbimq之间的网络, 在此输入图像描述

请帮忙.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有