Skip to main content
 首页 » 编程设计

python之在 pyspark 中重新划分密集矩阵

2024年10月24日2zdz8207

我在 pyspark 中有一个密集矩阵 (100*100),我想将它重新分区 分成十组,每组包含 10 行。

from pyspark import SparkContext, SparkConf 
from pyspark.mllib import * 
sc = SparkContext("local", "Simple App") 
dm2 = Matrices.dense(100, 100, RandomRDDs.uniformRDD(sc, 10000).collect()) 
newRdd = sc.parallelize(dm2.toArray()) 
rerdd = newRdd.repartition(10) 

上面的代码导致 rerdd 包含 100 个元素。我想将此矩阵 dm2 呈现为按行划分的 block (例如,一个分区中有 10 行)。

请您参考如下方法:

我没有多大意义,但你可以例如做这样的事情

mat =  Matrices.dense(100, 100, np.arange(10000)) 
 
n_par = 10 
n_row = 100 
 
rdd = (sc 
    .parallelize( 
        # Add indices 
        enumerate( 
            # Extract and reshape values 
            mat.values.reshape(n_row, -1))) 
    # Partition and sort by row index 
    .repartitionAndSortWithinPartitions(n_par, lambda i: i // n_par)) 

检查分区数和每个分区的行数:

rdd.glom().map(len).collect() 
## [10, 10, 10, 10, 10, 10, 10, 10, 10, 10 

检查第一行是否包含所需数据:

assert np.all(rdd.first()[1] == np.arange(100))