本文重点介绍下如何在 Zeppelin 里高效的开发 PyFlink Job,特别是解决 PyFlink 的环境问题。
大家都知道PyFlink 的开发环境不容易搭建,稍有不慎,PyFlink 环境就会乱掉,而且很难排查原因
本文使用miniconda、conda-pack、mamba分别制作JobManager 上的 PyFlink Conda 环境、TaskManager 上的 PyFlink Conda 环境,然后在 Zeppelin 里使用 PyFlink 以及指定 Conda 环境,这样就可以在一个 Yarn 集群里同时使用多个版本的 PyFlink。
本文使用环境:flink 1.13.2, zeppelin:0.10.0-bin-all
需要改进的地方:
echo "name: pyflink_env
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.2- jupyter- grpcio- protobuf- matplotlib- pandasql- pandas- scipy- seaborn- plotnine" > pyflink_env.ymlmamba env remove -n pyflink_env
mamba env create -f pyflink_env.ymlrm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gzhadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz
echo "name: pyflink_tm_env
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
dependencies:- Python=3.7- pip- pip:- apache-flink==1.13.2- pandas" > pyflink_tm_env.ymlmamba env remove -n pyflink_tm_env
mamba env create -f pyflink_tm_env.ymlrm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.ziphadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip
下载 flink 1.13并解压,然后:
在 Zeppelin 里配置 Flink,主要配置的选项有:
FLINK_HOME /data/flink/flink-1.13.2
HADOOP_CONF_DIR /etc/hadoop/conf
HIVE_CONF_DIR /etc/hive/conf
flink.execution.mode yarn-applicationzeppelin.pyflink.python pythonyarn.ship-archives /data/flink/pyflink_env.tar.gz
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz
zeppelin.pyflink.Python pyflink_env.tar.gz/bin/python Python.archives hdfs://172.25.21.170:8020/tmp/pyflink_tm_env.zip
Python.executable pyflink_tm_env.zip/bin/python3.7 flink.jm.memory 2048
flink.tm.memory 2048zeppelin.interpreter.connect.timeout 600000
%flink.ipyflink
%matplotlib inline
import matplotlib.pyplot as plt
plt.plot([1,2,3,4])
plt.ylabel('numbers')
plt.show()%flink.ipyflink
import time
class PandasVersion(ScalarFunction):def eval(self,s):import pandas as pdreturn pd.__version__ + " " + s
bt_env.register_funtion("pandas_version",udf(PandasVersion(),DataTypes.STRING,DataTypes.STRING)%bsql
select pandas_version('hello world')
在centos 7.x上安装miniconda
miniconda是一个免费的conda最小安装程序。
它是Anaconda的一个小型的引导版本,只包含conda、Python、它们所依赖的包,以及少量其他有用的包,包括pip、zlib和其他一些包
根据实际需要,下载对应的版本,latest表示最新的python版本,是python3.9.5
# 下载软件包
curl -O https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-latest-Linux-x86_64.sh# 安装:按照指示输入一次回车,接受协议,遇到选择yes,然后就安装好了
bash Miniconda3-latest-Linux-x86_64.sh
vi /root/.bashrc
source /root/.bashrc
# >>> conda initialize >>>
# !! Contents within this block are managed by 'conda init' !!
__conda_setup="$('/data/flink/miniconda3/bin/conda' 'shell.bash' 'hook' 2> /dev/null)"
if [ $? -eq 0 ]; theneval "$__conda_setup"
elseif [ -f "/data/flink/miniconda3/etc/profile.d/conda.sh" ]; then. "/data/flink/miniconda3/etc/profile.d/conda.sh"elseexport PATH="/data/flink/miniconda3/bin:$PATH"fi
fi
unset __conda_setup
# <<
将镜像地址修改为国内源
通过如下命令检查配置
conda config --show
或 conda info
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
conda config --set show_channel_urls yes
default_channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
show_channel_urls: true
channels:- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/- https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2/
conda-pack是一个命令行工具&#xff0c;用于创建conda环境的存档文件【archives 】&#xff0c;这些存档文件可以安装在其他系统上&#xff0c;这对于在一致的环境中部署代码非常有用。
conda install -c conda-forge conda-pack
# 创建pyhon环境
conda create -n py3.7 python&#61;3.7
conda activate py3.7
conda deactivate
# 查看运行环境列表
conda info -e# 针对py3.7运行环境打包
conda pack --n-threads&#61;&#96;nproc&#96; -n py3.7# 还原环境&#xff1a;将py3.7.tar.gz上传到其它服务器并解压
mkdir /home/test/miniconda3/envs/py3.7
tar -zxvf /tmp/py3.7.tar.gz -C /home/test/miniconda3/envs/py3.7# 查看运行环境列表
conda info -e
# 激活运行环境
conda activate py3.7
mamba是c&#43;&#43;中conda包管理器的重新实现&#xff0c;可以认为是更高级的conda。
有以下特点
与此同时&#xff0c;mamba利用了相同的命令行解析器、包安装和卸载代码以及事务验证例程&#xff0c;以尽可能保持与conda的兼容性。
conda install mamba -n base -c conda-forge
mamba repoquery search "pandas>0.20.3"
mamba repoquery depends --tree six
mamba repoquery whoneeds openssl
切换pip镜像源&#xff0c;加速软件包【如flink】的安装
创建或修改~/.pip/pip.conf文件
pip.conf文件的内容如下&#xff1a;
[global]
timeout &#61; 6000
index-url &#61; http://mirrors.aliyun.com/pypi/simple/
[install]
use-mirrors &#61;true
mirrors &#61;http://mirrors.aliyun.com/pypi/simple/
trusted-host &#61;mirrors.aliyun.com
Centos 安装 Miniconda
miniconda
conda-pack
mamba
Anaconda、Jupyter的安装部署及使用问题总结
python–切换pip镜像源&#xff0c;加速软件包的安装