我的目标是读取由 csv 数据组成的 hdfs 上的二进制(gpg 加密)文件。我的方法——遵循 this answer -- 已经定义了一个 Python 函数来读取和解密 gpg 文件,生成每一行,并将此函数作为 flatMap
应用于并行文件列表。
本质上,Python 函数生成一个子进程,该子进程使用 hadoop
读取文件并将结果通过管道传输到 gpg
进行解密。这在本地模式下运行 Spark 时工作得很好。然而,分布式运行它 (yarn-client
),一个简单的行计数返回 0
,主要是因为 Python 认为 stdout
管道总是关闭的。
问题似乎是子进程涉及两个命令之间的管道。当我删除后者时(只是加密文件的行数),行数与我在命令行上得到的相匹配。我尝试了多种方法,结果都一样。
这是 Python 函数:
import subprocess as sp
def read_gpg_file_on_hdfs(filename):
# Method 1:
p = sp.Popen('hadoop fs -cat {} | gpg -d'.format(filename), shell=True,
stdout=sp.PIPE)
# Method 2:
p1 = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
p = sp.Popen(['gpg', '-d'], stdin=p1.stdout, stdout=sp.PIPE)
p1.stdout.close()
# Method 3:
p = sp.Ppen('gpg -d <(hadoop fs -cat {})'.format(filename), shell=True,
stdout=sp.PIPE, stderr=sp.PIPE)
for line in p.stdout:
yield line.strip()
这是 Spark 命令:
sc.parallelize(['/path/to/file.gpg']).flatMap(read_gpg_file_on_hdfs).count()
现在我知道 PySpark 使用管道与 Spark 通信,但我没有遵循细节,我不知道这是否会影响我尝试做的事情。我的问题是是否有办法完成我想做的事情。
请注意,我使用的是分布式 Spark 1.2.1(MapR 的最新版本)。此外,我考虑过使用 binaryFiles
,但对于我有时会遇到的大型 gpg 文件,这会失败。
提前致谢!
请您参考如下方法:
事实证明,gpg
命令实际上是问题所在。据推测,这与子进程如何在本地模式和分布式模式下启动的细节有关,但在本地模式下,gpg
的 homedir
设置正确。但是当以分布式模式启动时,homedir
指向一个不正确的目录,第二个子进程立即失败。此错误消息似乎没有记录在任何地方,因此 stdout
只是作为空字符串返回。