I followed the file-shipping suggestion in this doc: http://spark.apache.org/docs/1.1.1/submitting-applications.html
Spark version: 1.1.0
```
./spark/bin/spark-submit --py-files /home/hadoop/loganalysis/parser-src.zip \
    /home/hadoop/loganalysis/ship-test.py
```
and the conf in the code:
```python
conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("LogAnalysis")
        .set("spark.executor.memory", "1g")
        .set("spark.executor.cores", "4")
        .set("spark.executor.num", "2")
        .set("spark.driver.memory", "4g")
        .set("spark.kryoserializer.buffer.mb", "128"))
```
And the slave nodes complain with an ImportError:
```
14/12/25 05:09:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
ImportError: No module named parser
```
And parser-src.zip works when tested locally:
```
[hadoop@ip-172-31-10-231 ~]$ python
Python 2.7.8 (default, Nov  3 2014, 10:17:30)
[GCC 4.8.2 20140120 (Red Hat 4.8.2-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import sys
>>> sys.path.insert(1, '/home/hadoop/loganalysis/parser-src.zip')
>>> from parser import parser
>>> parser.parse
>>>
```
I'm trying to get information from the remote workers: whether the file gets copied over, what sys.path looks like, and so on. It's tricky.
UPDATE: using the snippet below I found that the zip file is shipped and sys.path is set. Still, the import raises an error.
```python
import os
import sys
from parser import parser  # the custom module shipped in parser-src.zip

data = list(range(4))
disdata = sc.parallelize(data)
result = disdata.map(lambda x: "sys.path: {0}\nDIR: {1}\nFILES: {2}\nparser: {3}".format(
    sys.path, os.getcwd(), os.listdir('.'), str(parser)))
result.collect()
print(result.take(4))
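For what it's worth, here is a variant of that probe (my own sketch, not from the original run) that catches the ImportError on each worker, so the failure detail comes back with the diagnostics instead of aborting the whole job:

```python
# Sketch: attempt the import on each worker and report the outcome alongside
# sys.path, the working directory, and the files shipped to the executor.
def probe(_):
    import os
    import sys
    try:
        from parser import parser  # the module shipped via --py-files
        status = "import OK: {0}".format(parser)
    except ImportError as e:
        status = "import FAILED: {0}".format(e)
    return "sys.path: {0}\nCWD: {1}\nFILES: {2}\n{3}".format(
        sys.path, os.getcwd(), os.listdir('.'), status)

print(sc.parallelize(range(4), 4).map(probe).collect())
```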
It seems I have to dig into cloudpickle, which means I first need to understand how cloudpickle works and why it fails here.
```
: An error occurred while calling o40.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 23, ip-172-31-10-8.cn-north-1.compute.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/spark/python/pyspark/worker.py", line 75, in main
    command = pickleSer._read_with_length(infile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 150, in _read_with_length
    return self.loads(obj)
  File "/home/hadoop/spark/python/pyspark/cloudpickle.py", line 811, in subimport
    __import__(name)
ImportError: ('No module named parser',, ('parser.parser',))
```
UPDATE:
Someone hit the same problem on Spark 0.8: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Importing-other-py-files-in-PYTHONPATH-td2301.html
but he put his lib into Python's dist-packages and the import worked. I tried that, but it still results in an ImportError.
UPDATE:
Oh, gosh... I think the problem was caused by not understanding zip files and Python's import behavior. Passing parser.py to --py-files works (it then just complains about another dependency). And zipping only the .py files [not including the .pyc] also seems to work, as the sketch below suggests.
But I couldn't quite understand why.
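In case it helps, here is a minimal sketch of how such a source-only zip could be built (my reconstruction; it assumes the package lives in a local parser/ directory containing __init__.py and parser.py, which matches the `from parser import parser` test above):

```python
import os
import zipfile

# Build parser-src.zip with only .py sources, so the package directory itself
# sits at the root of the archive (what Python's zipimport expects when the
# archive is placed on sys.path).
with zipfile.ZipFile('parser-src.zip', 'w') as zf:
    for root, _dirs, files in os.walk('parser'):
        for name in files:
            if name.endswith('.py'):  # skip .pyc and anything else
                path = os.path.join(root, name)
                zf.write(path, path)
```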
Try this function of SparkContext:

```python
sc.addPyFile(path)
```

According to the pyspark documentation here:

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.
Try uploading your python module file to a public cloud storage (e.g. AWS S3) and passing the URL to that method.
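For illustration, a minimal sketch of that flow (the S3 URL and the input path are made-up placeholders):

```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("yarn-client").setAppName("LogAnalysis")
sc = SparkContext(conf=conf)

# Ship the dependency to every executor; a local path works too, as do
# HDFS and HTTP/HTTPS/FTP URIs.
sc.addPyFile("https://my-bucket.s3.amazonaws.com/parser-src.zip")  # hypothetical URL

def parse_record(record):
    from parser import parser  # resolved on the worker once the zip is shipped
    return parser.parse(record)

parsed = sc.textFile("hdfs:///logs/sample.log").map(parse_record)  # hypothetical path
print(parsed.take(5))
```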
Here is more comprehensive reading material: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_python.html
Try importing your custom module from inside the method itself rather than at the top of the driver script, e.g.:
```python
def parse_record(record):
    import parser
    p = parser.parse(record)
    return p
```
rather than
```python
import parser

def parse_record(record):
    p = parser.parse(record)
    return p
```
Cloud Pickle doesn't seem to recognize when a custom module has been imported, so it apparently tries to pickle the top-level modules along with the other data needed to run the method. In my experience, this means that top-level modules appear to exist, but they lack usable members, and nested modules can't be used as expected. Once I imported either with from A import * or from inside the method (import A.B), the modules worked as expected.
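Concretely, the two working forms look like this (a sketch only: A stands in for your custom top-level package, A.B for a nested module, and process for whatever function it exposes):

```python
# Form 1: a top-level star import pulls the member names themselves into the
# namespace, rather than a bare module reference:
# from A import *

# Form 2: import the nested module inside the function that runs on the workers:
def handler(record):
    import A.B  # resolved on the worker at call time instead of being pickled
    return A.B.process(record)
```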