热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

谷歌数据流似乎下降了第1000个记录

如何解决《谷歌数据流似乎下降了第1000个记录》经验,为你挑选了1个好方法。

我使用Google Dataflow(apache-beam)设置了一个小测试.该实验的用例是获取(csv)文件并将选定列写入(txt)文件.

实验代码如下:

from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class EmitColDoFn(beam.DoFn):
    first = True
    header = ""
    def __init__(self, i):
        super(EmitColDoFn, self).__init__()
        self.line_count =  Metrics.counter(self.__class__, 'lines')
        self.i = i

    def process(self, element):
        if self.first:
            self.header = element
            self.first = False
        else:
            self.line_count.inc()
            cols = re.split(',', element)
            return (cols[self.i],)

def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='/users/sms/python_beam/data/MOCK_DATA (4).csv',
#                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        default="/users/sms/python_beam/data/",
#                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_optiOns= PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(optiOns=pipeline_options)

    # Read the text file[pattern] into a PCollection.
    lines = p | 'read' >> ReadFromText(known_args.input)

    column = (lines
            | 'email col' >> (beam.ParDo(EmitColDoFn(3)))
            | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col"))

    result = p.run()
    result.wait_until_finish()

    if (not hasattr(result, 'has_job')  # direct runner
        or result.has_job):  # not just a template creation
        lines_filter = MetricsFilter().with_name('lines')
        query_result = result.metrics().query(lines_filter)
        if query_result['counters']:
            lines_counter = query_result['counters'][0]

        print "Lines committed", lines_counter.committed
run()

以下样本1的最后几行:

990,Corabel,Feldbau,cfeldbaurh@deliciousdays.com,Female,84.102.162.190,DJ
991,Kiley,Rottcher,krottcherri@stanford.edu,Male,91.97.155.28,CA
992,Glenda,Clist,gclistrj@state.gov,Female,24.98.253.127,UA
993,Ingunna,Maher,imaherrk@army.mil,Female,159.31.127.19,PL
994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL

运行它会产生预期的输出:

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 996

Process finished with exit code 0

现在为了奇怪的结果.在下一次运行中,行数增加到1000.

994,Megan,Giacopetti,mgiacopettirl@instagram.com,Female,115.6.63.52,RU
995,Briny,Dutnall,bdutnallrm@xrea.com,Female,102.81.33.24,SE
996,Jan,Caddan,jcaddanrn@jalbum.net,Female,115.142.222.106,PL
997,Shannen,Gaisford,sgaisfordr7@rediff.com,Female,167.255.222.92,RU
998,Lorianna,Slyne,lslyner8@cbc.ca,Female,54.169.60.13,CN
999,Franklin,Yaakov,fyaakovr9@latimes.com,Male,122.1.92.236,CN
1000,Wilhelmine,Cariss,wcarissra@creativecommons.org,Female,237.48.113.255,PL

但这次出局是

/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 999

Process finished with exit code 0

检查输出文件显示最后一行未处理.

bdutnallrm@xrea.com
jcaddanrn@jalbum.net
sgaisfordr7@rediff.com
lslyner8@cbc.ca
fyaakovr9@latimes.com

有什么想法在这里发生了什么?



1> Raghu Angadi..:

'EditColDoFn'跳过第一行,假设每个文件都有一个实例.当你有1000多行时,DirectRunner会创建两个包:第一个包含1000行,第二个包含1行.在Beam应用程序中,输入可能会分成多个包,以便并行处理.与文件数和捆绑数无关.相同的应用程序可以处理遍布许多文件的terra字节数据.

ReadFromText 有一个选项'skip_header_lines',您可以设置为1以跳过每个输入文件中的标题行.


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 阅读spring5源码DefaultSingletonBeanRegistry类遇到问题发现SpringBean中存在大量回调机制和aware接口,于是特意去了解 ... [详细]
  • 使用Flutternewintegration_test进行示例集成测试?回答首先在dev下的p ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • python中不同的异常类型,如何进行异常处理?
    一、错误与异常程序中难免会出现错误,而错误分为两种1.语法错误:(这种错误,根本过不了python解释器的语法检测,必须在程序执行前就改正)2.逻辑错误:(逻辑错误),比如用户输入 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
author-avatar
悶得咪_438
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有