我在 pyspark 中有一个密集矩阵 (100*100),我想将它重新分区
分成十组,每组包含 10 行。
from pyspark import SparkContext, SparkConf
from pyspark.mllib import *
sc = SparkContext("local", "Simple App")
dm2 = Matrices.dense(100, 100, RandomRDDs.uniformRDD(sc, 10000).collect())
newRdd = sc.parallelize(dm2.toArray())
rerdd = newRdd.repartition(10)
上面的代码导致 rerdd
包含 100 个元素。我想将此矩阵 dm2
呈现为按行划分的 block (例如,一个分区中有 10 行)。
请您参考如下方法:
我没有多大意义,但你可以例如做这样的事情
mat = Matrices.dense(100, 100, np.arange(10000))
n_par = 10
n_row = 100
rdd = (sc
.parallelize(
# Add indices
enumerate(
# Extract and reshape values
mat.values.reshape(n_row, -1)))
# Partition and sort by row index
.repartitionAndSortWithinPartitions(n_par, lambda i: i // n_par))
检查分区数和每个分区的行数:
rdd.glom().map(len).collect()
## [10, 10, 10, 10, 10, 10, 10, 10, 10, 10
检查第一行是否包含所需数据:
assert np.all(rdd.first()[1] == np.arange(100))