Reading and processing files in parallel in C#

Posted by 遗忘的vbnv on 2023-02-06 15:14

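I have very large files that I need to read and process. Can this be done in parallel using threads?

Here is the code I have so far. However, it does not seem to run any faster than reading and processing the files one after another.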

String[] files = openFileDialog1.FileNames;

Parallel.ForEach(files, f =>
{
    readTraceFile(f);
});        

private void readTraceFile(String file)
{
    StreamReader reader = new StreamReader(file);
    String line;

    while ((line = reader.ReadLine()) != null)
    {
        String pattern = "\\s{4,}";

        foreach (String trace in Regex.Split(line, pattern))
        {
            if (trace != String.Empty)
            {
                String[] details = Regex.Split(trace, "\\s+");

                Instruction instruction = new Instruction(details[0],
                    int.Parse(details[1]),
                    int.Parse(details[2]));
                Console.WriteLine("computing...");
                instructions.Add(instruction);
            }
        }
    }
}

Kirill Shlen.. 21

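It looks like your application's performance is mostly IO-bound. However, you still have some CPU-bound work in your code. These two pieces of work are interdependent: the CPU-bound work cannot start until the IO has done its job, and the IO does not move on to the next work item until the CPU has finished with the previous one. Each is holding the other up. Therefore, it is possible (explained at the very bottom) that you will see an improvement in throughput if you perform the IO-bound and CPU-bound work in parallel, like so: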

void ReadAndProcessFiles(string[] filePaths)
{
    // Our thread-safe collection used for the handover.
    var lines = new BlockingCollection<string>();

    // Build the pipeline.
    var stage1 = Task.Run(() =>
    {
        try
        {
            foreach (var filePath in filePaths)
            {
                using (var reader = new StreamReader(filePath))
                {
                    string line;

                    while ((line = reader.ReadLine()) != null)
                    {
                        // Hand over to stage 2 and continue reading.
                        lines.Add(line);
                    }
                }
            }
        }
        finally
        {
            lines.CompleteAdding();
        }
    });

    var stage2 = Task.Run(() =>
    {
        // Process lines on a ThreadPool thread
        // as soon as they become available.
        foreach (var line in lines.GetConsumingEnumerable())
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");
                    instructions.Add(instruction);
                }
            }
        }
    });

    // Block until both tasks have completed.
    // This makes this method prone to deadlocking.
    // Consider using 'await Task.WhenAll' instead.
    Task.WaitAll(stage1, stage2);
}

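I very much doubt that the CPU work is what is holding things up, but if that happens to be the case, you can also parallelize stage 2 like so: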

    var stage2 = Task.Run(() =>
    {
        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        Parallel.ForEach(lines.GetConsumingEnumerable(), parallelOptions, line =>
        {
            String pattern = "\\s{4,}";

            foreach (String trace in Regex.Split(line, pattern))
            {
                if (trace != String.Empty)
                {
                    String[] details = Regex.Split(trace, "\\s+");

                    Instruction instruction = new Instruction(details[0],
                        int.Parse(details[1]),
                        int.Parse(details[2]));
                    Console.WriteLine("computing...");
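                    // Add is now called from several threads at once, so 'instructions'
                    // must be a thread-safe collection here (List<T> is not).
                    instructions.Add(instruction);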
                }
            }
        });
    });
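A minimal sketch of what a thread-safe parallel stage 2 could look like. Everything here is an assumption layered on the answer's code: the `Instruction` class is a hypothetical stand-in for the asker's type, `ParallelStage2.Process` is an illustrative name, and `ConcurrentBag<T>` is just one possible thread-safe collection (it does not preserve line order). The sketch also hoists the two regular expressions out of the loop and asks the partitioner not to buffer items, so that lines are handed to workers as they arrive rather than in chunks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

// Hypothetical stand-in for the asker's Instruction type.
public sealed class Instruction
{
    public Instruction(string name, int a, int b) { Name = name; A = a; B = b; }
    public string Name { get; }
    public int A { get; }
    public int B { get; }
}

public static class ParallelStage2
{
    // Build the patterns once instead of re-parsing them for every line.
    private static readonly Regex TraceSeparator = new Regex(@"\s{4,}", RegexOptions.Compiled);
    private static readonly Regex FieldSeparator = new Regex(@"\s+", RegexOptions.Compiled);

    public static ConcurrentBag<Instruction> Process(BlockingCollection<string> lines)
    {
        // ConcurrentBag tolerates concurrent Add calls; List<T> does not.
        var instructions = new ConcurrentBag<Instruction>();

        // NoBuffering hands lines to workers one at a time instead of in chunks.
        var source = Partitioner.Create(
            lines.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, line =>
        {
            foreach (string trace in TraceSeparator.Split(line))
            {
                if (trace.Length == 0) continue;

                string[] details = FieldSeparator.Split(trace);
                instructions.Add(new Instruction(
                    details[0], int.Parse(details[1]), int.Parse(details[2])));
            }
        });

        return instructions;
    }
}
```

If the order of instructions matters, a different collection or a post-processing sort would be needed; the bag only guarantees that no item is lost.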

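Mind you, if the CPU work component is negligible compared to the IO component, you will not see much of a speed-up. The more even the workload is, the better the pipeline will perform in comparison with sequential processing.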
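One rough way to see how even the workload is would be to time the two phases separately on a representative sample file. The sketch below is an assumption, not part of the original answer: `PipelineProfiler.Measure` is a hypothetical helper, it loads the whole sample into memory (so it is not meant for the full-size files), and it only repeats the splitting and `int.Parse` calls from the question without building `Instruction` objects.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text.RegularExpressions;

public static class PipelineProfiler
{
    private static readonly Regex TraceSeparator = new Regex(@"\s{4,}");
    private static readonly Regex FieldSeparator = new Regex(@"\s+");

    // Returns the time spent reading the lines, the time spent
    // splitting and parsing them, and the number of traces parsed.
    public static (TimeSpan Read, TimeSpan Parse, int Count) Measure(string filePath)
    {
        // Phase 1: IO only. Pull every line into memory.
        var readWatch = Stopwatch.StartNew();
        var lines = new List<string>(File.ReadLines(filePath));
        readWatch.Stop();

        // Phase 2: CPU only. Same splitting and parsing as the question's code.
        var parseWatch = Stopwatch.StartNew();
        int count = 0;
        foreach (string line in lines)
        {
            foreach (string trace in TraceSeparator.Split(line))
            {
                if (trace.Length == 0) continue;

                string[] details = FieldSeparator.Split(trace);
                int.Parse(details[1]);
                int.Parse(details[2]);
                count++;
            }
        }
        parseWatch.Stop();

        return (readWatch.Elapsed, parseWatch.Elapsed, count);
    }
}
```

Calling `var (read, parse, count) = PipelineProfiler.Measure(path);` and comparing `read` with `parse` gives a first impression. Keep in mind that the operating system caches files, so a second run over the same file will usually report a much shorter read time than the first.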

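Since we are talking about performance, I am not particularly thrilled about the number of blocking calls in the code above. If I were doing this in my own project, I would have gone down the async/await route. I chose not to do so here because I wanted to keep things easy to understand and easy to integrate.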
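For readers curious about the async/await route, here is one possible shape for it. This is a sketch under assumptions, not the answer author's code: it swaps `BlockingCollection<string>` for a bounded `Channel<string>` from `System.Threading.Channels`, needs C# 8 or later for `await foreach`, uses an arbitrary capacity of 10,000 lines, and returns plain tuples in place of the asker's `Instruction` objects. The name `AsyncPipeline` is illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class AsyncPipeline
{
    private static readonly Regex TraceSeparator = new Regex(@"\s{4,}");
    private static readonly Regex FieldSeparator = new Regex(@"\s+");

    public static async Task<List<(string Name, int A, int B)>> ReadAndProcessFilesAsync(
        IEnumerable<string> filePaths)
    {
        // Bounded so that a fast reader cannot fill memory with unparsed lines.
        var channel = Channel.CreateBounded<string>(10_000);

        // Stage 1: read lines without blocking a thread while waiting on IO.
        var producer = Task.Run(async () =>
        {
            try
            {
                foreach (string path in filePaths)
                {
                    using (var reader = new StreamReader(path))
                    {
                        string line;
                        while ((line = await reader.ReadLineAsync()) != null)
                        {
                            await channel.Writer.WriteAsync(line);
                        }
                    }
                }
            }
            finally
            {
                // Signals the consumer that no more lines will arrive.
                channel.Writer.Complete();
            }
        });

        // Stage 2: parse lines as soon as they become available.
        var consumer = Task.Run(async () =>
        {
            var parsed = new List<(string Name, int A, int B)>();
            await foreach (string line in channel.Reader.ReadAllAsync())
            {
                foreach (string trace in TraceSeparator.Split(line))
                {
                    if (trace.Length == 0) continue;

                    string[] details = FieldSeparator.Split(trace);
                    parsed.Add((details[0], int.Parse(details[1]), int.Parse(details[2])));
                }
            }
            return parsed;
        });

        // Wait for both stages without blocking the calling thread.
        await Task.WhenAll(producer, consumer);
        return await consumer;
    }
}
```

From a UI event handler this could be awaited directly, for example `var instructions = await AsyncPipeline.ReadAndProcessFilesAsync(openFileDialog1.FileNames);`, which avoids the blocking `Task.WaitAll` call at the end of the synchronous version.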

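2 answers
  • From the look of what you are trying to do, you are almost certainly I/O-bound. Attempting parallel processing in that case will not help, and may in fact slow processing down because of the additional seek operations on the disk drives (unless you can split the data across multiple spindles).

    Answered 2023-02-06 15:17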
  • Kirill Shlen..'s answer, reproduced in full above.

    Answered 2023-02-06 15:18