博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PyODPS DataFrame 处理笛卡尔积的几种方式
阅读量:6407 次
发布时间:2019-06-23

本文共 8274 字,大约阅读时间需要 27 分钟。

提供了 DataFrame API 来用类似 pandas 的接口进行大规模数据分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。

笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 存储目标点经纬度坐标,共有 M 行数据,小表 Coordinates2 存储出发点经纬度坐标,共有 N 行数据,现在需要计算所有离目标点最近的出发点坐标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生 M * N 条数据,也就是一个笛卡尔积问题。

haversine 公式

首先简单介绍一下背景知识,已知两个地理位置的坐标点的经纬度,求解两点之间的距离可以使用 haversine 公式,使用 Python 的表达如下:

def  haversine(lat1,  lon1,  lat2,  lon2):        #  lat1,  lon1  为位置  1  的经纬度坐标        #  lat2,  lon2  为位置  2  的经纬度坐标        import  numpy  as  np        dlon  =  np.radians(lon2  -  lon1)        dlat  =  np.radians(lat2  -  lat1)        a  =  np.sin(  dlat  /2  )  **2  +  np.cos(np.radians(lat1))  *  np.cos(np.radians(lat2))  *  np.sin(  dlon  /2  )  **2        c  =  2  *  np.arcsin(np.sqrt(a))        r  =  6371  #  地球平均半径,单位为公里        return  c  *  r复制代码

MapJoin

目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只需要两个 dataframe join 时指定 mapjoin=True,执行时会对右表做 mapjoin 操作。

In  [3]:  df1  =  o.get_table('coordinates1').to_df()                                                                                                                                                                                        In  [4]:  df2  =  o.get_table('coordinates2').to_df()                                                                                                                                                                                        In  [5]:  df3  =  df1.join(df2,  mapjoin=True)                                                                                                                                                                                                        In  [6]:  df1.schema                                                                                                                                                                                                                                                      Out[6]:  odps.Schema  {    latitude                    float64                  longitude                  float64                  id                                string                }In  [7]:  df2.schema                                                                                                                                                                                                                                                      Out[7]:  odps.Schema  {    latitude                    float64                  longitude                  float64                  id                                string                }In  [8]:  df3.schema                                                                                                                                                                                                                                                      Out[8]:  odps.Schema  {    latitude_x                        float64                  longitude_x                      float64                  id_x                                    string                    latitude_y                        float64                  longitude_y                      float64                  id_y                                    string                }复制代码

可以看到在执行 join 时默认会将重名列加上 _x 和 _y 后缀,可通过在 suffixes 参数中传入一个二元 tuple 来自定义后缀,当有了 join 之后的表后,通过 PyODPS 中 DataFrame 的自建函数就可以计算出距离,十分简洁明了,并且效率很高。

In  [9]:  r  =  6371        ...:  dis1  =  (df3.latitude_y  -  df3.latitude_x).radians()        ...:  dis2  =  (df3.longitude_y  -  df3.longitude_x).radians()        ...:  a  =  (dis1  /  2).sin()  **  2  +  df3.latitude_x.radians().cos()  *  df3.latitude_y.radians().cos()  *  (dis2  /  2).sin()  **  2        ...:  df3['dis']  =  2  *  a.sqrt().arcsin()  *  r                                                                                                                                                                                                                                                                                                                                                                                                      In [12]: df3.head(10)                                                                                                                        Out[12]:     latitude_x  longitude_x id_x  latitude_y   longitude_y id_y       dis0   76.252432    59.628253    0   84.045210     6.517522    0  1246.8649811   76.252432    59.628253    0   59.061796     0.794939    1  2925.9531472   76.252432    59.628253    0   42.368304    30.119837    2  4020.6049423   76.252432    59.628253    0   81.290936    51.682749    3   584.7797484   76.252432    59.628253    0   34.665222   147.167070    4  6213.9449425   76.252432    59.628253    0   58.058854   165.471565    5  4205.2191796   76.252432    59.628253    0   79.150677    58.661890    6   323.0707857   76.252432    59.628253    0   72.622352   123.195778    7  1839.3807608   76.252432    59.628253    0   80.063614   138.845193    8  1703.7824219   76.252432    59.628253    0   36.231584    90.774527    9  4717.284949In [13]: df1.count()                                                                                                                         Out[13]: 2000In [14]: df2.count()                                                                                                                         Out[14]: 100In [15]: df3.count()                                                                                                                         Out[15]: 200000复制代码

df3 已经是有 M * N 条数据了,接下来如果需要知道最小距离,直接对 df3 调用 groupby 接上 min 聚合函数就可以得到每个目标点的最小距离。

In [16]: df3.groupby('id_x').dis.min().head(10)                                                                                              Out[16]:        dis_min0   323.0707851    64.7554932  1249.2831693   309.8182884  1790.4847485   385.1077396   498.8161577   615.9874678   437.7654329   272.589621复制代码

DataFrame 自定义函数

如果我们需要知道对应最小距离的点的城市,也就是表中对应的 id ,可以在 mapjoin 之后调用 MapReduce,不过我们还有另一种方式是使用 DataFrame 的 方法。要对一行数据使用自定义函数,可以使用 apply 方法,axis 参数必须为 1,表示在行上操作。

表资源

要注意 apply 是在服务端执行的 UDF,所以不能在函数内使用类似于df=o.get_table('table_name').to_df() 的表达式去获得表数据,具体原理可以参考。以本文中的情况为例,要想将表 1 与表 2 中所有的记录计算,那么需要将表 2 作为一个,然后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只需要将一个 collection 传入 resources 参数即可。collection 是个可迭代对象,不是一个 DataFrame 对象,不可以直接调用 DataFrame 的接口,每个迭代值是一个 namedtuple,可以通过字段名或者偏移来取对应的值。

## use dataframe udfdf1 = o.get_table('coordinates1').to_df()df2 = o.get_table('coordinates2').to_df()def func(collections):    import pandas as pd        collection = collections[0]        ids = []    latitudes = []    longitudes = []    for r in collection:        ids.append(r.id)        latitudes.append(r.latitude)        longitudes.append(r.longitude)    df = pd.DataFrame({
'id': ids, 'latitude':latitudes, 'longitude':longitudes}) def h(x): df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude) return df.iloc[df['dis'].idxmin()]['id'] return hdf1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute( libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])复制代码

在自定义函数中,将表资源通过循环读成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值对应的行,从而得到距离最近的出发点 id。另外,如果在自定义函数中需要使用到三方包(例如本例中的 pandas)可以参考这篇。

全局变量

当小表的数据量十分小的时候,我们甚至可以将小表数据作为全局变量在自定义函数中使用。

df1 = o.get_table('coordinates1').to_df()df2 = o.get_table('coordinates2').to_df()df = df2.to_pandas()def func(x):    df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)    return df.iloc[df['dis'].idxmin()]['id']df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])复制代码

在上传函数的时候,会将函数内使用到的全局变量(上面代码中的 df) pickle 到 UDF 中。但是注意这种方式使用场景很局限,因为 ODPS 的上传的文件资源大小是有限制的,所以数据量太大会导致 UDF 生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在数据量非常小的时候使用。

总结

使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,性能好,一般能用 mapjoin 解决的我们都推荐使用 mapjoin,并且最好使用内建函数计算,能到达最高的效率,但是它不够灵活。另一种是使用 DataFrame 自定义函数,比较灵活,性能相对差一点(可以使用 pandas 或者 numpy 获得性能上的提升),通过使用表资源,将小表作为表资源传入 DataFrame 自定义函数中,从而完成笛卡尔积的操作。

本文为云栖社区原创内容,未经允许不得转载。

转载于:https://juejin.im/post/5d0201a8e51d455ca0436230

你可能感兴趣的文章
路由选路原则
查看>>
jvm 学习(一)
查看>>
JavaScript简介
查看>>
SQL Server附加数据库拒绝访问解决方法汇总
查看>>
SM2算法原理及实现
查看>>
RHCA教材翻译计划
查看>>
js-小括号在不同场合下的作用
查看>>
我的友情链接
查看>>
kvm中虚拟机的硬盘扩容
查看>>
Android (Launch Mode) 四种启动模式
查看>>
透视学理论(二)
查看>>
Dubbo/HSF在Service Mesh下的思考和方案
查看>>
Django form表单
查看>>
CTYL-9.14(tomcat端口与阿里云安全组,域名与tomcat配置,域名与反向代理)
查看>>
Java 多线程相关问题记录
查看>>
LNMP架构介绍、MySQL安装、PHP安装、 Nginx介绍
查看>>
简单的Spark+Mysql整合开发
查看>>
阿里java面试经验大汇总(附阿里职位需求)
查看>>
Python全套零基础视频教程+软件2018最新编程视频!
查看>>
内存管理之1:x86段式内存管理与保护模式
查看>>