热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

如何在zeppelin里高效的开发PyFlinkJob?

本文重点介绍下如何在Zeppelin里高效的开发PyFlinkJob,特别是解决PyFlink的环境问题。大家都知道PyFlink的开发环境不容易搭建,

本文重点介绍下如何在 Zeppelin 里高效的开发 PyFlink Job,特别是解决 PyFlink 的环境问题。

大家都知道PyFlink 的开发环境不容易搭建,稍有不慎,PyFlink 环境就会乱掉,而且很难排查原因
本文使用miniconda、conda-pack、mamba分别制作JobManager 上的 PyFlink Conda 环境、TaskManager 上的 PyFlink Conda 环境,然后在 Zeppelin 里使用 PyFlink 以及指定 Conda 环境,这样就可以在一个 Yarn 集群里同时使用多个版本的 PyFlink。

本文使用环境:flink 1.13.2, zeppelin:0.10.0-bin-all

需要改进的地方:


  • 需要创建 2 个 conda env ,原因是 Zeppelin 支持 tar.gz 格式,而 Flink 只支持 zip 格式
  • apache-flink 目前包含了 Flink 的 jar 包,导致打出来的 conda env 特别大(500MB以上),yarn container 在初始化的时候耗时会比较长,这个需要 Flink 社区提供一个轻量级的 Python 包 (不包含 Flink jar 包),就可以大大减小 conda env 的大小。

1. 搭建 PyFlink 环境


1.1.制作 JobManager 上的 PyFlink Conda 环境


  • 注意dependencies所列的第三方包是在 PyFlink 客户端 (JobManager) 需要的包,比如 Matplotlib ,并且确保至少安装了所列的这些包:
  • jupyter,grpcio,protobuf 是Zeppelin 需要的
  • apache-flink 指定flink的版本

echo "name: pyflink_env
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.2- jupyter- grpcio- protobuf- matplotlib- pandasql- pandas- scipy- seaborn- plotnine"
> pyflink_env.ymlmamba env remove -n pyflink_env
mamba env create -f pyflink_env.ymlrm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gzhadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz

1.2.制作 TaskManager 上的 PyFlink Conda 环境

echo "name: pyflink_tm_env
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.2- pandas"
> pyflink_tm_env.ymlmamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.ymlrm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.ziphadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip

1.3.安装本地的flink

下载 flink 1.13并解压,然后:


  • 把 opt目录下的flink-python-*.jar 这个 jar 包 copy 到flink 的 lib 文件夹下;
  • 把 opt/python 这个文件夹 copy 到flink 的 lib 文件夹下。

1.4. 在 PyFlink 中使用 Conda 环境

在 Zeppelin 里配置 Flink,主要配置的选项有:


  • flink.execution.mode 为 yarn-application, 本文所讲的方法只适用于 yarn-application 模式;
  • 指定 yarn.ship-archives,zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 来配置 JobManager 侧的 PyFlink Conda 环境;
  • 指定 Python.archives 以及 Python.executable 来指定 TaskManager 侧的 PyFlink Conda 环境;
  • 指定其他可选的 Flink 配置,比如这里的 flink.jm.memory 和 flink.tm.memory。

FLINK_HOME /data/flink/flink-1.13.2
HADOOP_CONF_DIR /etc/hadoop/conf
HIVE_CONF_DIR /etc/hive/conf
flink.execution.mode yarn-applicationzeppelin.pyflink.python pythonyarn.ship-archives /data/flink/pyflink_env.tar.gz
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/python Python.archives hdfs://172.25.21.170:8020/tmp/pyflink_tm_env.zip
Python.executable pyflink_tm_env.zip/bin/python3.7 flink.jm.memory 2048
flink.tm.memory 2048zeppelin.interpreter.connect.timeout 600000

1.5. 使用示例

%flink.ipyflink
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot([1,2,3,4])
plt.ylabel('numbers')
plt.show()%flink.ipyflink
import time
class PandasVersion(ScalarFunction):def eval(self,s):import pandas as pdreturn pd.__version__ + " " + s
bt_env.register_funtion("pandas_version",udf(PandasVersion(),DataTypes.STRING,DataTypes.STRING)%bsql
select pandas_version('hello world')

在这里插入图片描述


附录


准备conda环境

在centos 7.x上安装miniconda


安装miniconda

miniconda是一个免费的conda最小安装程序
它是Anaconda的一个小型的引导版本,只包含conda、Python、它们所依赖的包,以及少量其他有用的包,包括pip、zlib和其他一些包


下载并安装

根据实际需要,下载对应的版本,latest表示最新的python版本,是python3.9.5


  • Miniconda3-py37_4.10.3-Linux64.sh
  • Miniconda3-py38_4.10.3-Linux-x86_64.sh
  • Miniconda3-py39_4.10.3-Linux-x86_64.sh

# 下载软件包
curl -O https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-latest-Linux-x86_64.sh# 安装:按照指示输入一次回车,接受协议,遇到选择yes,然后就安装好了
bash Miniconda3-latest-Linux-x86_64.sh

检查环境配置


  • 检查是否在文件尾添加如下内容 vi /root/.bashrc
  • 加载环境变量 source /root/.bashrc

# >>> conda initialize >>>
# !! Contents within this block are managed by 'conda init' !!
__conda_setup="$('/data/flink/miniconda3/bin/conda' 'shell.bash' 'hook' 2> /dev/null)"
if [ $? -eq 0 ]; theneval "$__conda_setup"
elseif [ -f "/data/flink/miniconda3/etc/profile.d/conda.sh" ]; then. "/data/flink/miniconda3/etc/profile.d/conda.sh"elseexport PATH="/data/flink/miniconda3/bin:$PATH"fi
fi
unset __conda_setup
# <<

修改镜像地址

将镜像地址修改为国内源
通过如下命令检查配置
conda config --showconda info


  • 命令行方式

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
conda config --set show_channel_urls yes

  • 修改文件方式
    vim ~/.condarc

default_channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
show_channel_urls: true
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/

安装conda-pack

conda-pack是一个命令行工具&#xff0c;用于创建conda环境的存档文件【archives 】&#xff0c;这些存档文件可以安装在其他系统上&#xff0c;这对于在一致的环境中部署代码非常有用。


安装

conda install -c conda-forge conda-pack


使用conda pack进行环境迁移

# 创建pyhon环境
conda create -n py3.7 python&#61;3.7
conda activate py3.7
conda deactivate
# 查看运行环境列表
conda info -e# 针对py3.7运行环境打包
conda pack --n-threads&#61;&#96;nproc&#96; -n py3.7# 还原环境&#xff1a;将py3.7.tar.gz上传到其它服务器并解压
mkdir /home/test/miniconda3/envs/py3.7
tar -zxvf /tmp/py3.7.tar.gz -C /home/test/miniconda3/envs/py3.7# 查看运行环境列表
conda info -e
# 激活运行环境
conda activate py3.7

mamba安装

mamba是c&#43;&#43;中conda包管理器的重新实现&#xff0c;可以认为是更高级的conda
有以下特点


  • 使用多线程并行下载repository 数据和包文件&#xff0c;实现更高效的安装
  • libsolv可以更快地解决依赖关系&#xff0c;libsolv是Red Hat、Fedora和OpenSUSE的RPM包管理器中使用的最新库
  • mamba的核心部分是用c&#43;&#43;实现的&#xff0c;以获得最大的效率

与此同时&#xff0c;mamba利用了相同的命令行解析器、包安装和卸载代码以及事务验证例程&#xff0c;以尽可能保持与conda的兼容性。


安装

conda install mamba -n base -c conda-forge


额外功能


  • 查找软件包
    mamba repoquery search "pandas>0.20.3"
  • 查看软件包的依赖&#xff08;已安装软件包&#xff09;
    mamba repoquery depends --tree six
  • 查看软件包被谁依赖
    mamba repoquery whoneeds openssl

切换pip镜像源

切换pip镜像源&#xff0c;加速软件包【如flink】的安装

创建或修改~/.pip/pip.conf文件
pip.conf文件的内容如下&#xff1a;

[global]
timeout &#61; 6000
index-url &#61; http://mirrors.aliyun.com/pypi/simple/
[install]
use-mirrors &#61;true
mirrors &#61;http://mirrors.aliyun.com/pypi/simple/
trusted-host &#61;mirrors.aliyun.com

参考链接

Centos 安装 Miniconda
miniconda
conda-pack
mamba
Anaconda、Jupyter的安装部署及使用问题总结
python–切换pip镜像源&#xff0c;加速软件包的安装


推荐阅读
  • 在Python 3环境中,当无法连接互联网时,可以通过下载离线模块包来实现模块的安装。具体步骤包括:首先从PyPI网站下载所需的模块包,然后将其传输到目标环境,并使用`pip install`命令进行本地安装。此方法不仅适用于单个模块,还支持依赖项的批量安装,确保开发环境的完整性和一致性。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 在第七天的深度学习课程中,我们将重点探讨DGL框架的高级应用,特别是在官方文档指导下进行数据集的下载与预处理。通过详细的步骤说明和实用技巧,帮助读者高效地构建和优化图神经网络的数据管道。此外,我们还将介绍如何利用DGL提供的模块化工具,实现数据的快速加载和预处理,以提升模型训练的效率和准确性。 ... [详细]
  • 精通jQuery:深入解析事件处理机制与应用技巧
    本文详细探讨了jQuery的事件处理机制及其应用技巧,通过具体的代码示例,逐一解析了每个jQuery代码片段与其对应的HTML结构。文章以标记为基准,CSS作为通用样式,确保每段代码都能独立运行。HTML和CSS代码统一放置在文章末尾,方便读者参考和实践。 ... [详细]
  • 在Linux系统中Nginx环境下SSL证书的安装步骤与WordPress CDN的高级配置指南
    在Linux系统中,Nginx环境下安装SSL证书的具体步骤及WordPress CDN的高级配置指南。首先,安装SSL证书需要准备两个关键配置文件,并建议在操作前备份相关服务器配置文件,以确保数据安全。随后,本文将详细介绍如何在Nginx中正确配置SSL证书,以及如何优化WordPress的CDN设置,提升网站性能和安全性。 ... [详细]
  • 本文详细介绍了在Windows操作系统上使用Python 3.8.5编译支持CUDA 11和cuDNN 8.0.2的TensorFlow 2.3的步骤。文章不仅提供了详细的编译指南,还分享了编译后的文件下载链接,方便用户快速获取所需资源。此外,文中还涵盖了常见的编译问题及其解决方案,确保用户能够顺利进行编译和安装。 ... [详细]
  • hive和mysql的区别是什么[mysql教程]
    hive和mysql的区别有:1、查询语言不同,hive是hql语言,MySQL是sql语句;2、数据存储位置不同,hive把数据存储在hdfs上,MySQL把数据存储在自己的系统 ... [详细]
  • 安装hadoop2.9.2jdk1.8centos7
    安装JDK1.8查看JDK1.8的安装https:www.cnblogs.comTJ21p13208514.html安装hadoop上传hadoop下载hadoop地址http:m ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • 在高清节目的高比特率传输过程中,使用外接USB硬盘进行时间平移(timeshift)时,出现了性能不足和流数据丢失的问题。通过深入研究,我们发现通过对图像组(GOP)和图像头(I-frame)的精确定位技术进行优化,可以显著提升系统的性能和稳定性。本研究提出了改进的图像组与图像头定位算法,有效减少了数据丢失,提高了流媒体传输的效率和质量。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 前期Linux环境准备1.修改Linux主机名2.修改IP3.修改主机名和IP的映射关系4.关闭防火墙5.ssh免登陆6.安装JDK,配置环境变量等集群规划主机 IP安装软件运行进 ... [详细]
  • 5分钟学会 gRPC
    5分钟学会gRPC-介绍我猜测大部分长期使用Java的开发者应该较少会接触gRPC,毕竟在Java圈子里大部分使用的还是DubboSpringClound这两类服务框架。我也是 ... [详细]
author-avatar
hanhff
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有