Skip to main content
 首页 » 编程设计

python之使用 Pyspark 命名变量

2024年10月24日2cyq1162

尽管我的问题非常简单,但由于我是新手,所以在解决问题时遇到了问题。

针对我的问题的正常 python 查询如下:

for line in file('schedule.txt'): 
  origin,dest,depart,arrive,price=line.split(',') 

我可以这样读取文件

sched=sc.textFile('/PATH/schedule.txt') 

但是当我尝试下面的代码时:

  origin,dest,depart,arrive,price=sched.split(',') 

我收到这个错误:

--------------------------------------------------------------------------- 
AttributeError                            Traceback (most recent call last) 
<ipython-input-46-ba0e8c07ca89> in <module>() 
----> 1 origin,dest,depart,arrive,price=sched.split(',') 
 
AttributeError: 'RDD' object has no attribute 'split' 

我可以使用 lambda 函数拆分文件。但是不知道如何创建这 5 个变量名。

如果有人可以帮助我。

请您参考如下方法:

sched=sc.textFile('/PATH/schedule.txt') 返回一个 RDD,它是与 python 文件对象不同的数据类型,支持不同的 API .相当于你的 python 代码是这样的:

sched=sc.textFile('/PATH/schedule.txt') 
# extract values 
vals = sched.map(lambda line:line.split(',')) 
# now you can do some processing, for example sum price 
price = vals.reduce(lambda v1,v2:v1[4]+v2[4]) 
# or just collect the raw values 
raw_vals = vals.collect() 

更新: 如果您希望能够将每一行的值作为局部变量访问,您可以定义一个专用函数而不只是一个 lambda 并将其传递给 .map():

def process_line(line): 
    origin,dest,depart,arrive,price=line.split(',') 
    # do whatever 
    # remember to return a result 
 
sche.map(process_line) 

更新2:

您要对文件进行的具体处理并不简单,因为它需要写入共享变量 (flights)。相反,我建议按 orig,dest 对行进行分组,然后收集结果并插入字典中:

flights_data = sched.map(lambda line: ((line[0],line[1]),tuple(line[2:]))).groupByKey().collect() 
flights = {f:ds for f,ds in flights_data}