消耗消息时内存泄漏 - RabbitMQ C库

 技术交流 发布于 2022-12-22 18:52

我有一个简单的C程序消耗来自RabbitMQ的消息并销毁它.它消耗了500万条消息.

#include 
#include 
#include 

int main() {

    /* connect to broker */
    amqp_connection_state_t rabbit_conn = amqp_new_connection();
    if (rabbit_conn == NULL) {
        printf("cannot create AMQP connection\n");
        return EXIT_FAILURE;
    }
    amqp_socket_t *rabbit_socket = amqp_tcp_socket_new(rabbit_conn);
    if (rabbit_socket == NULL) {
        printf("cannot create AMQP socket\n");
        return EXIT_FAILURE;
    }

    int rc = amqp_socket_open(rabbit_socket, "localhost", 5672);
    if (rc != AMQP_STATUS_OK) {
        printf("cannot open socket to rabbitmq. reason: %s\n", amqp_error_string2(rc));
        return EXIT_FAILURE;
    }

    amqp_rpc_reply_t r = amqp_login(rabbit_conn, "/", AMQP_DEFAULT_MAX_CHANNELS, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        printf("cannot login to rabbitmq. reason: %s\n", amqp_error_string2(r.library_error));
        return EXIT_FAILURE;
    }
    amqp_channel_open(rabbit_conn, 1);
    r = amqp_get_rpc_reply(rabbit_conn);
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        printf("cannot create channel. reason: %s\n", amqp_error_string2(r.library_error));
        return EXIT_FAILURE;
    }

    amqp_bytes_t queue = amqp_cstring_bytes("GPS");
    amqp_basic_consume(rabbit_conn, 1, queue, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
    r = amqp_get_rpc_reply(rabbit_conn);
    if (r.reply_type != AMQP_RESPONSE_NORMAL) {
        printf("cannot put channel in consume mode. reason: %s\n", amqp_error_string2(r.library_error));
        return EXIT_FAILURE;
    }

    amqp_envelope_t envelope;
    int i = 0;
    while (i < 5000000) {
        /* read message from queue */
        r = amqp_consume_message(rabbit_conn, &envelope, NULL, 0);
        if (r.reply_type != AMQP_RESPONSE_NORMAL) {
            printf("cannot receive message from broker. reason: %s\n", amqp_error_string2(r.library_error));
            break;
        }

        amqp_basic_ack(rabbit_conn, 1, envelope.delivery_tag, 0);
        amqp_destroy_envelope(&envelope);

        i++;
    }

    amqp_channel_close(rabbit_conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(rabbit_conn, AMQP_REPLY_SUCCESS);

    return EXIT_SUCCESS;
}

问题是程序的内存使用量随着时间的推移而增加.这段代码有什么问题?我正在使用rabbitmq-c版本0.5.1.这是valgrind输出:

==6724== Memcheck, a memory error detector
==6724== Copyright (C) 2002-2013, and GNU GPL'd, by Julian Seward et al.
==6724== Using Valgrind-3.9.0 and LibVEX; rerun with -h for copyright info
==6724== Command: ./server
==6724== 
==6724== 
==6724== HEAP SUMMARY:
==6724==     in use at exit: 2,001,658,752 bytes in 15,280 blocks
==6724==   total heap usage: 20,030,603 allocs, 20,015,323 frees, 3,289,117,951 bytes allocated
==6724== 
==6724== 8 bytes in 1 blocks are indirectly lost in loss record 1 of 17
==6724==    at 0x4C2745D: malloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A5CF: record_pool_block (amqp_mem.c:108)
==6724==    by 0x4E3A77E: amqp_pool_alloc (amqp_mem.c:159)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E355B0: amqp_handle_input (amqp_connection.c:294)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3BA2C: amqp_simple_wait_method (amqp_socket.c:935)
==6724==    by 0x4E3BD7B: amqp_login_inner (amqp_socket.c:1134)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 16 bytes in 1 blocks are indirectly lost in loss record 2 of 17
==6724==    at 0x4C293AA: realloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A59B: record_pool_block (amqp_mem.c:113)
==6724==    by 0x4E3A77E: amqp_pool_alloc (amqp_mem.c:159)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E3D0C2: amqp_table_clone (amqp_table.c:576)
==6724==    by 0x4E3BE36: amqp_login_inner (amqp_socket.c:1148)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 40 bytes in 1 blocks are indirectly lost in loss record 3 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3D5A7: amqp_tcp_socket_new (amqp_tcp_socket.c:282)
==6724==    by 0x400C50: main (server.c:13)
==6724== 
==6724== 80 bytes in 1 blocks are indirectly lost in loss record 4 of 17
==6724==    at 0x4C2745D: malloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A8A7: amqp_get_or_create_channel_pool (amqp_mem.c:224)
==6724==    by 0x4E35591: amqp_handle_input (amqp_connection.c:289)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3BA2C: amqp_simple_wait_method (amqp_socket.c:935)
==6724==    by 0x4E3BD7B: amqp_login_inner (amqp_socket.c:1134)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 80 bytes in 1 blocks are indirectly lost in loss record 5 of 17
==6724==    at 0x4C2745D: malloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A8A7: amqp_get_or_create_channel_pool (amqp_mem.c:224)
==6724==    by 0x4E35591: amqp_handle_input (amqp_connection.c:289)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3BB36: amqp_simple_rpc (amqp_socket.c:997)
==6724==    by 0x4E3C48D: amqp_simple_rpc_decoded (amqp_socket.c:1077)
==6724==    by 0x4E39FDF: amqp_channel_open (amqp_framing.c:1888)
==6724==    by 0x400D39: main (server.c:30)
==6724== 
==6724== 512 bytes in 1 blocks are indirectly lost in loss record 6 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3D07A: amqp_table_clone (amqp_table.c:597)
==6724==    by 0x4E3BE36: amqp_login_inner (amqp_socket.c:1148)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 512 bytes in 1 blocks are indirectly lost in loss record 7 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E3D0C2: amqp_table_clone (amqp_table.c:576)
==6724==    by 0x4E3BE36: amqp_login_inner (amqp_socket.c:1148)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 65,536 bytes in 1 blocks are indirectly lost in loss record 8 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E355B0: amqp_handle_input (amqp_connection.c:294)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3BA2C: amqp_simple_wait_method (amqp_socket.c:935)
==6724==    by 0x4E3BD7B: amqp_login_inner (amqp_socket.c:1134)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 122,144 bytes in 1 blocks are indirectly lost in loss record 9 of 17
==6724==    at 0x4C293AA: realloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A59B: record_pool_block (amqp_mem.c:113)
==6724==    by 0x4E3A77E: amqp_pool_alloc (amqp_mem.c:159)
==6724==    by 0x4E38580: amqp_decode_properties (amqp_framing.c:1068)
==6724==    by 0x4E357B9: amqp_handle_input (amqp_connection.c:357)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3B921: amqp_simple_wait_frame_on_channel (amqp_socket.c:889)
==6724==    by 0x4E35C1A: amqp_read_message (amqp_consumer.c:217)
==6724==    by 0x4E36567: amqp_consume_message (amqp_consumer.c:186)
==6724==    by 0x400EBE: main (server.c:49)
==6724== 
==6724== 131,072 bytes in 1 blocks are indirectly lost in loss record 10 of 17
==6724==    at 0x4C2745D: malloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E353A3: amqp_new_connection (amqp_connection.c:94)
==6724==    by 0x400C20: main (server.c:8)
==6724== 
==6724== 131,072 bytes in 1 blocks are indirectly lost in loss record 11 of 17
==6724==    at 0x4C293AA: realloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E352F0: amqp_tune_connection (amqp_connection.c:159)
==6724==    by 0x4E3C11E: amqp_login_inner (amqp_socket.c:1274)
==6724==    by 0x4E3C59F: amqp_login (amqp_socket.c:1342)
==6724==    by 0x400CFA: main (server.c:25)
==6724== 
==6724== 131,072 bytes in 1 blocks are indirectly lost in loss record 12 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E355B0: amqp_handle_input (amqp_connection.c:294)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3BB36: amqp_simple_rpc (amqp_socket.c:997)
==6724==    by 0x4E3C48D: amqp_simple_rpc_decoded (amqp_socket.c:1077)
==6724==    by 0x4E39FDF: amqp_channel_open (amqp_framing.c:1888)
==6724==    by 0x400D39: main (server.c:30)
==6724== 
==6724== 5,373,952 bytes in 41 blocks are possibly lost in loss record 13 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E355B0: amqp_handle_input (amqp_connection.c:294)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E36441: amqp_consume_message (amqp_consumer.c:154)
==6724==    by 0x400EBE: main (server.c:49)
==6724== 
==6724== 5,898,240 bytes in 45 blocks are possibly lost in loss record 14 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E38580: amqp_decode_properties (amqp_framing.c:1068)
==6724==    by 0x4E357B9: amqp_handle_input (amqp_connection.c:357)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3B921: amqp_simple_wait_frame_on_channel (amqp_socket.c:889)
==6724==    by 0x4E35C1A: amqp_read_message (amqp_consumer.c:217)
==6724==    by 0x4E36567: amqp_consume_message (amqp_consumer.c:186)
==6724==    by 0x400EBE: main (server.c:49)
==6724== 
==6724== 994,705,408 bytes in 7,589 blocks are indirectly lost in loss record 15 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E38580: amqp_decode_properties (amqp_framing.c:1068)
==6724==    by 0x4E357B9: amqp_handle_input (amqp_connection.c:357)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E3B921: amqp_simple_wait_frame_on_channel (amqp_socket.c:889)
==6724==    by 0x4E35C1A: amqp_read_message (amqp_consumer.c:217)
==6724==    by 0x4E36567: amqp_consume_message (amqp_consumer.c:186)
==6724==    by 0x400EBE: main (server.c:49)
==6724== 
==6724== 995,098,624 bytes in 7,592 blocks are indirectly lost in loss record 16 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3A769: amqp_pool_alloc (amqp_mem.c:155)
==6724==    by 0x4E3A7CB: amqp_pool_alloc_bytes (amqp_mem.c:176)
==6724==    by 0x4E355B0: amqp_handle_input (amqp_connection.c:294)
==6724==    by 0x4E3AA53: consume_one_frame (amqp_socket.c:540)
==6724==    by 0x4E3B61C: wait_frame_inner (amqp_socket.c:702)
==6724==    by 0x4E36441: amqp_consume_message (amqp_consumer.c:154)
==6724==    by 0x400EBE: main (server.c:49)
==6724== 
==6724== 1,990,386,560 (384 direct, 1,990,386,176 indirect) bytes in 1 blocks are definitely lost in loss record 17 of 17
==6724==    at 0x4C291D4: calloc (in /usr/lib64/valgrind/vgpreload_memcheck-amd64-linux.so)
==6724==    by 0x4E3533F: amqp_new_connection (amqp_connection.c:73)
==6724==    by 0x400C20: main (server.c:8)
==6724== 
==6724== LEAK SUMMARY:
==6724==    definitely lost: 384 bytes in 1 blocks
==6724==    indirectly lost: 1,990,386,176 bytes in 15,193 blocks
==6724==      possibly lost: 11,272,192 bytes in 86 blocks
==6724==    still reachable: 0 bytes in 0 blocks
==6724==         suppressed: 0 bytes in 0 blocks
==6724== 
==6724== For counts of detected and suppressed errors, rerun with: -v
==6724== ERROR SUMMARY: 3 errors from 3 contexts (suppressed: 2 from 2)

alanxz.. 5

amqp_destroy_connection()在关闭连接后需要打电话.

编辑:此外:您应该amqp_maybe_release_buffers_on_channel()在消费每条消息后打电话.

另外,仅供参考:amqp_channel_close()如果你要立即打电话amqp_connection_close(),你不需要打电话,后者将隐含关闭所有频道.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有