How to manage two or more consumers via pthreads?

 1257523034_627418 posted on 2023-01-07 15:38

I have a generic problem I am looking to solve, where chunks of binary data are sent from standard input or a regular file stream to an application, which in turn converts that binary data into text. Using threads, I want to process the text before piping it over to the next application, which modifies that text even further, and so on.

As a simple test case, I want to extract compressed data via gunzip. Specifically, I am looking at using gunzip -c - to extract chunks of binary data sent to it via its (reassigned) stdin file descriptor, and then pulling out chunks of text from its (reassigned) stdout file descriptor. I can then print these chunks of text to the real stdout or stderr (or do other stuff, later on).

(I realize that I can do gzip-based compression and extraction on the command line. My goal here is to use this test case to learn how to correctly pass around generic chunks of binary and text data between threads that either run that data through binaries, or process it further.)

In the case of my test program, I have set up three pthread_t threads:

produce_gzip_chunk_thread

consume_gzip_chunk_thread

consume_gunzip_chunk_thread

I pass each of these threads a shared data instance called thread_data, which contains a thread lock, two conditions, and some buffers and counter variables. I also include a set of file descriptors for a gunzip process opened with popen3():

typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    FILE *in_file_ptr;
    boolean in_eof;
    char in_line[LINE_LENGTH_VALUE];
    popen3_desc_t *gunzip_ptr;
};

struct popen3_desc {
    int in;
    int out;
    int err;
};
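For comparison, a single-slot handoff between one producer thread and one consumer thread is usually written along the lines of the sketch below. It is a minimal illustration, not a drop-in fix: chunk_slot_t, slot_put(), slot_get(), slot_close() and slot_demo() are hypothetical names, and it assumes exactly one producer and one consumer. Each wait loop re-checks a predicate that the other side is guaranteed to reset. By contrast, produce_gzip_chunk() in the question also waits on n_out_bytes_read_from_gunzip, which, as far as the posted code shows, consume_gunzip_chunk() sets but never resets to zero.

```c
#include <pthread.h>
#include <stdbool.h>
#include <string.h>

#define SLOT_CAP 1024 /* hypothetical; plays the role of BUF_LENGTH_VALUE */

/* A one-chunk mailbox between one producer and one consumer (hypothetical). */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  not_empty; /* signalled when a chunk arrives or at EOF */
    pthread_cond_t  not_full;  /* signalled when the consumer empties the slot */
    unsigned char   buf[SLOT_CAP];
    size_t          len;       /* 0 means the slot is empty */
    bool            eof;       /* set once by the producer */
} chunk_slot_t;

void slot_init(chunk_slot_t *s) {
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->not_empty, NULL);
    pthread_cond_init(&s->not_full, NULL);
    s->len = 0;
    s->eof = false;
}

void slot_destroy(chunk_slot_t *s) {
    pthread_mutex_destroy(&s->lock);
    pthread_cond_destroy(&s->not_empty);
    pthread_cond_destroy(&s->not_full);
}

/* Waits until the slot is empty, then deposits n bytes (1 <= n <= SLOT_CAP). */
void slot_put(chunk_slot_t *s, const void *data, size_t n) {
    pthread_mutex_lock(&s->lock);
    while (s->len != 0)                 /* re-check after every wakeup */
        pthread_cond_wait(&s->not_full, &s->lock);
    memcpy(s->buf, data, n);
    s->len = n;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

/* Marks end of stream; the consumer still drains any pending chunk first. */
void slot_close(chunk_slot_t *s) {
    pthread_mutex_lock(&s->lock);
    s->eof = true;
    pthread_cond_signal(&s->not_empty);
    pthread_mutex_unlock(&s->lock);
}

/* Waits for a chunk; returns its length, or 0 once EOF has been reached. */
size_t slot_get(chunk_slot_t *s, void *out) {
    pthread_mutex_lock(&s->lock);
    while (s->len == 0 && !s->eof)
        pthread_cond_wait(&s->not_empty, &s->lock);
    size_t n = s->len;
    if (n > 0) {
        memcpy(out, s->buf, n);
        s->len = 0;
        pthread_cond_signal(&s->not_full);
    }
    pthread_mutex_unlock(&s->lock);
    return n;
}

/* Usage demo: a producer thread sends the bytes 1..100, one per chunk. */
static void *demo_producer(void *arg) {
    chunk_slot_t *s = arg;
    for (unsigned char i = 1; i <= 100; i++)
        slot_put(s, &i, 1);
    slot_close(s);
    return NULL;
}

/* Returns the sum of all bytes received by the consumer (calling thread). */
long slot_demo(void) {
    chunk_slot_t s;
    unsigned char chunk[SLOT_CAP];
    pthread_t producer;
    long sum = 0;
    size_t n;

    slot_init(&s);
    if (pthread_create(&producer, NULL, demo_producer, &s) != 0) {
        slot_destroy(&s);
        return -1;
    }
    while ((n = slot_get(&s, chunk)) > 0)
        for (size_t i = 0; i < n; i++)
            sum += chunk[i];
    pthread_join(producer, NULL);
    slot_destroy(&s);
    return sum;
}
```

slot_demo() should return 5050 (the sum of 1..100) if no chunk is lost or duplicated.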

The produce_gzip_chunk_thread reads in a 1024-byte chunk of gzip-compressed bytes from a regular file called foo.gz.

These bytes are written to an unsigned char buffer called in_buf, which is part of the shared data structure I pass to each thread:

void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

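Once a positive number of bytes n_bytes is stored (that is, once we have pulled data from the gzip input archive that needs to be processed by gunzip), this triggers the condition that allows the second thread, consume_gzip_chunk_thread, to run: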

void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            d->n_in_bytes = 0;
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}
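consume_gzip_chunk() above issues one write() per chunk and then clears n_in_bytes regardless of the result. write() on a pipe is not guaranteed to transfer everything: on the O_NONBLOCK descriptor that popen3() creates here, it can fail with EAGAIN when the pipe is full, or write only part of a chunk larger than PIPE_BUF (POSIX only guarantees PIPE_BUF to be at least 512 bytes). A common pattern, assuming the descriptor is left blocking (popen3() called with nonblock_in = kFalse), is a retry loop like the hypothetical write_all() below.

```c
#include <errno.h>
#include <unistd.h>

/* Writes all n bytes to a *blocking* descriptor, retrying short writes
   and EINTR. Returns 0 on success, -1 on any other error. */
int write_all(int fd, const void *buf, size_t n) {
    const unsigned char *p = buf;
    while (n > 0) {
        ssize_t w = write(fd, p, n);
        if (w < 0) {
            if (errno == EINTR)
                continue;      /* interrupted by a signal: just retry */
            return -1;         /* includes EAGAIN if fd is O_NONBLOCK */
        }
        p += w;
        n -= (size_t)w;
    }
    return 0;
}

/* Usage demo: round-trips a 10-byte chunk through a pipe and
   returns the number of bytes read back, or -1 on failure. */
ssize_t write_all_demo(void) {
    int fd[2];
    char out[32];
    ssize_t r = -1;
    if (pipe(fd) != 0)
        return -1;
    if (write_all(fd[1], "gzip chunk", 10) == 0)
        r = read(fd[0], out, sizeof out);
    close(fd[1]);
    close(fd[0]);
    return r;
}
```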

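When consuming the gzip data chunk, we use the write function to send n_bytes of in_buf to the gunzip process's input file descriptor. Finally, we send another thread signal, but this time on out_cond, to help re-awaken consume_gunzip_chunk_thread, which reads from gunzip's output to do more work: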

void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

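This attempts to read any available bytes from the gunzip process's output file descriptor. For debugging purposes, I just want to print them to stderr for now.

The problem I am facing is that I need to add a sleep(1) statement in consume_gunzip_chunk, before the read, to make things work properly.

Without this sleep(1) statement, my test program usually outputs nothing, except about once every 8-10 attempts, when the compressed data is extracted correctly.

Question: what am I doing wrong in my arrangement of conditions, such that the sleep(1) call is required to make gzip extraction work properly? In a production scenario with much larger input files, forcing a one-second wait for every 1 kB seems like a bad idea.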
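One detail that may be relevant to the sleep(1) symptom: popen3() is called with nonblock_outerr = kTrue, so the descriptor that consume_gunzip_chunk() reads from is O_NONBLOCK. A non-blocking read() on a pipe with no data available returns -1 with errno set to EAGAIN (or EWOULDBLOCK) instead of waiting. If gunzip has not produced any output yet when the thread wakes up, the read comes back empty, and sleep(1) would hide that by giving gunzip time to write. The hypothetical nonblocking_read_demo() below shows this behavior on a plain pipe.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Returns 1 if a read() on an empty O_NONBLOCK pipe fails at once with
   EAGAIN/EWOULDBLOCK and succeeds after data arrives; 0 otherwise. */
int nonblocking_read_demo(void) {
    int fd[2];
    char buf[16];
    if (pipe(fd) != 0)
        return -1;
    /* The same flag popen3() sets on the read end of gunzip's stdout. */
    fcntl(fd[0], F_SETFL, fcntl(fd[0], F_GETFL) | O_NONBLOCK);

    /* Nothing has been written yet: read() returns immediately. */
    ssize_t early = read(fd[0], buf, sizeof buf);
    int would_block = (early == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

    /* Once the "child" has written something, the same call returns data. */
    ssize_t w = write(fd[1], "text", 4);
    ssize_t late = read(fd[0], buf, sizeof buf);

    close(fd[0]);
    close(fd[1]);
    return would_block && w == 4 && late == 4;
}
```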


For reproducibility, here are the two relevant files with the full source code. Here is the header:

/*
 * convert.h
 */

#ifndef CONVERT_H
#define CONVERT_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/stat.h>

#define CB_VERSION "1.0"
#define LINE_LENGTH_VALUE 65536
#define BUF_LENGTH_VALUE 1024
#define POPEN3_READ 0
#define POPEN3_WRITE 1

typedef int boolean;
extern const boolean kTrue;
extern const boolean kFalse;
const boolean kTrue = 1;
const boolean kFalse = 0;

typedef enum {
    kGzip,
    kUnknown
} format_t;

typedef struct pthread_data pthread_data_t;
typedef struct popen3_desc popen3_desc_t;

struct pthread_data {
    pthread_mutex_t in_lock;
    pthread_cond_t in_cond;
    pthread_cond_t out_cond;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes;
    size_t n_in_bytes_written_to_gunzip;
    size_t n_out_bytes_read_from_gunzip;
    boolean in_eof;
    FILE *in_file_ptr;
    popen3_desc_t *gunzip_ptr;
    char in_line[LINE_LENGTH_VALUE];
};

struct popen3_desc {
    int in;
    int out;
    int err;
};

static const char *name = "convert";
static const char *version = CB_VERSION;
static const char *authors = "Alex Reynolds";
static const char *usage = "\n" \
    "Usage: convert --input-format=str \n" \
    "  Process Flags:\n\n" \
    "  --input-format=str            | -f str  Input format (str = [ gzip ]; required)\n" \
    "  --help                        | -h      Show this usage message\n";

static struct convert_globals_t {
    char *input_format_str;
    format_t input_format;
    char **filenames;
    int num_filenames;
} convert_globals;

static struct option convert_client_long_options[] = {
    { "input-format",           required_argument,  NULL,   'f' },
    { "help",               no_argument,        NULL,   'h' },
    { NULL,             no_argument,        NULL,    0  }
}; 

static const char *convert_client_opt_string = "f:h?";

void * consume_gunzip_chunk        (void *t_data);
void * consume_gzip_chunk          (void *t_data);
void * produce_gzip_chunk          (void *t_data);
FILE * new_file_ptr                (const char *in_fn);
void   delete_file_ptr             (FILE **file_ptr);
pid_t  popen3                      (const char *command, 
                                    int *in_desc, 
                                    int *out_desc, 
                                    int *err_desc, 
                                    boolean nonblock_in, 
                                    boolean nonblock_outerr);
off_t  fsize                       (const char *fn);
void   initialize_globals          ();
void   parse_command_line_options  (int argc, 
                                    char **argv);
void   print_usage                 (FILE *stream);

#endif

Here is the implementation:

/*
 * convert.c
 */

#include "convert.h"

int main(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> main()\n");
#endif

    pthread_t produce_gzip_chunk_thread = NULL;
    pthread_t consume_gzip_chunk_thread = NULL;
    pthread_t consume_gunzip_chunk_thread = NULL;
    pthread_data_t *thread_data = NULL;

    parse_command_line_options(argc, argv);

    /* initialize thread data */
    thread_data = malloc(sizeof(pthread_data_t));
    thread_data->n_in_bytes = 0;
    thread_data->n_in_bytes_written_to_gunzip = 0;
    thread_data->n_out_bytes_read_from_gunzip = 0;
    thread_data->in_eof = kFalse;
    thread_data->in_file_ptr = new_file_ptr(convert_globals.filenames[0]);
    pthread_mutex_init(&(thread_data->in_lock), NULL);
    pthread_cond_init(&(thread_data->in_cond), NULL);
    pthread_cond_init(&(thread_data->out_cond), NULL);

    /* parse input */
    if (convert_globals.input_format == kGzip) 
        {
            if (pthread_create(&produce_gzip_chunk_thread, NULL, produce_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gzip_chunk_thread, NULL, consume_gzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_create(&consume_gunzip_chunk_thread, NULL, consume_gunzip_chunk, (void *) thread_data) != 0) {
                fprintf(stderr, "Error: Could not create gunzip chunk consumption thread\n");            
                return EXIT_FAILURE;
            }
            if (pthread_join(produce_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk production thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
            if (pthread_join(consume_gunzip_chunk_thread, NULL) != 0) {
                fprintf(stderr, "Error: Could not join gunzip chunk consumption thread\n");
                return EXIT_FAILURE;
            }
        }
    else
        {
            /* 
               handle text formats
            */
        }

    /* cleanup */
    delete_file_ptr(&thread_data->in_file_ptr);
    pthread_mutex_destroy(&(thread_data->in_lock));
    pthread_cond_destroy(&(thread_data->in_cond));
    pthread_cond_destroy(&(thread_data->out_cond));
    free(thread_data);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> main()\n");
#endif
    return EXIT_SUCCESS;
}

void * consume_gunzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gunzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_out_bytes_read_from_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes_written_to_gunzip == 0) {
            pthread_cond_wait(&d->out_cond, &d->in_lock);
        }
        if (d->n_in_bytes_written_to_gunzip) {
            sleep(1);
            n_out_bytes_read_from_gunzip = read(d->gunzip_ptr->out, d->in_line, LINE_LENGTH_VALUE);
#ifdef DEBUG
            fprintf(stderr, "Debug: ------------------------ read [%07ld] bytes out from the gunzip process\n", n_out_bytes_read_from_gunzip);
            fprintf(stderr, "Debug: ------------------------ gunzip output chunk:\n[%s]\n", d->in_line);
#endif
            memset(d->in_line, 0, strlen(d->in_line));
            if (n_out_bytes_read_from_gunzip > 0)
                d->n_out_bytes_read_from_gunzip = n_out_bytes_read_from_gunzip;
            d->n_in_bytes_written_to_gunzip = 0;
            pthread_cond_signal(&d->in_cond);
        }
        if (d->in_eof && (d->n_in_bytes_written_to_gunzip == 0))
            break;
    }
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gunzip_chunk()\n");
#endif
    return NULL;
}

void * consume_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> consume_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    long n_in_bytes_written_to_gunzip;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        while (d->n_in_bytes == 0 && !d->in_eof)
            pthread_cond_wait(&d->in_cond, &d->in_lock);
        if (d->n_in_bytes) {
#ifdef DEBUG
            fprintf(stderr, "Debug: ........ [%07zu] processing chunk\n", d->n_in_bytes);
#endif
            if (!d->gunzip_ptr) {
#ifdef DEBUG
                fprintf(stderr, "Debug: * setting up gunzip ptr\n");
#endif
                d->gunzip_ptr = malloc(sizeof(popen3_desc_t));
                if (!d->gunzip_ptr) {
                    fprintf(stderr, "Error: Could not create gunzip file handle struct\n");
                    exit(EXIT_FAILURE);
                }

                popen3("gunzip -c -", 
                       &(d->gunzip_ptr->in), 
                       &(d->gunzip_ptr->out), 
                       &(d->gunzip_ptr->err), 
                       kTrue, 
                       kTrue);
                memset(d->in_line, 0, LINE_LENGTH_VALUE);
            }
            n_in_bytes_written_to_gunzip = (long) write(d->gunzip_ptr->in, d->in_buf, d->n_in_bytes);
#ifdef DEBUG
            fprintf(stderr, "Debug: ................ wrote [%07ld] bytes into the gunzip process\n", n_in_bytes_written_to_gunzip);
#endif
            if (n_in_bytes_written_to_gunzip > 0)
                d->n_in_bytes_written_to_gunzip = n_in_bytes_written_to_gunzip;

            d->n_in_bytes = 0;
            /* pthread_cond_signal(&d->in_cond); */
            pthread_cond_signal(&d->out_cond);
        }
        if (d->in_eof) 
            break;
    } 
    pthread_mutex_unlock(&d->in_lock);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> consume_gzip_chunk()\n");
#endif
    return NULL;
}

void * produce_gzip_chunk(void *t_data)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> produce_gzip_chunk()\n");
#endif

    pthread_data_t *d = (pthread_data_t *)t_data;
    unsigned char in_buf[BUF_LENGTH_VALUE];
    size_t n_in_bytes = 0;

    d->in_eof = kFalse;

    pthread_mutex_lock(&d->in_lock);
    while(kTrue) {
        n_in_bytes = fread(in_buf, sizeof(in_buf[0]), sizeof(in_buf), d->in_file_ptr);
        if (n_in_bytes > 0) {
            while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
                pthread_cond_wait(&d->in_cond, &d->in_lock);
            memcpy(d->in_buf, in_buf, n_in_bytes);
            d->n_in_bytes = n_in_bytes;
#ifdef DEBUG
            fprintf(stderr, "Debug: ######## [%07zu] produced chunk\n", d->n_in_bytes);
#endif
            pthread_cond_signal(&d->in_cond);
        }
        else if (feof(d->in_file_ptr) || ferror(d->in_file_ptr))
            break;
    } 
    d->in_eof = kTrue;
    pthread_mutex_unlock(&d->in_lock);
    pthread_cond_signal(&d->in_cond);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> produce_gzip_chunk()\n");
#endif
    return NULL;
}

FILE * new_file_ptr(const char *in_fn)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> new_file_ptr()\n");
#endif

    FILE *file_ptr = NULL;
    boolean not_stdin = kTrue;

    not_stdin = strcmp(in_fn, "-");
    file_ptr = (not_stdin) ? fopen(in_fn, "r") : stdin;

    if (!file_ptr) {
        fprintf(stderr, "Error: Could not open input stream\n");
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> new_file_ptr()\n");
#endif
    return file_ptr;
}

void delete_file_ptr(FILE **file_ptr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> delete_file_ptr()\n");
#endif

    fclose(*file_ptr);
    *file_ptr = NULL;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> delete_file_ptr()\n");
#endif
}

pid_t popen3(const char *command, int *in_desc, int *out_desc, int *err_desc, boolean nonblock_in, boolean nonblock_outerr)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> popen3()\n");
#endif

    int p_stdin[2], p_stdout[2], p_stderr[2];
    pid_t pid;

    if (pipe(p_stdin) != 0 || pipe(p_stdout) != 0 || pipe(p_stderr) != 0)
        return -1;

    if (nonblock_in) {
        fcntl(p_stdin[POPEN3_WRITE], F_SETFL, fcntl(p_stdin[POPEN3_WRITE], F_GETFL) | O_NONBLOCK);
    }

    if (nonblock_outerr) {
        fcntl(p_stdout[POPEN3_READ], F_SETFL, fcntl(p_stdout[POPEN3_READ], F_GETFL) | O_NONBLOCK);
        fcntl(p_stderr[POPEN3_READ], F_SETFL, fcntl(p_stderr[POPEN3_READ], F_GETFL) | O_NONBLOCK);
    }

    pid = fork();
    if (pid < 0)
        return pid; /* error */

    if (pid == 0) {
        close(p_stdin[POPEN3_WRITE]);
        close(p_stdout[POPEN3_READ]);
        close(p_stderr[POPEN3_READ]);
        dup2(p_stdin[POPEN3_READ], fileno(stdin));
        dup2(p_stdout[POPEN3_WRITE], fileno(stderr));
        dup2(p_stdout[POPEN3_WRITE], fileno(stdout));
        execl("/bin/sh", "sh", "-c", command, NULL);
        fprintf(stderr, "Error: Could not execl [%s]\n", command);
        exit(EXIT_FAILURE);
    }

    if (in_desc == NULL)
        close(p_stdin[POPEN3_WRITE]);
    else
        *in_desc = p_stdin[POPEN3_WRITE];

    if (out_desc == NULL)
        close(p_stdout[POPEN3_READ]);
    else
        *out_desc = p_stdout[POPEN3_READ];

    if (err_desc == NULL)
        close(p_stderr[POPEN3_READ]);
    else
        *err_desc = p_stderr[POPEN3_READ];

#ifdef DEBUG
    fprintf(stderr, "Debug: New *in_desc  = %d\n", *in_desc);
    fprintf(stderr, "Debug: New *out_desc = %d\n", *out_desc);
    fprintf(stderr, "Debug: New *err_desc = %d\n", *err_desc);
#endif

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> popen3()\n");
#endif
    return pid;
}

off_t fsize(const char *fn) 
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> fsize()\n");
#endif

    struct stat st; 

    if (stat(fn, &st) == 0)
        return st.st_size;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> fsize()\n");
#endif
    return EXIT_FAILURE; 
}

void initialize_globals()
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> initialize_globals()\n");
#endif

    convert_globals.input_format = kUnknown;
    convert_globals.filenames = NULL;
    convert_globals.num_filenames = 0;

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> initialize_globals()\n");
#endif
}

void parse_command_line_options(int argc, char **argv)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> parse_command_line_options()\n");
#endif

    int client_long_index;
    int client_opt = getopt_long(argc, 
                                 argv, 
                                 convert_client_opt_string, 
                                 convert_client_long_options, 
                                 &client_long_index);
    char *in_format_str = NULL;

    opterr = 0; /* disable error reporting by GNU getopt */
    initialize_globals();

    while (client_opt != -1) 
        {
            switch (client_opt) 
                {
                case 'f':
                    in_format_str = optarg;
                    break;
                case 'h':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                case '?':
                    print_usage(stdout);
                    exit(EXIT_SUCCESS);
                default:
                    break;
                }
            client_opt = getopt_long(argc, 
                                     argv, 
                                     convert_client_opt_string, 
                                     convert_client_long_options, 
                                     &client_long_index);
        }

    convert_globals.filenames = argv + optind;
    convert_globals.num_filenames = argc - optind;    

    if (!in_format_str) {
        fprintf(stderr, "Error: Specified input format was omitted; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }
    else if (convert_globals.num_filenames != 1) {
        fprintf(stderr, "Error: Please specify an input file (either a regular file or '-' for stdin\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

    /* map format string to setting */
    if (strcmp(in_format_str, "gzip") == 0)
        convert_globals.input_format = kGzip;
    else {
        fprintf(stderr, "Error: Specified input format is unknown; please specify one of required input formats\n");
        print_usage(stderr);
        exit(EXIT_FAILURE);
    }

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> parse_command_line_options()\n");
#endif
}

void print_usage(FILE *stream)
{
#ifdef DEBUG
    fprintf(stderr, "Debug: Entering --> print_usage()\n");
#endif

    fprintf(stream, 
            "%s\n" \
            "  version: %s\n" \
            "  author:  %s\n" \
            "%s\n", 
            name, 
            version,
            authors,
            usage);

#ifdef DEBUG
    fprintf(stderr, "Debug: Leaving  --> print_usage()\n");
#endif
}

Here is the build process:

$ mkdir -p objects
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline -c convert.c -o objects/convert.o -iquote${PWD}                                                        
$ cc -Wall -Wextra -pedantic -std=c99 -D__STDC_CONSTANT_MACROS -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE=1 -DDEBUG=1 -g -O0 -fno-inline objects/convert.o -o convert -lpthread

I have been able to build this test code on OS X and Linux hosts with fairly modern build environments.

Thanks in advance for any helpful advice!

1 Answer
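  • I will start by saying that I don't feel pthreads conditions and mutexes are really necessary here, nor is non-blocking I/O the best response to the problem you describe.

    In my view, the problem you describe with your condition-and-mutex version is a symptom of forgetting to diligently close() the ends of your pipes, with the result that a copy of the write-end file descriptor of the pipe feeding the child process's stdin leaked (into that child or others) and stayed alive.

    Then, since a write end corresponding to stdin's read end still exists, the system does not report EOF but blocks indefinitely instead.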
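    The EOF rule can be demonstrated without any child process: read() on a pipe returns 0 only after every descriptor referring to its write end has been closed. In the hypothetical demo below, a dup() of the write end stands in for a leaked copy; the read end is made non-blocking only so that the demo reports "would block" instead of hanging.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Returns 1 if read() reports EOF (0) only after *every* copy of the
   pipe's write end has been closed; 0 otherwise, -1 on setup failure. */
int eof_needs_all_writers_closed(void) {
    int fd[2];
    char c;
    if (pipe(fd) != 0)
        return -1;
    int leaked = dup(fd[1]);             /* stands in for a leaked write end */
    /* Non-blocking only so the demo can observe "would block" safely. */
    fcntl(fd[0], F_SETFL, fcntl(fd[0], F_GETFL) | O_NONBLOCK);

    close(fd[1]);                        /* the "owner" closes its write end... */
    ssize_t r1 = read(fd[0], &c, 1);     /* ...but a writer remains: no EOF */
    int blocked = (r1 == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

    close(leaked);                       /* last write end is gone */
    ssize_t r2 = read(fd[0], &c, 1);     /* now read() returns 0: EOF */

    close(fd[0]);
    return blocked && r2 == 0;
}
```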

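    In your case, you did prevent the pipe-end file descriptors from leaking into the spawned child (with the correct calls to close() on the child side of the fork() within popen3(), although you forgot to close() the unused, child-side pipe ends in the parent). However, you did not prevent this leak into all other children! If you call popen3() twice, you prevent the set of three descriptors from leaking into the first child, but because the parent still holds them, when the next call to popen3() happens, there are now 6 file descriptors to close after the fork() (the old set of three and the new set of three you just created).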
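    Concretely, closing the child-side ends in the parent looks roughly like the popen3()-style sketch below. spawn_filter() and spawn_filter_demo() are hypothetical names; stderr is left alone, both descriptors stay blocking, and the helper also sets FD_CLOEXEC on the parent's ends (the flag discussed next) so they cannot leak into later children. Closing the returned write end is what lets the child see EOF. With large inputs, writing and reading must happen concurrently (for example, in separate threads), or both pipes can fill up and deadlock.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical popen3()-style helper: runs cmd via /bin/sh with its stdin and
   stdout on pipes. Stores the parent's ends in *to_child and *from_child and
   returns the child's pid, or -1 on failure. */
pid_t spawn_filter(const char *cmd, int *to_child, int *from_child) {
    int in[2], out[2];
    if (pipe(in) != 0)
        return -1;
    if (pipe(out) != 0) {
        close(in[0]);
        close(in[1]);
        return -1;
    }
    /* Keep the parent's ends out of this and any later exec'd child. */
    fcntl(in[1], F_SETFD, fcntl(in[1], F_GETFD) | FD_CLOEXEC);
    fcntl(out[0], F_SETFD, fcntl(out[0], F_GETFD) | FD_CLOEXEC);

    pid_t pid = fork();
    if (pid < 0) {
        close(in[0]); close(in[1]);
        close(out[0]); close(out[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: attach the pipes to stdin/stdout, then drop the originals. */
        dup2(in[0], STDIN_FILENO);
        dup2(out[1], STDOUT_FILENO);
        if (in[0] != STDIN_FILENO)
            close(in[0]);
        if (out[1] != STDOUT_FILENO)
            close(out[1]);
        execl("/bin/sh", "sh", "-c", cmd, (char *)NULL);
        _exit(127);
    }
    /* Parent: close the child's ends. *to_child is now the only write end of
       the child's stdin, so closing it later delivers EOF to the child. */
    close(in[0]);
    close(out[1]);
    *to_child = in[1];
    *from_child = out[0];
    return pid;
}

/* Usage demo: sends "abc" through tr(1) and stores the reply in result.
   Returns the number of bytes read, or -1 on failure. */
ssize_t spawn_filter_demo(char *result, size_t cap) {
    int to, from;
    size_t n = 0;
    ssize_t r;
    if (cap == 0)
        return -1;
    pid_t pid = spawn_filter("tr a-z A-Z", &to, &from);
    if (pid < 0)
        return -1;
    ssize_t w = write(to, "abc", 3);
    close(to);                               /* tr now sees EOF on stdin */
    while (n < cap - 1 && (r = read(from, result + n, cap - 1 - n)) > 0)
        n += (size_t)r;
    result[n] = '\0';
    close(from);
    waitpid(pid, NULL, 0);
    return w == 3 ? (ssize_t)n : -1;
}
```

    spawn_filter_demo() should return 3 with "ABC" in the buffer, assuming a POSIX tr.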

    So, in your case, you should set the close-on-exec flag on those pipe ends, like this:

    fcntl(fdIn [PIPEWR], F_SETFD, fcntl(fdIn [PIPEWR], F_GETFD) | FD_CLOEXEC);
    fcntl(fdOut[PIPERD], F_SETFD, fcntl(fdOut[PIPERD], F_GETFD) | FD_CLOEXEC);
    fcntl(fdErr[PIPERD], F_SETFD, fcntl(fdErr[PIPERD], F_GETFD) | FD_CLOEXEC);
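    To observe the effect of those calls, the hypothetical probe below forks, execs /bin/sh, and has the shell redirect to a given descriptor number; the redirection only succeeds if the descriptor survived the exec. set_cloexec() wraps the same fcntl() call as above; fd_survives_exec() is an illustration, not part of the answer, and assumes a POSIX /bin/sh (which only guarantees descriptors 0 to 9 in redirections).

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>

/* Same call as the answer's fcntl() lines, wrapped for one descriptor. */
void set_cloexec(int fd) {
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}

/* Hypothetical probe: fork, exec /bin/sh, and let the shell try `true >&fd`.
   Returns 1 if fd was open in the exec'd shell, 0 if not, -1 on error. */
int fd_survives_exec(int fd) {
    char cmd[64];
    int status;
    if (fd < 3 || fd > 9)      /* POSIX sh only guarantees descriptors 0-9 */
        return -1;
    snprintf(cmd, sizeof cmd, "true 2>/dev/null >&%d", fd);
    pid_t pid = fork();
    if (pid < 0)
        return -1;
    if (pid == 0) {
        execl("/bin/sh", "sh", "-c", cmd, (char *)NULL);
        _exit(127);            /* exec itself failed */
    }
    if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status))
        return -1;
    if (WEXITSTATUS(status) == 127)
        return -1;
    /* Non-zero status means the redirection failed: fd was closed by exec. */
    return WEXITSTATUS(status) == 0;
}
```

    With a fresh pipe p, fd_survives_exec(p[1]) should return 1; after set_cloexec(p[1]), it should return 0.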
    

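    Below is code that spawns 6 threads and 3 processes and passes its input unmodified to its output, after internally compressing and then decompressing it. It effectively implements gzip -c - | XOR 0x55 | XOR 0x55 | gunzip -c - | cat, where:

      Standard input is fed into gzip by thread srcThrd.

      gzip's output is read by thread a2xor0Thrd and fed into thread xor0Thrd.

      Thread xor0Thrd XORs its input with 0x55 before passing it to thread xor1Thrd.

      Thread xor1Thrd XORs its input with 0x55 before passing it to thread xor22BThrd.

      Thread xor22BThrd feeds its input to process gunzip.

      Process gunzip feeds its output directly (not through a thread) to cat.

      The output of process cat is read by thread dstThrd and printed to standard output.

    Compression is done through inter-process pipe communication, while the XORing is done through intra-process pipe communication. No mutexes or condition variables are used. main() is easy to follow. This code should be easy to extend to your situation.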
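    As an aside before the full program: its XOR threads move one byte per read()/write() pair, which is simple but costs two system calls per byte. A chunked variant of the same idea (one thread per stage, connected only by pipes, with EOF propagated by closing the output end) might look like the sketch below. xor_stage_t, xor_stage_main() and xor_stage_demo() are hypothetical names, and the 4096-byte buffer is an arbitrary choice.

```c
#include <pthread.h>
#include <stddef.h>
#include <unistd.h>

/* One pipeline stage: upstream pipe in, downstream pipe out. */
typedef struct {
    int in_fd;   /* read end of the upstream pipe */
    int out_fd;  /* write end of the downstream pipe; closed by the stage */
} xor_stage_t;

/* Reads in chunks, XORs every byte with 0x55, forwards the result, and
   closes its output at EOF so the next stage sees EOF as well. */
void *xor_stage_main(void *arg) {
    xor_stage_t *st = arg;
    unsigned char buf[4096];
    ssize_t n;
    int ok = 1;
    while (ok && (n = read(st->in_fd, buf, sizeof buf)) > 0) {
        for (ssize_t i = 0; i < n; i++)
            buf[i] ^= 0x55;
        for (ssize_t off = 0; ok && off < n; ) {   /* retry short writes */
            ssize_t w = write(st->out_fd, buf + off, (size_t)(n - off));
            if (w <= 0)
                ok = 0;
            else
                off += w;
        }
    }
    close(st->out_fd);
    return NULL;
}

/* Usage demo: main -> stage -> main. Returns 1 if "hello" comes back
   XORed with 0x55 and the stream then ends with EOF, 0 otherwise. */
int xor_stage_demo(void) {
    const unsigned char msg[5] = { 'h', 'e', 'l', 'l', 'o' };
    unsigned char got[16];
    size_t total = 0;
    ssize_t r;
    int up[2], down[2];
    pthread_t t;

    if (pipe(up) != 0)
        return 0;
    if (pipe(down) != 0) {
        close(up[0]);
        close(up[1]);
        return 0;
    }
    xor_stage_t st = { up[0], down[1] };
    if (pthread_create(&t, NULL, xor_stage_main, &st) != 0) {
        close(up[0]); close(up[1]);
        close(down[0]); close(down[1]);
        return 0;
    }
    ssize_t w = write(up[1], msg, sizeof msg);
    close(up[1]);                          /* the stage will see EOF next */
    while ((r = read(down[0], got + total, sizeof got - total)) > 0)
        total += (size_t)r;                /* ends when the stage closes down[1] */
    pthread_join(t, NULL);
    close(up[0]);
    close(down[0]);

    if (w != (ssize_t)sizeof msg || total != sizeof msg)
        return 0;
    for (size_t i = 0; i < sizeof msg; i++)
        if ((got[i] ^ 0x55) != msg[i])
            return 0;
    return 1;
}
```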

    /* Includes */
    #include <stdlib.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <fcntl.h>
    
    
    
    /* Defines */
    #define PIPERD 0
    #define PIPEWR 1
    
    
    
    
    /* Data structures */
    typedef struct PIPESET{
        int Ain[2];
        int Aout[2];
        int Aerr[2];
        int xor0[2];
        int xor1[2];
        int xor2[2];
        int Bin[2];
        int BoutCin[2];
        int Berr[2];
        int Cout[2];
        int Cerr[2];
    } PIPESET;
    
    
    
    
    /* Function Implementations */
    
    /**
     * Source thread main method.
     * 
     * Slurps from standard input and feeds process A.
     */
    
    void* srcThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char c;
        while(read(0, &c, 1) > 0){
            write(pipeset->Ain[PIPEWR], &c, 1);
        }
    
        close(pipeset->Ain[PIPEWR]);
    
        pthread_exit(NULL);
    }
    
    /**
     * A to XOR0 thread main method.
     * 
     * Manually pipes from standard output of process A to input of thread XOR0.
     */
    
    void* a2xor0ThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char    buf[65536];
        ssize_t bytesRead;
    
        while((bytesRead = read(pipeset->Aout[PIPERD], buf, 65536)) > 0){
            write(pipeset->xor0[PIPEWR], buf, bytesRead);
        }
    
        close(pipeset->xor0[PIPEWR]);
    
        pthread_exit(NULL);
    }
    
    /**
     * XOR0 thread main method.
     * 
     * XORs input with 0x55 and outputs to input of XOR1.
     */
    
    void* xor0ThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char c;
        while(read(pipeset->xor0[PIPERD], &c, 1) > 0){
            c ^= 0x55;
            write(pipeset->xor1[PIPEWR], &c, 1);
        }
    
        close(pipeset->xor1[PIPEWR]);
    
        pthread_exit(NULL);
    }
    
    /**
     * XOR1 thread main method.
     * 
     * XORs input with 0x55 and outputs to input of process B.
     */
    
    void* xor1ThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char c;
        while(read(pipeset->xor1[PIPERD], &c, 1) > 0){
            c ^= 0x55;
            write(pipeset->xor2[PIPEWR], &c, 1);
        }
    
        close(pipeset->xor2[PIPEWR]);
    
        pthread_exit(NULL);
    }
    
    /**
     * XOR2 to B thread main method.
     * 
     * Manually pipes from input (output of XOR1) to input of process B.
     */
    
    void* xor22BThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char    buf[65536];
        ssize_t bytesRead;
    
        while((bytesRead = read(pipeset->xor2[PIPERD], buf, 65536)) > 0){
            write(pipeset->Bin[PIPEWR], buf, bytesRead);
        }
    
        close(pipeset->Bin[PIPEWR]);
    
        pthread_exit(NULL);
    }
    
    /**
     * Destination thread main method.
     * 
     * Manually copies the standard output of process C to the standard output.
     */
    
    void* dstThrdMain(void* arg){
        PIPESET* pipeset = (PIPESET*)arg;
    
        char c;
        while(read(pipeset->Cout[PIPERD], &c, 1) > 0){
            write(1, &c, 1);
        }
    
        pthread_exit(NULL);
    }
    
    /**
     * Set close on exec flag on given descriptor.
     */
    
    void setCloExecFlag(int fd){
        fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
    }
    
    /**
     * Clear close on exec flag on given descriptor.
     */
    
    void unsetCloExecFlag(int fd){
        fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) & ~FD_CLOEXEC);
    }
    
    /**
     * Pipe4.
     * 
     * Create a pipe with some ends possibly marked close-on-exec.
     */
    
    #define PIPE4_FLAG_NONE       (0U)
    #define PIPE4_FLAG_RD_CLOEXEC (1U << 0)
    #define PIPE4_FLAG_WR_CLOEXEC (1U << 1)
    
    int pipe4(int fd[2], int flags){
        int ret = pipe(fd);
    
        if(flags&PIPE4_FLAG_RD_CLOEXEC){setCloExecFlag(fd[PIPERD]);}
        if(flags&PIPE4_FLAG_WR_CLOEXEC){setCloExecFlag(fd[PIPEWR]);}
    
        return ret;
    }
    
    /**
     * Pipe4 explicit derivatives.
     */
    
    #define pipe4_cloexec(fd)  pipe4((fd), PIPE4_FLAG_RD_CLOEXEC|PIPE4_FLAG_WR_CLOEXEC)
    
    /**
     * Popen4.
     * 
     * General-case for spawning a process and tethering it with cloexec pipes on stdin,
     * stdout and stderr.
     * 
     * @param [in]      cmd    The command to execute.
     * @param [in/out]  pin    The pointer to the cloexec pipe for stdin.
     * @param [in/out]  pout   The pointer to the cloexec pipe for stdout.
     * @param [in/out]  perr   The pointer to the cloexec pipe for stderr.
     * @param [in]      flags  A bitwise OR of flags to this function. Available
     *                         flags are:
     * 
     *     POPEN4_FLAG_NONE:
     *         Explicitly specify no flags.
     *     POPEN4_FLAG_NOCLOSE_PARENT_STDIN,
     *     POPEN4_FLAG_NOCLOSE_PARENT_STDOUT,
     *     POPEN4_FLAG_NOCLOSE_PARENT_STDERR:
     *         Don't close pin[PIPERD], pout[PIPEWR] and perr[PIPEWR] in the parent,
     *         respectively.
     *     POPEN4_FLAG_CLOSE_CHILD_STDIN,
     *     POPEN4_FLAG_CLOSE_CHILD_STDOUT,
     *     POPEN4_FLAG_CLOSE_CHILD_STDERR:
     *         Close the respective streams in the child. Ignores pin, pout and perr
     *         entirely. Overrides a NOCLOSE_PARENT flag for the same stream.
     */
    
    #define POPEN4_FLAG_NONE                             (0U)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDIN        (1U << 0)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDOUT       (1U << 1)
    #define POPEN4_FLAG_NOCLOSE_PARENT_STDERR       (1U << 2)
    #define POPEN4_FLAG_CLOSE_CHILD_STDIN           (1U << 3)
    #define POPEN4_FLAG_CLOSE_CHILD_STDOUT          (1U << 4)
    #define POPEN4_FLAG_CLOSE_CHILD_STDERR          (1U << 5)
    
    pid_t popen4(const char* cmd, int pin[2], int pout[2], int perr[2], int flags){
        /********************
         **  FORK PROCESS  **
         ********************/
        pid_t ret = fork();
    
        if(ret < 0){
            /**
             * Error in fork(), still in parent.
             */
    
            fprintf(stderr, "fork() failed!\n");
            return ret;
        }else if(ret == 0){
            /**
             * Child-side of fork
             */
    
            if(flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
                close(0);
            }else{
                unsetCloExecFlag(pin [PIPERD]);
                dup2(pin [PIPERD], 0);
            }
            if(flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
                close(1);
            }else{
                unsetCloExecFlag(pout[PIPEWR]);
                dup2(pout[PIPEWR], 1);
            }
            if(flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
                close(2);
            }else{
                unsetCloExecFlag(perr[PIPEWR]);
                dup2(perr[PIPEWR], 2);
            }
    
            execl("/bin/sh", "sh", "-c", cmd, NULL);
    
            fprintf(stderr, "exec() failed!\n");
            _exit(127); /* _exit() avoids flushing stdio buffers copied from the parent */
        }else{
            /**
             * Parent-side of fork
             */
    
            if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDIN  &&
               ~flags & POPEN4_FLAG_CLOSE_CHILD_STDIN){
                close(pin [PIPERD]);
            }
            if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDOUT &&
               ~flags & POPEN4_FLAG_CLOSE_CHILD_STDOUT){
                close(pout[PIPEWR]);
            }
            if(~flags & POPEN4_FLAG_NOCLOSE_PARENT_STDERR &&
               ~flags & POPEN4_FLAG_CLOSE_CHILD_STDERR){
                close(perr[PIPEWR]);
            }
    
            return ret;
        }
    
        /* Unreachable */
        return ret;
    }
    
    /**
     * Main Function.
     * 
     * Sets up the whole piping scheme.
     */
    
    int main(int argc, char* argv[]){
        pthread_t srcThrd, a2xor0Thrd, xor0Thrd, xor1Thrd, xor22BThrd, dstThrd;
        pid_t     gzip, gunzip, cat;
        PIPESET   pipeset;
    
        pipe4_cloexec(pipeset.Ain);
        pipe4_cloexec(pipeset.Aout);
        pipe4_cloexec(pipeset.Aerr);
        pipe4_cloexec(pipeset.Bin);
        pipe4_cloexec(pipeset.BoutCin);
        pipe4_cloexec(pipeset.Berr);
        pipe4_cloexec(pipeset.Cout);
        pipe4_cloexec(pipeset.Cerr);
        pipe4_cloexec(pipeset.xor0);
        pipe4_cloexec(pipeset.xor1);
        pipe4_cloexec(pipeset.xor2);
    
        /* Spawn processes */
        gzip   = popen4("gzip -c -",   pipeset.Ain,     pipeset.Aout,    pipeset.Aerr, POPEN4_FLAG_NONE);
        gunzip = popen4("gunzip -c -", pipeset.Bin,     pipeset.BoutCin, pipeset.Berr, POPEN4_FLAG_NONE);
        cat    = popen4("cat",         pipeset.BoutCin, pipeset.Cout,    pipeset.Cerr, POPEN4_FLAG_NONE);
    
    
        /* Spawn threads */
        pthread_create(&srcThrd,    NULL, srcThrdMain,    &pipeset);
        pthread_create(&a2xor0Thrd, NULL, a2xor0ThrdMain, &pipeset);
        pthread_create(&xor0Thrd,   NULL, xor0ThrdMain,   &pipeset);
        pthread_create(&xor1Thrd,   NULL, xor1ThrdMain,   &pipeset);
        pthread_create(&xor22BThrd, NULL, xor22BThrdMain, &pipeset);
        pthread_create(&dstThrd,    NULL, dstThrdMain,    &pipeset);
        pthread_join(srcThrd,    (void**)NULL);
        pthread_join(a2xor0Thrd, (void**)NULL);
        pthread_join(xor0Thrd,   (void**)NULL);
        pthread_join(xor1Thrd,   (void**)NULL);
        pthread_join(xor22BThrd, (void**)NULL);
        pthread_join(dstThrd,    (void**)NULL);
        return 0;
    }
    

    Critique of Your Code

    There are many problems with your code, most of which have nothing to do with threading.

    You never close() the file descriptor d->gunzip_ptr->in. This means gunzip can never know that no more input is coming on its stdin, so it will never exit.

    Because gunzip never exits, it never close()s its stdout, so a blocking read() at the other end will never unblock. A non-blocking read() will instead always return -1 with errno == EAGAIN.

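    Your popen3() does not close() p_stdin[POPEN3_READ], p_stdout[POPEN3_WRITE] or p_stderr[POPEN3_WRITE] on the parent's side of the fork(). Only the child should hold those descriptors. Failing to close them means that when the parent itself tries to read the child's stdout and stderr, it will never see EOF, for the same reason as above: the parent still owns a write end of each pipe, into which it could write, making new data appear at the read end.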

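    Your code implicitly relies on gunzip writing out at least one byte for every 1024 bytes you write into it. There is no guarantee that this will happen, since gunzip is free to buffer internally as it sees fit.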

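    This is because your code reads chunks of at most BUF_LENGTH_VALUE bytes and copies them into d->in_buf. You then store the number of bytes read by fread() in d->n_in_bytes, and use that same d->n_in_bytes in the write() call that feeds gunzip's stdin. Next, you signal consume_gunzip_chunk() to wake up and then pthread_cond_wait() for the next gzip-compressed chunk. But that chunk may never come: there is no guarantee that gunzip can decompress any useful output from just the first 1024 bytes of input, nor even that it will write() that output rather than buffering it until it has, say, 4096 bytes (a full page) of output. Therefore the read() call in consume_gunzip_chunk() may never succeed (or even return, if read() is blocking). If read() never returns, consume_gunzip_chunk() never signals d->in_cond, and all three threads get stuck. Even if read() is non-blocking, the last block of output from gunzip may never arrive: gunzip's input is never closed, so it never flushes its buffers with write(), and read() at the other end will never receive that data. No amount of pleading will elicit it without a close().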

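    Possible (likely?) cause of the bug: once d->n_out_bytes_read_from_gunzip becomes non-zero, it never becomes 0 again. This means that the extremely baffling

    while (d->n_in_bytes != 0 || d->n_out_bytes_read_from_gunzip != 0)
        pthread_cond_wait(&d->in_cond, &d->in_lock);

    within produce_gzip_chunk() will, once entered with d->n_out_bytes_read_from_gunzip != 0, stay stuck forever. By calling sleep(1) inside consume_gunzip_chunk(), which is where d->n_out_bytes_read_from_gunzip is set, you may have masked the problem: the delay lets all of the input be read before consume_gunzip_chunk() can lock up the system by setting d->n_out_bytes_read_from_gunzip to a non-zero value.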

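    Two threads call pthread_cond_wait(&d->in_cond, &d->in_lock);: produce_gzip_chunk() and consume_gzip_chunk(). There is absolutely no guarantee that when consume_gunzip_chunk() calls pthread_cond_signal(&d->in_cond);, the "correct" thread (whichever one that is in your design) will receive the signal. To make sure all of them do, you could use pthread_cond_broadcast(), but then you expose yourself to the thundering herd problem. In my opinion, needing pthread_cond_broadcast() in this situation is itself a symptom of a bad design.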

    Relatedly, you call pthread_cond_signal(&d->in_cond) within a thread (indeed, within the same function) in which you also call pthread_cond_wait(&d->in_cond, &d->in_lock). What purpose does that serve?

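    You use d->in_lock for too many different purposes, exposing yourself to possible deadlock, or to poor performance from over-protection. In particular, you use it to protect both d->in_cond and d->out_cond. That is too strong: writing gunzip's output into d->in_line should be able to happen at the same time as gunzip's input is being written into and out of d->in_buf.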

    Within consume_gunzip_chunk(), you have

    while (d->n_in_bytes_written_to_gunzip == 0) {
        pthread_cond_wait(&d->out_cond, &d->in_lock);
    }
    if (d->n_in_bytes_written_to_gunzip) {
        ...
    

    That if can never fail! Is there a case you have in mind?

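    Consider making the entire struct pthread_data volatile (or at least the integer members used by multiple threads), since the compiler might decide to optimize away loads and stores that should in fact remain. (Strictly speaking, volatile is neither necessary nor sufficient here: POSIX specifies that pthread_mutex_lock(), pthread_mutex_unlock() and pthread_cond_wait() synchronize memory, so members accessed only while holding the mutex are already safe, whereas volatile by itself provides no atomicity or ordering between threads.)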

    Compliments

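    So as not to sound too negative, I would like to say that, in general, your problems are not caused by misuse of the pthreads API but by faulty producer-consumer logic and missing close()s. Also, you appear to understand that pthread_cond_wait() may wake up spuriously, which is why you wrapped it in a loop that checks the invariant.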

    In the future:

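    I would use pipes, even between threads. This frees you from implementing your own producer-consumer scheme; the kernel has already solved that problem for you and provides the pipe(), read() and write() primitives, which are all you need to take advantage of this ready-made solution. It also makes the code cleaner and free of mutexes and condition variables. You simply have to be diligent about closing both ends, and you must be extremely careful with pipes in the presence of fork(). The rules are simple: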

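    If any write end of a pipe is still open, read() on an open read end will not return EOF; it will block, or fail with EAGAIN if the read end is non-blocking.

    If all write ends of a pipe have been closed, read() on an open read end returns EOF (0).

    If all read ends of a pipe have been closed, write() to any of its write ends raises SIGPIPE (or fails with EPIPE if SIGPIPE is ignored).

    fork() duplicates the entire process, including all descriptors (modulo possibly crazy stuff in pthread_atfork())! Only the calling thread, however, continues to run in the child.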

    Answered 2023-01-07 15:40
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | Beijing Public Security Internet Record No. 11010802041100 | Beijing ICP Record No. 19059560-4 | PHP1.CN, the No. 1 PHP Community. All rights reserved.