我有一段为 Random Forest regression 编写的代码。Random Forest regression 不需要在 indexer(StringIndexer)之后再做 One Hot Encoding。现在我想尝试需要 One Hot Encoding 的 Linear Regression。我阅读了 Spark One Hot Encoder 的文档,但不知道如何将其合并到我当前的代码中。如何在当前代码中添加 One Hot Encoding 步骤?
from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import VectorAssembler
# Python import of the encoder (the Scala form `import org.apache...` is a
# SyntaxError in Python).
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

label_col = "x4"
# converting RDD to dataframe
train_data_df = train_data.toDF(("x0", "x1", "x2", "x3", "x4"))
# Indexers encode each string feature column as a numeric index column idx_<name>
string_indexers = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]
# Assembles the index columns into a single "features" vector column
assembler = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in train_data_df.columns if x != label_col],
    outputCol="features",
)
pipeline = Pipeline(stages=string_indexers + [assembler])
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)
# Build an RDD of LabeledPoint for the RDD-based (mllib) learners.
# DataFrame.map was removed in Spark 2.x; going through .rdd works everywhere.
# NOTE(review): on Spark 2.x+ the pipeline emits pyspark.ml.linalg vectors;
# mllib's LabeledPoint may need pyspark.mllib.linalg.Vectors.fromML(row.features)
# -- confirm for your Spark version.
label_points = (indexed
    .select(col(label_col).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
更新:
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col

###### FOR TEST DATA ######
# Linear regression needs one-hot encoded categorical features. The training
# and test sets must go through the SAME fitted pipeline: fitting the
# StringIndexers on the test set alone assigns different indices (and thus
# different one-hot vector layouts) than the ones the model was trained on.
label_col_test = "x4"
# converting RDD to dataframe
test_data_df = test_data.toDF(("x0", "x1", "x2", "x3", "x4"))
feature_cols_test = [x for x in test_data_df.columns if x != label_col_test]
# Indexers encode each string feature column as a numeric index column idx_<name>
string_indexers_test = [
    StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
    for x in feature_cols_test
]
# Encoders turn each index column into a one-hot vector column enc_<name>
encoders_test = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in feature_cols_test
]
# Assembles the one-hot vectors (not the raw indices) into a single vector
assembler_test = VectorAssembler(
    inputCols=["enc_{0}".format(x) for x in feature_cols_test],
    outputCol="features",
)
pipeline_test = Pipeline(stages=string_indexers_test + encoders_test + [assembler_test])
# Fit on the TRAINING data (train_data_df from the training snippet) so the
# test set reuses the training category->index mapping.
# NOTE(review): a fitted StringIndexer rejects categories unseen during
# training by default; consider handleInvalid="skip"/"keep" if your Spark
# version supports it.
model_test = pipeline_test.fit(train_data_df)

# Rebuild the training points with the one-hot features so the model and the
# test set share the same feature layout. Labels are cast to double, as in
# the training snippet. DataFrame.map was removed in Spark 2.x; use .rdd.
# NOTE(review): on Spark 2.x+ the pipeline emits pyspark.ml.linalg vectors;
# mllib's LabeledPoint may need pyspark.mllib.linalg.Vectors.fromML(row.features)
# -- confirm for your Spark version.
label_points = (model_test.transform(train_data_df)
    .select(col(label_col_test).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
indexed_test = model_test.transform(test_data_df)
label_points_test = (indexed_test
    .select(col(label_col_test).cast("double").alias("label"), col("features"))
    .rdd
    .map(lambda row: LabeledPoint(row.label, row.features)))
# Build the model
model = LinearRegressionWithSGD.train(label_points)
valuesAndPreds = label_points_test.map(lambda p: (p.label, model.predict(p.features)))
# Mean of squared errors. The lambda indexes the (label, prediction) pair
# because tuple-parameter unpacking `lambda (v, p):` is invalid in Python 3.
MSE = valuesAndPreds.map(lambda vp: (vp[0] - vp[1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))
请您参考如下方法:
您只需在索引(StringIndexer)和组装(VectorAssembler)之间加入一个 OneHotEncoder 步骤,并让 VectorAssembler 改为组装编码后的列:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode every indexed feature column (idx_<name> -> enc_<name>).
# This must be OneHotEncoder: a second StringIndexer would only re-index the
# indices and would not produce one-hot vectors.
encoders = [
    OneHotEncoder(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
    for x in train_data_df.columns if x != label_col
]
# Assemble the one-hot vectors (not the raw index columns) into "features".
assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features",
)
# Order matters: index -> one-hot encode -> assemble.
pipeline = Pipeline(stages=string_indexers + encoders + [assembler])