热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PythonAPI操作Hadoophdfs详解

这篇文章主要介绍了PythonAPI操作Hadoophdfs详解,具有很好的参考价值,希望对大家有所帮助。一

Python API 操作Hadoop hdfs详解

http://pyhdfs.readthedocs.io/en/latest/

1:安装

由于是windows环境(linux其实也一样),只要有pip或者setup_install安装起来都是很方便的

>pip install hdfs

2:Client――创建集群连接

> from hdfs import *
> client = Client("http://s100:50070")

其他参数说明:

classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)

url:ip:端口

root:制定的hdfs根目录

proxy:制定登陆的用户身份

timeout:设置的超时时间

session:连接标识

client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.list("/")
[u"home",u"input", u"output", u"tmp"]

3:dir――查看支持的方法

>dir(client)

4:status――获取路径的具体信息

其他参数:

status(hdfs_path, strict=True)

hdfs_path:就是hdfs路径

strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None

5:list――获取指定路径的子目录信息

>client.list("/")
[u"home",u"input", u"output", u"tmp"]

其他参数:

list(hdfs_path, status=False)

status:为True时,也返回子目录的状态信息,默认为Flase

6:makedirs――创建目录

>client.makedirs("/123")

其他参数:makedirs(hdfs_path, permission=None)

permission:设置权限

>client.makedirs("/test",permission=777)

7: rename―重命名

>client.rename("/123","/test")

8:delete―删除

>client.delete("/test")

其他参数:

delete(hdfs_path, recursive=False)

recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False

9:upload――上传数据

>client.upload("/test","F:[PPT]Google Protocol Buffers.pdf");

其他参数:

upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,

chunk_size=65536,progress=None, cleanup=True, **kwargs)

overwrite:是否是覆盖性上传文件

n_threads:启动的线程数目

temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换

chunk_size:文件上传的大小区间

progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数

cleanup:如果在上传任何文件时发生错误,则删除该文件

10:download――下载

>client.download("/test/NOTICE.txt","/home")

11:read――读取文件

withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
print reader.read()

其他参数:

read(*args, **kwds)

hdfs_path:hdfs路径

offset:设置开始的字节位置

length:读取的长度(字节为单位)

buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。

encoding:制定编码

chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象

delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。

progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与- 1作为第二个参数。

问题:

1.

hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x

解决办法是:在配置文件hdfs-site.xml中加入

 
 dfs.permissions 
 false 

/usr/local/hadoop-2.6.4/bin/hadoopjar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar-input <输入目录> # 可以指定多个输入路径,例如:-input "/user/foo/dir1" -input "/user/foo/dir2"

-inputformat<输入格式 JavaClassName> -output <输出目录>-outputformat <输出格式 JavaClassName> -mapper -reducer -combiner -partitioner -cmdenv # 可以传递环境变量,可以当作参数传入到任务中,可以配置多个

-file <依赖的文件> #配置文件,字典等依赖

-D # 作业的属性配置

Map.py:

#!/usr/local/bin/python
import sys
for line in sys.stdin:
 ss = line.strip().split(" ")
 for s in ss:
 if s.strip()!= "":
  print "%s	%s"% (s, 1)

Reduce.py:

#!/usr/local/bin/python

import sys
current_word = None
count_pool = []
sum = 0
for line in sys.stdin:
 word, val = line.strip().split("	")
 if current_word== None:
 current_word = word
 if current_word!= word:
 for count in count_pool:
  sum += count
 print "%s	%s"% (current_word, sum)
 current_word = word
 count_pool = []
 sum = 0
 count_pool.append(int(val))
for count in count_pool:
 sum += count
print "%s	%s"% (current_word, str(sum))

Run.sh:

HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop"
STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH

# Step 1.

$HADOOP_CMD jar$STREAM_JAR_PATH 
 -input $INPUT_FILE_PATH_1 
 -output $OUTPUT_PATH 
 -mapper"python map.py" 
 -reducer "pythonred.py" 
 -file ./map.py 
 -file ./red.py

目的:通过python模拟mr,计算每年的最高气温。

1. 查看数据文件,需要截取年份和气温,生成key-value对。

[tianyc@TeletekHbase python]$ cat test.dat 
0067011990999991950051507004...9999999N9+00001+99999999999... 
0043011990999991950051512004...9999999N9+00221+99999999999... 
0043011990999991950051518004...9999999N9-00111+99999999999... 
0043012650999991949032412004...0500001N9+01111+99999999999... 
0043012650999991949032418004...0500001N9+00781+99999999999...

2. 编写map,打印key-value对

[tianyc@TeletekHbase python]$ cat map.py 
import re
import sys
for line in sys.stdin:
 val=line.strip()
 (year,temp)=(val[15:19],val[40:45])
 print "%s	%s" % (year,temp)

[tianyc@TeletekHbase python]$ cat test.dat|python map.py 
1950 +0000
1950 +0022
1950 -0011
1949 +0111
1949 +0078

3. 将结果排序

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort
1949 +0078
1949 +0111
1950 +0000
1950 -0011
1950 +0022

4. 编写redurce,对map中间结果进行处理,生成最终结果

[tianyc@TeletekHbase python]$ cat red.py 
import sys
(last_key,max_val)=(None,0)
for line in sys.stdin:
 (key,val)=line.strip().split("	")
 if last_key and last_key!=key:
 print "%s	%s" % (last_key, max_val)
 (last_key, max_val)=(key,int(val))
else:
 (last_key, max_val)=(key,max(max_val,int(val)))
if last_key:
 print "%s	%s" % (last_key, max_val)

5. 执行。

[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort|python red.py 
1949 111
1950 22

使用python语言进行MapReduce程序开发主要分为两个步骤,一是编写程序,二是用Hadoop Streaming命令提交任务。

还是以词频统计为例

一、程序开发

1、Mapper

for line in sys.stdin:
 filelds = line.strip.split(" ")
 for item in fileds:
 print item+" "+"1"

2、Reducer

import sys
result={}
for line in sys.stdin:
 kvs = line.strip().split(" ")
 k = kvs[0]
 v = kvs[1]
 if k in result:
  result[k]+=1
 else:
  result[k] = 1
 for k,v in result.items():
 print k+" "+v
....

写完发现其实只用map就可以处理了...reduce只用cat就好了

3、运行脚本

1)Streaming简介

Hadoop的MapReduce和HDFS均采用Java进行实现,默认提供Java编程接口,用户通过这些编程接口,可以定义map、reduce函数等等。 

但是如果希望使用其他语言编写map、reduce函数怎么办呢?

Hadoop提供了一个框架Streaming,Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用hadoop提供的Java编程接口。

2)运行命令

/.../bin/hadoop streaming
-input /..../input
-output /..../output
-mapper "mapper.py"
-reducer "reducer.py"
-file mapper.py
-file reducer.py
-D mapred.job.name ="wordcount"
-D mapred.reduce.tasks = "1"

3)Streaming常用命令

(1)-input :指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入。

(2)-output :指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次。

(3)-mapper:指定mapper可执行程序或Java类,必须指定且唯一。

(4)-reducer:指定reducer可执行程序或Java类,必须指定且唯一。

(5)-file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件,具体使用方法参考文件分发与打包。

(6)numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出。

(7)-jobconf | -D NAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。

-jobconf mapred.job.name="My Job Name"设置作业名

-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级

-jobconf mapred.job.map.capacity=M设置同时最多运行M个map任务

-jobconf mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务

-jobconf mapred.map.tasks 设置map任务个数

-jobconf mapred.reduce.tasks 设置reduce任务个数

-jobconf mapred.compress.map.output 设置map的输出是否压缩

-jobconf mapred.map.output.compression.codec 设置map的输出压缩方式

-jobconf mapred.output.compress 设置reduce的输出是否压缩

-jobconf mapred.output.compression.codec 设置reduce的输出压缩方式

-jobconf stream.map.output.field.separator 设置map输出分隔符

例子:

-D stream.map.output.field.separator=: 以冒号进行分隔

-D stream.num.map.output.key.fields=2 指定在第二个冒号处进行分隔,也就是第二个冒号之前的作为key,之后的作为value

(8)-combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发。

(9)-partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner。

(10)-inputformat, -outputformat:指定inputformat和outputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。

(12)-mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运行的debug程序。

(13)-verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。

以上这篇Python API 操作Hadoop hdfs详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持编程笔记。

原文链接:https://blog.csdn.net/qq_36864672/article/details/78586194


推荐阅读
  • 树莓派Linux基础(一):查看文件系统的命令行操作
    本文介绍了在树莓派上通过SSH服务使用命令行查看文件系统的操作,包括cd命令用于变更目录、pwd命令用于显示当前目录位置、ls命令用于显示文件和目录列表。详细讲解了这些命令的使用方法和注意事项。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 如何在服务器主机上实现文件共享的方法和工具
    本文介绍了在服务器主机上实现文件共享的方法和工具,包括Linux主机和Windows主机的文件传输方式,Web运维和FTP/SFTP客户端运维两种方式,以及使用WinSCP工具将文件上传至Linux云服务器的操作方法。此外,还介绍了在迁移过程中需要安装迁移Agent并输入目的端服务器所在华为云的AK/SK,以及主机迁移服务会收集的源端服务器信息。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 预备知识可参考我整理的博客Windows编程之线程:https:www.cnblogs.comZhuSenlinp16662075.htmlWindows编程之线程同步:https ... [详细]
  • 本文是一篇翻译文章,介绍了async/await的用法和特点。async关键字被放置在函数前面,意味着该函数总是返回一个promise。文章还提到了可以显式返回一个promise的方法。该特性使得async/await更易于理解和使用。本文还提到了一些可能的错误,并希望读者能够指正。 ... [详细]
  • 本文介绍了在RHEL 7中的系统日志管理和网络管理。系统日志管理包括rsyslog和systemd-journal两种日志服务,分别介绍了它们的特点、配置文件和日志查询方式。网络管理主要介绍了使用nmcli命令查看和配置网络接口的方法,包括查看网卡信息、添加、修改和删除配置文件等操作。 ... [详细]
  • Python脚本编写创建输出数据库并添加模型和场数据的方法
    本文介绍了使用Python脚本编写创建输出数据库并添加模型数据和场数据的方法。首先导入相应模块,然后创建输出数据库并添加材料属性、截面、部件实例、分析步和帧、节点和单元等对象。接着向输出数据库中添加场数据和历程数据,本例中只添加了节点位移。最后保存数据库文件并关闭文件。文章还提供了部分代码和Abaqus操作步骤。另外,作者还建立了关于Abaqus的学习交流群,欢迎加入并提问。 ... [详细]
  • Python中的PyInputPlus模块原文:https ... [详细]
  • 大坑|左上角_pycharm连接服务器同步写代码(图文详细过程)
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了pycharm连接服务器同步写代码(图文详细过程)相关的知识,希望对你有一定的参考价值。pycharm连接服务 ... [详细]
author-avatar
伊劾kj
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有