Skip to main content
 首页 » 编程设计

python之如何在 Spark 中使用 Python 为线性回归进行独热编码(One Hot Encoding)

2024年10月24日14hnrainll

我已经为随机森林回归(Random Forest regression)编写了下面的代码。但随机森林回归在索引器(indexer)之后不需要独热编码(One Hot Encoding)。现在我想尝试需要独热编码的线性回归(Linear Regression)。我查阅了 Spark 的 OneHotEncoder 文档,但不知道如何将其合并到我当前的代码中。如何在当前代码中添加独热编码步骤?

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

# Name of the column holding the regression target; every other column is
# indexed and assembled into the feature vector.
label_col = "x4"

# Convert the RDD to a DataFrame with explicit column names.
# `train_data` is not defined in this snippet; it is expected to exist already.
train_data_df = train_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Indexers encode strings with doubles: column "<name>" -> "idx_<name>".
string_indexers = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]

# Assembles multiple columns into a single vector.
assembler = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in train_data_df.columns if x != label_col],
    outputCol="features"
)

# Run every indexer, then the assembler, as one pipeline.
pipeline = Pipeline(stages=string_indexers + [assembler])
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)

# Build an RDD of LabeledPoint(label, features) for the mllib regressors.
# Go through `.rdd` explicitly: DataFrame.map was removed in Spark 2.0, while
# `.rdd.map` behaves the same on older releases.
# NOTE(review): on Spark 2.x+ the assembled vectors are pyspark.ml.linalg
# vectors and may need pyspark.mllib.linalg.Vectors.fromML(...) before being
# passed to LabeledPoint -- confirm against the Spark version in use.
label_points = (indexed
    .select(col(label_col).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))

更新:

from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel


###### FOR TEST DATA ######
label_col_test = "x4"

# Convert the RDD to a DataFrame with explicit column names.
# `test_data` is not defined in this snippet; it is expected to exist already.
test_data_df = test_data.toDF(("x0", "x1", "x2", "x3", "x4"))

# Every column except the label is a feature column.
feature_cols_test = [x for x in test_data_df.columns if x != label_col_test]

# Indexers encode strings with doubles: column "<name>" -> "idx_<name>".
string_indexers_test = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in feature_cols_test
]

# Encoders one-hot encode each indexed column: "idx_<name>" -> "enc_<name>".
encoders_test = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in feature_cols_test
]

# Assembles the encoded columns into a single vector. It must read the
# "enc_*" columns; reading "idx_*" would silently discard the encoding.
assembler_test = VectorAssembler(
    inputCols=["enc_{0}".format(x) for x in feature_cols_test],
    outputCol="features"
)

# NOTE(review): this fits a separate pipeline on the test data, so the string
# indexes (and therefore the encoded positions) are learned independently of
# the training data. Consider transforming the test data with the pipeline
# model fitted on the training data -- confirm intent.
pipeline_test = Pipeline(stages=string_indexers_test + encoders_test + [assembler_test])
model_test = pipeline_test.fit(test_data_df)
indexed_test = model_test.transform(test_data_df)

# Go through `.rdd` explicitly: DataFrame.map was removed in Spark 2.0.
label_points_test = (indexed_test
    .select(col(label_col_test).cast("float").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))

# Build the model.
# `label_points` is the training RDD built elsewhere; it has to be produced
# with the same one-hot encoded features so the vector sizes match.
model = LinearRegressionWithSGD.train(label_points)

# Pair each true label with the model's prediction.
valuesAndPreds = label_points_test.map(lambda p: (p.label, model.predict(p.features)))

# Mean of the squared errors. The pair is indexed instead of unpacked in the
# lambda signature because `lambda (v, p): ...` is Python 2 only syntax.
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

请您参考如下方法:

您可以简单地将其添加为索引(StringIndexer)和向量组装(VectorAssembler)之间的一个步骤:

from pyspark.ml.feature import OneHotEncoder

# One-hot encode every indexed feature column: "idx_<name>" -> "enc_<name>".
# This has to be OneHotEncoder; a second StringIndexer would only re-index the
# already indexed values and no encoding would take place.
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]

# Assemble the encoded columns (not the raw indexes) into the feature vector.
assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features"
)

# Stage order matters: index the strings, encode the indexes, then assemble.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])