Skip to main content
 首页 » 编程设计

python之PySpark之在没有显式 session key 的情况下组合 session 数据/遍历所有行

2025年05月04日51wayfarer

我正在尝试在 PySpark 中没有真正的 session “ key ”的情况下聚合 session 数据。我有在特定时间在某个区域检测到个人的数据,我想将其汇总到特定访问期间在每个区域花费的持续时间(见下文)。

这里棘手的部分是我想将某人离开每个区域的时间推断为他们在下一个区域被检测到的时间。这意味着我需要使用下一个区域 ID 的开始时间作为任何给定区域 ID 的结束时间。同一个人的区域 ID 也可以出现多次。

我在 MapReduce 中实现了这一点,我在其中遍历所有行并汇总时间,直到检测到新的 AreaID 或 Individual,然后输出记录。有没有办法在 Spark 中做类似的事情?有没有更好的方法来解决这个问题?

还要注意,除非在另一个区域(例如下面的 IndividualY、AreaT)检测到个人,否则我不想输出记录

我有以下格式的数据集:

Individual  AreaID  Datetime of Detection 
IndividualX AreaQ   1/7/2015 0:00 
IndividualX AreaQ   1/7/2015 1:00 
IndividualX AreaW   1/7/2015 3:00 
IndividualX AreaQ   1/7/2015 4:00 
IndividualY AreaZ   2/7/2015 4:00 
IndividualY AreaZ   2/7/2015 5:00 
IndividualY AreaW   2/7/2015 6:00 
IndividualY AreaT   2/7/2015 7:00 

我想要期望的输出:

Individual  AreaID  Start_Time      End_Time        Duration (minutes) 
IndividualX AreaQ   1/7/2015 0:00   1/7/2015 3:00   180 
IndividualX AreaW   1/7/2015 3:00   1/7/2015 4:00   60 
IndividualY AreaZ   2/7/2015 4:00   2/7/2015 6:00   120 
IndividualY AreaW   2/7/2015 6:00   2/7/2015 7:00   60 

请您参考如下方法:

这是一个特别漂亮的解决方案,但您可以使用 DataFrames 和窗口函数。假设您的输入如下所示:

rdd = sc.parallelize([ 
    ("IndividualX", "AreaQ",  "1/7/2015 0:00"), 
    ("IndividualX", "AreaQ",  "1/7/2015 1:00"), 
    ("IndividualX", "AreaW",  "1/7/2015 3:00"), 
    ("IndividualX", "AreaQ",  "1/7/2015 4:00"), 
    ("IndividualY", "AreaZ",  "2/7/2015 4:00"), 
    ("IndividualY", "AreaZ",  "2/7/2015 5:00"), 
    ("IndividualY", "AreaW",  "2/7/2015 6:00"), 
    ("IndividualY", "AreaT",  "2/7/2015 7:00") 
]) 

首先我们必须将它转换为 DataFrame:

from datetime import datetime 
from pyspark.sql import Row 
from pyspark.sql import HiveContext 
 
sqlContext = HiveContext(sc) 
 
row = Row("individual", "area_id", "datetime") 
fmt = "%d/%m/%Y %H:%M" 
df = rdd.map(lambda r: row(r[0], r[1], datetime.strptime(r[2], fmt))).toDF() 

接下来让我们定义一个窗口:

from pyspark.sql import functions as f 
from pyspark.sql.window import Window 
 
w = Window().partitionBy("individual").orderBy("datetime") 

和临时列:

p_area_id = f.lag("area_id").over(w) # Previous area 
 
ind =  f.sum(( 
    p_area_id.isNull() | # No previous observation 
    (p_area_id != f.col("area_id")) # Area changed 
).cast("integer")).over(w) 

使用上面定义的指标,我们可以选择访问该区域的最小时间戳:

tmp = (df 
   .withColumn("ind", ind) 
   .groupBy("individual", "area_id", "ind") 
   .agg(f.min("datetime").alias("datetime")) 
   .drop("ind")) 

最后我们可以定义目标列:

end_time = f.lead(f.col("datetime")).over(w) 
 
duration = ( 
    f.col("end_time").cast("integer") - f.col("datetime").cast("integer")) / 60 

并构建输出DataFrame:

result = (tmp 
    .withColumn("end_time", end_time) 
    .where(f.col("end_time").isNotNull()) 
    .withColumn("duration", duration) 
    .withColumnRenamed("datetime", "start_time")) 

并输出:

+-----------+-------+--------------------+--------------------+--------+ 
| individual|area_id|          start_time|            end_time|duration| 
+-----------+-------+--------------------+--------------------+--------+ 
|IndividualX|  AreaQ|2015-07-01 00:00:...|2015-07-01 03:00:...|   180.0| 
|IndividualX|  AreaW|2015-07-01 03:00:...|2015-07-01 04:00:...|    60.0| 
|IndividualY|  AreaZ|2015-07-02 04:00:...|2015-07-02 06:00:...|   120.0| 
|IndividualY|  AreaW|2015-07-02 06:00:...|2015-07-02 07:00:...|    60.0| 
+-----------+-------+--------------------+--------------------+--------+ 

如果您更喜欢普通的 RDD,您可以 reshape 为这样的东西:

(individual, (area_id, datetime)) 

和下一个 groupByKey 并在本地执行所需的操作。