热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PySpark求解连通图问题

前文回顾:PySpark与GraphFrames的安装与使用https:xxmdmst.blog.csdn.netarticledetails123009617net

前文回顾:

PySpark与GraphFrames的安装与使用
https://xxmdmst.blog.csdn.net/article/details/123009617

networkx快速解决连通图问题
https://xxmdmst.blog.csdn.net/article/details/123012333

前面我讲解了PySpark图计算库的使用以及纯python解决连通图问题的两个示例。这篇文章我们继续对上次的连通图问题改用PySpark实现。

需求1:找社区

刘备和关羽有关系,说明他们是一个社区,刘备和张飞也有关系,那么刘备、关羽、张飞归为一个社区,以此类推。

image-20220218200637173

对于这个连通图问题使用Pyspark如何解决呢?

首先,我们创建spark对象:

from pyspark.sql import SparkSession, Row
from graphframes import GraphFramespark = SparkSession \.builder \.appName("PySpark") \.master("local[*]") \.getOrCreate()
sc = spark.sparkContext
# 设置检查点目录
sc.setCheckpointDir("checkpoint")

然后构建数据:

data = [['刘备', '关羽'],['刘备', '张飞'],['张飞', '诸葛亮'],['曹操', '司马懿'],['司马懿', '张辽'],['曹操', '曹丕']
]
data = spark.createDataFrame(data, ["人员", "相关人员"])
data.show()

+------+--------+
| 人员|相关人员|
+------+--------+
| 刘备| 关羽|
| 刘备| 张飞|
| 张飞| 诸葛亮|
| 曹操| 司马懿|
|司马懿| 张辽|
| 曹操| 曹丕|
+------+--------+

很明显原始数据就是图计算所要求的边数据,只修改一下列名即可:

edges = data.toDF("src", "dst")
edges.printSchema()

root|-- src: string (nullable = true)|-- dst: string (nullable = true)

下面我们开始构建顶点数据:

vertices = (edges.rdd.flatMap(lambda x: x).distinct().map(lambda x: Row(x)).toDF(["id"])
)
vertices.show()

+------+
| id|
+------+
|诸葛亮|
| 刘备|
| 曹操|
|司马懿|
| 曹丕|
| 关羽|
| 张飞|
| 张辽|
+------+

下面使用spark的图计算 计算连通图:

g = GraphFrame(vertices, edges)
result = g.connectedComponents().orderBy("component")
result.show()

+------+------------+
| id| component|
+------+------------+
|司马懿| 0|
| 张辽| 0|
| 曹丕| 0|
| 曹操| 0|
| 关羽|635655159808|
| 刘备|635655159808|
| 张飞|635655159808|
|诸葛亮|635655159808|
+------+------------+

可以看到结果中已经顺利将一个社区的成员通过一个相同的component标识出来,成功解决需求。

需求2:统一用户识别

abcde这5个字段表示mac地址,ip地址,device_id,imei等唯一标识,tags表示用户的标签。由于某些原因,同一用户的唯一标识字段总是有几个字段存在缺失,现在要求将同一个用户的数据都能识别出来,同时将每个用户的标签进行合并。原始数据和结果模型示例如下:

img

首先,我们构建数据:

df = spark.createDataFrame([['a1', None, 'c1', None, None, 'tag1'],[None, None, 'c1', 'd1', None, 'tag2'],[None, 'b1', None, 'd1', None, 'tag3'],[None, 'b1', 'c1', 'd1', 'e1', 'tag4'],['a2', 'b2', None, None, None, 'tag1'],[None, 'b4', 'c4', None, 'e4', 'tag1'],['a2', None, None, 'd2', None, 'tag2'],[None, None, 'c2', 'd2', None, 'tag3'],[None, 'b3', None, None, 'e3', 'tag1'],[None, None, 'c3', None, 'e3', 'tag2'],
], list("abcde")+["tags"])
df.show()

结果:

+----+----+----+----+----+----+
| a| b| c| d| e|tags|
+----+----+----+----+----+----+
| a1|null| c1|null|null|tag1|
|null|null| c1| d1|null|tag2|
|null| b1|null| d1|null|tag3|
|null| b1| c1| d1| e1|tag4|
| a2| b2|null|null|null|tag1|
|null| b4| c4|null| e4|tag1|
| a2|null|null| d2|null|tag2|
|null|null| c2| d2|null|tag3|
|null| b3|null|null| e3|tag1|
|null|null| c3|null| e3|tag2|
+----+----+----+----+----+----+

接下来的思路依然跟上次一样,首先为每一行数据分配一个唯一id,然后对每个唯一标识的列,根据是否一样构建行与行之间的连接关系,所有的唯一标识列产生的连接关系共同作为图计算的边。

下面使用RDD的zipWithUniqueId方法为每一行产生一个唯一ID,并将这个ID移动到最前(由于这个数据后面可能会多次被频繁使用所以缓存起来):

tmp = df.rdd.zipWithUniqueId().map(lambda x: (x[1], x[0]))
tmp.cache()
tmp.first()

(0, Row(a='a1', b=None, c='c1', d=None, e=None, tags='tag1'))

根据唯一id构建顶点数据:

vertices = tmp.map(lambda x: Row(x[0])).toDF(["id"])
vertices.show()

+---+
| id|
+---+
| 0|
| 1|
| 7|
| 2|
| 8|
| 3|
| 4|
| 10|
| 5|
| 11|
+---+

接下来,构建边数据:

def func(p):for k, ids in p:ids &#61; list(ids)n &#61; len(ids)if n <&#61; 1:continuefor i in range(n-1):for j in range(i&#43;1, n):yield (ids[i], ids[j])edges &#61; []
keylist &#61; list("abcde")
for key in keylist:data &#61; tmp.mapPartitions(lambda area: [(row[key], i) for i, row in area if row[key]])edgeRDD &#61; data.groupByKey().mapPartitions(func)edges.append(edgeRDD)
edgesDF &#61; sc.union(edges).toDF(["src", "dst"])
edgesDF.cache()
edgesDF.show()

&#43;---&#43;---&#43;
|src|dst|
&#43;---&#43;---&#43;
| 8| 4|
| 7| 2|
| 0| 1|
| 0| 2|
| 1| 2|
| 4| 10|
| 1| 7|
| 1| 2|
| 7| 2|
| 5| 11|
&#43;---&#43;---&#43;

可以看到所有的行号关系已经被成功获取。

下面使用图计算 计算出属于同一用户的行&#xff1a;

gdf &#61; GraphFrame(vertices, edgesDF)
components &#61; gdf.connectedComponents()
components.show()

&#43;---&#43;---------&#43;
| id|component|
&#43;---&#43;---------&#43;
| 0| 0|
| 1| 0|
| 7| 0|
| 2| 0|
| 8| 4|
| 3| 3|
| 4| 4|
| 10| 4|
| 5| 5|
| 11| 5|
&#43;---&#43;---------&#43;

有了行号和所归属的组唯一标识&#xff0c;我们可以通过表连接获取原始数据的每一行所归属的component&#xff1a;

result &#61; tmp.cogroup(components.rdd) \.map(lambda pair: pair[1][0].data[0] &#43; Row(pair[1][1].data[0])) \.toDF(df.schema.names&#43;["component"])
result.cache()
result.show()

&#43;----&#43;----&#43;----&#43;----&#43;----&#43;----&#43;---------&#43;
| a| b| c| d| e|tags|component|
&#43;----&#43;----&#43;----&#43;----&#43;----&#43;----&#43;---------&#43;
| a1|null| c1|null|null|tag1| 0|
|null|null| c1| d1|null|tag2| 0|
|null| b1| c1| d1| e1|tag4| 0|
|null| b4| c4|null| e4|tag1| 3|
| a2|null|null| d2|null|tag2| 4|
|null| b3|null|null| e3|tag1| 5|
|null| b1|null| d1|null|tag3| 0|
| a2| b2|null|null|null|tag1| 4|
|null|null| c2| d2|null|tag3| 4|
|null|null| c3|null| e3|tag2| 5|
&#43;----&#43;----&#43;----&#43;----&#43;----&#43;----&#43;---------&#43;

可以看到我们已经成功的进行同一用户识别了&#xff0c;剩下的只需要分组并使用pandas的逻辑合并数据&#xff1a;

def func(pdf):row &#61; pdf[keylist].bfill().head(1)row["tags"] &#61; pdf.tags.str.cat(sep&#61;",")return rowresult.groupBy("component").applyInPandas(func, schema&#61;"a string, b string, c string, d string, e string, tags string"
).show()

&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;
| a| b| c| d| e| tags|
&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;
| a1| b1| c1| d1| e1|tag1,tag2,tag4,tag3|
|null| b4| c4|null| e4| tag1|
| a2| b2| c2| d2|null| tag2,tag1,tag3|
|null| b3| c3|null| e3| tag1,tag2|
&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;

可以看到已经顺利得到需要的结果。

注意&#xff1a;applyInPandas要求返回的结果必须是pandas的datafream对象&#xff0c;所以相对之前的逻辑由.iloc[0]改成了.head(1)

如果你的spark不是3.X版本&#xff0c;没有applyInPandas方法&#xff0c;用原生rdd的方法则会麻烦很多&#xff1a;

def func(pair):component, rows &#61; pairkeyList &#61; list("abcde")ids &#61; {}for row in rows:for key in keylist:v &#61; getattr(row, key)if v:ids[key] &#61; vids.setdefault("tags", []).append(row.tags)result &#61; []for key in keylist:result.append(ids.get(key))result.append(",".join(ids["tags"]))return resultresult2 &#61; result.rdd.groupBy(lambda row: row.component).map(func).toDF(df.schema)
result2.cache()
result2.show()

结果也一样&#xff1a;

&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;
| a| b| c| d| e| tags|
&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;
| a1| b1| c1| d1| e1|tag1,tag2,tag4,tag3|
|null| b4| c4|null| e4| tag1|
| a2| b2| c2| d2|null| tag2,tag1,tag3|
|null| b3| c3|null| e3| tag1,tag2|
&#43;----&#43;---&#43;---&#43;----&#43;----&#43;-------------------&#43;


推荐阅读
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • Java程序设计第4周学习总结及注释应用的开发笔记
    本文由编程笔记#小编为大家整理,主要介绍了201521123087《Java程序设计》第4周学习总结相关的知识,包括注释的应用和使用类的注释与方法的注释进行注释的方法,并在Eclipse中查看。摘要内容大约为150字,提供了一定的参考价值。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 本文介绍了在CentOS 6.4系统中更新源地址的方法,包括备份现有源文件、下载163源、修改文件名、更新列表和系统,并提供了相应的命令。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
author-avatar
Mr_cool
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有