# **Machine Learning with `PySpark` through `MLlib`**

## **Tasks**

1) Split the data into train and test sets.



2) Formulate three pipelines, train and evaluate them: 

    a. Feature selection with the UnivariateFeatureSelector and the fpr strategy (least conservative).

    b. Same, with the fwe strategy (most conservative).

    c. Same, but doing PCA and using 3 components.


3) Formulate, train and evaluate another pipeline that uses features obtained from two sources:

    a. Feature selection with the fpr strategy.

    b. The 3 components from PCA.

4) Can you determine how many features are selected by fpr and fwe?


## **Requirements** 

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Intro to Spark').master("local[*]").getOrCreate()
sc = spark.sparkContext

In [2]:
import numpy as np
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, Imputer, PCA
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.feature import StandardScaler, UnivariateFeatureSelector

## **Data preparation for SparkML**

### **Reading the data**

First of all, we read the data:

In [3]:
data = spark.read.csv(path='wind_available_second.csv.gz', header=True, inferSchema=True) 

In [4]:
data.show(5)


### **Train-Test split**

Now, we perform a train-test split.

Our split is not random but year-based, following the same reasoning as in the previous assignment (since the data is the same).

In [5]:
# Usual step in Spark ML: rename the response as 'label'
data = data.withColumnRenamed('energy', "label")

# Separate the response from the predictors (the date columns are not used as predictors)
response = 'label'
predictors = [x for x in data.columns if x not in [response, 'year', 'month', 'day', 'hour']]

In [6]:
# Predefined, year-based train-test split
train_years = [2005, 2006, 2007]
test_years = [2008, 2009]

data_train = data.filter(col('year').isin(train_years))
data_test = data.filter(col('year').isin(test_years))

In [7]:
# Predefined split of the training years: train2 (2006-2007) for fitting, validate (2005) for validation
train2_years = [2006, 2007]
validate_years = [2005]

data_train2 = data.filter(col('year').isin(train2_years))
data_validate = data.filter(col('year').isin(validate_years))
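The year lists above are meant to form a nested, non-overlapping scheme: train2 and validate partition the training years, and no test year is used during training. A small plain-Python check of those two properties could look like this (the helper name `check_nested_year_split` is just for illustration):

```python
# Year lists copied from the cells above.
train_years = [2005, 2006, 2007]
test_years = [2008, 2009]
train2_years = [2006, 2007]
validate_years = [2005]


def check_nested_year_split(train, test, train2, validate):
    """True if train2/validate partition the training years and no test year is used for training."""
    train, test = set(train), set(test)
    train2, validate = set(train2), set(validate)
    partitions_train = (train2 | validate) == train and not (train2 & validate)
    test_held_out = not (train & test)
    return partitions_train and test_held_out


print(check_nested_year_split(train_years, test_years, train2_years, validate_years))  # True
```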

### **Pipelines definition**

- Pipelines of the assignment:

   - **Pipeline 0**: imputer (mean), assembler, Linear Regression

   - **Pipeline 1**: imputer (mean), assembler, feature selector (fpr), Linear Regression

   - **Pipeline 2**: imputer (mean), assembler, feature selector (fwe), Linear Regression

   - **Pipeline 3**: imputer (mean), assembler, scaler, pca (k=3), Linear Regression

   - **Pipeline 4**: imputer (mean), assembler, feature selector (fpr) + scaler and pca (k=3), Linear Regression



- Extra pipeline:

   - **Pipeline 5**: imputer (not fixed), assembler, scaler, pca (not fixed), decision tree


The extra pipeline is included to show (and learn) how to perform hyperparameter optimization (HPO) in Spark.

This is why it uses a decision tree (which has hyperparameters to tune) and leaves both the imputation strategy and the number of PCA components unfixed, so that HPO can be run over the hyperparameters of these three pipeline elements.

The following cells define the mentioned pipelines:

In [8]:
# Define an imputer that replaces missing values with the column mean (the predictor columns are overwritten in place).
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy='mean')
# Define an assembler that creates a new column 'features' whose i-th row is a vector with the values of all the predictors in the i-th row of the data.
# Spark ML models need their inputs collected in a single vector column like this one.
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
# Define a univariate feature selection method (fpr mode). With continuous features and a continuous label,
# each feature is scored with an F-test and the selection threshold (0.5 here) is used as the p-value cutoff.
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.5)
# Define the ML model, in this case a Linear Regression that uses the selected features ('selectedFeatures') as predictors and 'label' as response.
model = LinearRegression(featuresCol="selectedFeatures", labelCol='label')
# Define the pipeline using the above elements.
pipeline1 = Pipeline(stages=[imputer, assembler, selector, model])

In [9]:
# Pipeline 2: same stages as pipeline 1, but the selector uses the fwe strategy.
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy='mean')
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="fwe")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.5)
model = LinearRegression(featuresCol="selectedFeatures", labelCol='label')
pipeline2 = Pipeline(stages=[imputer, assembler, selector, model])
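The only difference between pipelines 1 and 2 is the selection mode. According to the Spark documentation, with continuous features and a continuous label each feature is scored with an F-test; fpr keeps every feature whose p-value is below the threshold, while fwe first divides the threshold by the number of features (a Bonferroni-style correction). With the threshold of 0.5 used here and 555 predictors, fwe only keeps features with a p-value below about 0.0009, which is why it is the more conservative strategy. The numpy sketch below reproduces only the two cutoff rules on made-up p-values; the helper `select_by_pvalue` is hypothetical and assumes a strict `<` comparison.

```python
import numpy as np


def select_by_pvalue(p_values, mode, threshold):
    """Indices of the features kept by the fpr / fwe cutoff rules (strict '<' assumed)."""
    p_values = np.asarray(p_values, dtype=float)
    if mode == "fpr":
        cutoff = threshold                   # same cutoff for every feature
    elif mode == "fwe":
        cutoff = threshold / p_values.size   # threshold scaled by 1 / numFeatures
    else:
        raise ValueError(f"unsupported mode: {mode}")
    return np.flatnonzero(p_values < cutoff).tolist()


p = [0.0001, 0.001, 0.04, 0.3, 0.7]          # made-up p-values for 5 features
print(select_by_pvalue(p, "fpr", 0.5))       # [0, 1, 2, 3]
print(select_by_pvalue(p, "fwe", 0.5))       # [0, 1, 2]  (cutoff 0.5 / 5 = 0.1)
print(0.5 / 555)                             # fwe cutoff with 555 predictors
```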

In [10]:
# Pipeline 3: standardize the assembled features and keep the first 3 principal components.
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy='mean')
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
pca = PCA(inputCol="scaledFeatures",  outputCol="pcaFeatures", k=3)
model = LinearRegression(featuresCol="pcaFeatures", labelCol='label')
pipeline3 = Pipeline(stages=[imputer, assembler, scaler, pca, model])

In [11]:
# Pipeline 4: combine the fpr-selected features with the first 3 principal components.
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy='mean')
assembler = VectorAssembler(inputCols=predictors, outputCol="features", handleInvalid='keep')
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="fpr")
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.5)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")
# Concatenate the selected features and the principal components into a single vector column.
combinedAssembler = VectorAssembler(inputCols=["selectedFeatures", "pcaFeatures"], outputCol="combinedFeatures", handleInvalid='keep')
model = LinearRegression(featuresCol="combinedFeatures", labelCol='label')
pipeline4 = Pipeline(stages=[imputer, assembler, selector, scaler, pca, combinedAssembler, model])
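Pipeline 4 builds its final feature vector from two branches: the fpr-selected entries of the unscaled `features` vector, and the 3 principal components of the standardized vector. The numpy sketch below mimics that construction on random data; the data, the selected indices and the SVD-based PCA are illustrative assumptions, not Spark's implementation (the signs of the principal components may also differ).

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 10))           # hypothetical assembled 'features': 100 rows, 10 predictors
selected_idx = [0, 2, 5, 7]              # hypothetical indices kept by the fpr selector

# StandardScaler(withMean=True, withStd=True): center each column and divide by its
# corrected (n - 1) sample standard deviation.
X_scaled = (X - X.mean(axis=0)) / X.std(axis=0, ddof=1)

# PCA(k=3): project the standardized data onto its first 3 principal directions.
_, _, vt = np.linalg.svd(X_scaled, full_matrices=False)
pca_features = X_scaled @ vt[:3].T

# combinedAssembler: concatenate selectedFeatures (unscaled) and pcaFeatures, in that order.
combined = np.hstack([X[:, selected_idx], pca_features])
print(combined.shape)                    # (100, 7)
```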

In [12]:
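# Pipeline 5 (extra): the imputation strategy, the number of PCA components and the tree hyperparameters are tuned later by HPO.
imputer = Imputer(inputCols=predictors, outputCols=predictors) # strategy not fixed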
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
pca = PCA(inputCol="scaledFeatures", outputCol="pcaFeatures") # k not fixed
model = DecisionTreeRegressor(featuresCol="pcaFeatures", labelCol='label')
pipeline5 = Pipeline(stages=[imputer, assembler, scaler, pca, model])

In [13]:
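# Pipeline 0 (baseline): Linear Regression on all the predictors, without feature selection or PCA.
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy='mean')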
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
model = LinearRegression(featuresCol="features", labelCol='label')
pipeline0 = Pipeline(stages=[imputer, assembler, model])

## **Pipelines evaluation**

We evaluate the pipelines on the training data (2005-2007) in three ways:

- The predefined split (train on 2006-2007, validate on 2005)

- Random simple validation (`TrainValidationSplit` with a 75/25 split)

- k-fold cross-validation (`CrossValidator`; 3 folds for pipelines 0 to 4)

Again, the idea is to make the project more complete and instructive.

However, to select the best pipeline among pipelines 0 to 4 (the extra pipeline is not compared), we only take into account the results for the **predefined split**.

In addition, we use **MAE** as the validation metric, following the approach used in the previous project.

In [14]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")

### **Pipeline 1**

#### Predefined split

In [15]:
# Fit the pipeline on the train2 years (2006-2007).
pipeline1_fitted = pipeline1.fit(data_train2)
# Apply the fitted pipeline to the validation data (2005).
validate_predictions_pipeline1 = pipeline1_fitted.transform(data_validate).select(['label', 'prediction'])
# transform() applies every fitted stage to the new data: imputation (with the means learned on train2), assembling,
# and feature selection (keeping the features chosen on train2).
# The fitted Linear Regression model then predicts from the 'selectedFeatures' column and adds a 'prediction' column.

In [16]:
validate_predictions_pipeline1.show(3)

+-------+------------------+
|  label|        prediction|
+-------+------------------+
| 402.71|144.15834636166255|
|  696.8|188.89930250890757|
|1591.15| 751.1438624302173|
+-------+------------------+
only showing top 3 rows
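`RegressionEvaluator` with `metricName="mae"` averages the absolute differences between the `label` and `prediction` columns over all rows. Applying the same formula by hand to just the three rows shown above is only a check of the formula: it gives a different number from the full validation MAE because it uses three rows.

```python
import numpy as np

# The three (label, prediction) pairs printed above.
labels = np.array([402.71, 696.8, 1591.15])
predictions = np.array([144.15834636166255, 188.89930250890757, 751.1438624302173])

# Mean absolute error: average of |label - prediction| over the rows.
mae_three_rows = float(np.mean(np.abs(labels - predictions)))
print(round(mae_three_rows, 2))  # 535.49
```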



In [29]:
MAE_pipeline1_predef_split = evaluator.evaluate(validate_predictions_pipeline1)
MAE_pipeline1_predef_split

450.58502952096546

In [22]:
data

DataFrame[label: double, year: int, month: int, day: int, hour: int, p54_162_1: double, p54_162_2: double, p54_162_3: double, p54_162_4: double, p54_162_5: double, p54_162_6: double, p54_162_7: double, p54_162_8: double, p54_162_9: double, p54_162_10: double, p54_162_11: double, p54_162_12: double, p54_162_13: double, p54_162_14: double, p54_162_15: double, p54_162_16: double, p54_162_17: double, p54_162_18: double, p54_162_19: double, p54_162_20: double, p54_162_21: double, p54_162_22: double, p54_162_23: double, p54_162_24: double, p54_162_25: double, p55_162_1: double, p55_162_2: double, p55_162_3: double, p55_162_4: double, p55_162_5: double, p55_162_6: double, p55_162_7: double, p55_162_8: double, p55_162_9: double, p55_162_10: double, p55_162_11: double, p55_162_12: double, p55_162_13: double, p55_162_14: double, p55_162_15: double, p55_162_16: double, p55_162_17: double, p55_162_18: double, p55_162_19: double, p55_162_20: double, p55_162_21: double, p55_162_22: double, p55_162_2

The number of selected features with this pipeline (**fpr** selector) is 544 out of 555.

In [32]:
len(pipeline1_fitted.stages[2].selectedFeatures)

544
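The fitted selector only stores indices into the `features` vector. Because every predictor is a single numeric column, `VectorAssembler` places `predictors[i]` at position `i`, so the indices can be mapped back to column names. A plain-Python sketch with hypothetical helper names and inputs (in the notebook the inputs would be `predictors` and `pipeline1_fitted.stages[2].selectedFeatures`):

```python
def selected_feature_names(predictors, selected_indices):
    """Names of the predictors kept by the selector.

    Assumes each predictor is one numeric column, so VectorAssembler puts
    predictors[i] at position i of the 'features' vector.
    """
    return [predictors[i] for i in sorted(selected_indices)]


def dropped_feature_names(predictors, selected_indices):
    """Names of the predictors discarded by the selector."""
    keep = set(selected_indices)
    return [name for i, name in enumerate(predictors) if i not in keep]


# Hypothetical inputs, for illustration only.
cols = ["p54_162_1", "p54_162_2", "p54_162_3", "p55_162_1"]
print(selected_feature_names(cols, [3, 0]))  # ['p54_162_1', 'p55_162_1']
print(dropped_feature_names(cols, [3, 0]))   # ['p54_162_2', 'p54_162_3']
```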

#### Random simple validation

In [57]:
pipeline1_sv = TrainValidationSplit(estimator=pipeline1,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=ParamGridBuilder().build(),
                              trainRatio=0.75)

pipeline1_sv_fitted = pipeline1_sv.fit(data_train)

In [58]:
MAE_pipeline1_sv = pipeline1_sv_fitted.validationMetrics[0]
MAE_pipeline1_sv

428.04922049265014

#### k-fold cv

In [59]:
pipeline1_kfold = CrossValidator(estimator=pipeline1, 
                                 evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                 numFolds=3,
                                 estimatorParamMaps=ParamGridBuilder().build()
                                )

pipeline1_kfold_fitted = pipeline1_kfold.fit(data_train)

In [60]:
MAE_pipeline1_kfold = pipeline1_kfold_fitted.avgMetrics[0]
MAE_pipeline1_kfold

432.9321541656756
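`CrossValidator.avgMetrics` holds, for each param map, the evaluator metric averaged over the folds (here there is a single, empty param map, hence index 0), whereas `TrainValidationSplit.validationMetrics` comes from one random split. The numpy sketch below shows the averaging idea with ordinary least squares on synthetic data; the random fold assignment per row is similar in spirit to Spark's, but the helper `kfold_mae` is hypothetical.

```python
import numpy as np


def kfold_mae(X, y, k=3, seed=0):
    """Average validation MAE over k random folds, fitting least squares on the other folds."""
    rng = np.random.default_rng(seed)
    fold_of_row = rng.integers(0, k, size=len(y))       # random fold id for every row
    fold_maes = []
    for fold in range(k):
        val = fold_of_row == fold
        X_tr = np.column_stack([np.ones((~val).sum()), X[~val]])   # intercept column + predictors
        X_val = np.column_stack([np.ones(val.sum()), X[val]])
        coef, *_ = np.linalg.lstsq(X_tr, y[~val], rcond=None)
        fold_maes.append(float(np.mean(np.abs(y[val] - X_val @ coef))))
    return float(np.mean(fold_maes)), fold_maes


rng = np.random.default_rng(1)
X = rng.normal(size=(200, 4))
y = 2.0 + X @ np.array([1.0, -0.5, 0.0, 3.0])           # noiseless linear target
avg_mae, per_fold = kfold_mae(X, y, k=3)
print(len(per_fold), avg_mae < 1e-8)                      # 3 True
```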

### **Pipeline 2**

#### Predefined split

In [28]:
pipeline2_fitted = pipeline2.fit(data_train2)
validate_predictions_pipeline2 = pipeline2_fitted.transform(data_validate).select(['label', 'prediction'])
MAE_pipeline2_predef_split = evaluator.evaluate(validate_predictions_pipeline2)
MAE_pipeline2_predef_split

447.7850861127979

The number of selected features with this pipeline (**fwe** selector) is 496 out of 555.

In [33]:
len(pipeline2_fitted.stages[2].selectedFeatures)

496

#### Random simple validation

In [62]:
pipeline2_sv = TrainValidationSplit(estimator=pipeline2,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=ParamGridBuilder().build(),
                              trainRatio=0.75)

pipeline2_sv_fitted = pipeline2_sv.fit(data_train)
MAE_pipeline2_sv = pipeline2_sv_fitted.validationMetrics[0]
MAE_pipeline2_sv

420.96574222724604

#### k-fold cv

In [63]:
pipeline2_kfold = CrossValidator(estimator=pipeline2, 
                                 evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                 numFolds=3,
                                 estimatorParamMaps=ParamGridBuilder().build()
                                )

pipeline2_kfold_fitted = pipeline2_kfold.fit(data_train)
MAE_pipeline2_kfold = pipeline2_kfold_fitted.avgMetrics[0]
MAE_pipeline2_kfold

428.8867064566628

### **Pipeline 3**

#### Predefined split

In [34]:
pipeline3_fitted = pipeline3.fit(data_train2)
validate_predictions_pipeline3 = pipeline3_fitted.transform(data_validate).select(['label', 'prediction'])
MAE_pipeline3_predef_split = evaluator.evaluate(validate_predictions_pipeline3)
MAE_pipeline3_predef_split

515.1934949236103

#### Random simple validation

In [65]:
pipeline3_sv = TrainValidationSplit(estimator=pipeline3,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=ParamGridBuilder().build(),
                              trainRatio=0.75)

pipeline3_sv_fitted = pipeline3_sv.fit(data_train)
MAE_pipeline3_sv = pipeline3_sv_fitted.validationMetrics[0]
MAE_pipeline3_sv

505.81672893381335

#### k-fold cv

In [66]:
pipeline3_kfold = CrossValidator(estimator=pipeline3, 
                                 evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                 numFolds=3,
                                 estimatorParamMaps=ParamGridBuilder().build()
                                )

pipeline3_kfold_fitted = pipeline3_kfold.fit(data_train)
MAE_pipeline3_kfold = pipeline3_kfold_fitted.avgMetrics[0]
MAE_pipeline3_kfold

500.1136347904351

### **Pipeline 4**

#### Predefined split

In [32]:
pipeline4_fitted = pipeline4.fit(data_train2)
validate_predictions_pipeline4 = pipeline4_fitted.transform(data_validate).select(['label', 'prediction'])
MAE_pipeline4_predef_split = evaluator.evaluate(validate_predictions_pipeline4)
MAE_pipeline4_predef_split

451.70349096551405

#### Random simple validation

In [68]:
pipeline4_sv = TrainValidationSplit(estimator=pipeline4,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=ParamGridBuilder().build(),
                              trainRatio=0.75)

pipeline4_sv_fitted = pipeline4_sv.fit(data_train)
MAE_pipeline4_sv = pipeline4_sv_fitted.validationMetrics[0]
MAE_pipeline4_sv

428.3733104604351

#### k-fold cv

In [69]:
pipeline4_kfold = CrossValidator(estimator=pipeline4, 
                                 evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                 numFolds=3,
                                 estimatorParamMaps=ParamGridBuilder().build()
                                )

pipeline4_kfold_fitted = pipeline4_kfold.fit(data_train)
MAE_pipeline4_kfold = pipeline4_kfold_fitted.avgMetrics[0]
MAE_pipeline4_kfold

433.057891066047

### **Pipeline 5 (extra)**

#### Using a predefined split

Apache Spark's MLlib tuning classes do not natively support a predefined split for HPO, as scikit-learn does with its `PredefinedSplit`.

For this reason, the extra pipeline is not compared with the others on the predefined split.
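One possible workaround, not used in this notebook, is to run the search by hand: for each param map in the grid, fit on `data_train2` and score on `data_validate`. `Estimator.fit` in PySpark accepts an optional param map as its second argument, so each step would be roughly `evaluator.evaluate(pipeline5.fit(data_train2, param_map).transform(data_validate))`. The numpy sketch below shows the same loop on synthetic data, with closed-form ridge regression standing in for the Spark pipeline; the helpers and the grid of `alphas` are hypothetical.

```python
import numpy as np


def ridge_fit(X, y, alpha):
    """Closed-form ridge regression with an unpenalized intercept."""
    x_mean, y_mean = X.mean(axis=0), y.mean()
    Xc, yc = X - x_mean, y - y_mean
    w = np.linalg.solve(Xc.T @ Xc + alpha * np.eye(X.shape[1]), Xc.T @ yc)
    return w, y_mean - x_mean @ w


def predefined_split_search(X_train, y_train, X_val, y_val, alphas):
    """Fit once per candidate on the fixed training part and score MAE on the fixed validation part."""
    maes = []
    for alpha in alphas:
        w, b = ridge_fit(X_train, y_train, alpha)
        maes.append(float(np.mean(np.abs(y_val - (X_val @ w + b)))))
    best = int(np.argmin(maes))
    return alphas[best], maes


rng = np.random.default_rng(0)
X = rng.normal(size=(300, 5))
y = X @ np.array([1.0, 0.0, -2.0, 0.5, 0.0]) + rng.normal(scale=0.1, size=300)
is_val = np.arange(300) < 100        # stands in for the validation year (rows of 2005)
alphas = [0.0, 1.0, 100.0]
best_alpha, maes = predefined_split_search(X[~is_val], y[~is_val], X[is_val], y[is_val], alphas)
print(best_alpha, [round(m, 3) for m in maes])
```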

#### Using random simple validation

In [14]:
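# Reference the params through pipeline5's own stages: the global `imputer` is redefined in the Pipeline 0 cell,
# and a param that belongs to a different stage instance is ignored when the pipeline is fitted.
paramGrid = ParamGridBuilder() \
    .addGrid(pipeline5.getStages()[0].strategy, ['mean', 'median']) \
    .addGrid(pipeline5.getStages()[3].k, [3, 10]) \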
    .addGrid(pipeline5.getStages()[-1].maxDepth, [2, 6]) \
    .addGrid(pipeline5.getStages()[-1].maxBins, [20, 40]) \
    .addGrid(pipeline5.getStages()[-1].minInstancesPerNode, [2, 10]) \
    .build()

pipeline5_HPO_sv = TrainValidationSplit(estimator=pipeline5,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=paramGrid,
                              trainRatio=0.75)

pipeline5_HPO_sv_fitted = pipeline5_HPO_sv.fit(data_train)

final_model_pipeline5_sv = pipeline5_HPO_sv_fitted.bestModel
# Time: 5.20 mins
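The grid above is a full Cartesian product: 2 values for each of 5 hyperparameters give 2^5 = 32 param maps. Each tuning class fits the whole pipeline once per param map and per split, and then refits the best param map on the full input data, which helps explain why the 2-fold search below takes longer than the single train/validation split. A quick count in plain Python (the dictionary keys are just labels):

```python
from itertools import product

# Candidate values copied from the ParamGridBuilder above.
grid = {
    "imputer.strategy": ["mean", "median"],
    "pca.k": [3, 10],
    "maxDepth": [2, 6],
    "maxBins": [20, 40],
    "minInstancesPerNode": [2, 10],
}
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

n_maps = len(param_maps)
tvs_fits = n_maps + 1         # TrainValidationSplit: one fit per param map, plus the final refit
cv_fits = 2 * n_maps + 1      # CrossValidator(numFolds=2): two fits per param map, plus the final refit
print(n_maps, tvs_fits, cv_fits)  # 32 33 65
```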

In [15]:
final_model_pipeline5_sv_params = {param.name: value for param, value in final_model_pipeline5_sv.stages[-1].extractParamMap().items()}
final_model_pipeline5_sv_params

{'cacheNodeIds': False,
 'checkpointInterval': 10,
 'featuresCol': 'pcaFeatures',
 'impurity': 'variance',
 'labelCol': 'label',
 'leafCol': '',
 'maxBins': 40,
 'maxDepth': 6,
 'maxMemoryInMB': 256,
 'minInfoGain': 0.0,
 'minInstancesPerNode': 10,
 'minWeightFractionPerNode': 0.0,
 'predictionCol': 'prediction',
 'seed': 3529065940959987723}

In [17]:
final_model_pipeline5_sv_MAE = min(pipeline5_HPO_sv_fitted.validationMetrics)
final_model_pipeline5_sv_MAE

366.0861391472056

#### Using k-fold cv

In [18]:
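# Reference the params through pipeline5's own stages (see the random simple validation cell above).
paramGrid = ParamGridBuilder() \
    .addGrid(pipeline5.getStages()[0].strategy, ['mean', 'median']) \
    .addGrid(pipeline5.getStages()[3].k, [3, 10]) \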
    .addGrid(pipeline5.getStages()[-1].maxDepth, [2, 6]) \
    .addGrid(pipeline5.getStages()[-1].maxBins, [20, 40]) \
    .addGrid(pipeline5.getStages()[-1].minInstancesPerNode, [2, 10]) \
    .build()

pipeline5_HPO_kfold = CrossValidator(estimator=pipeline5, 
                                     evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                     numFolds=2,
                                    estimatorParamMaps=paramGrid
                                )

pipeline5_HPO_kfold_fitted = pipeline5_HPO_kfold.fit(data_train)

final_model_pipeline5_kfold = pipeline5_HPO_kfold_fitted.bestModel
# Time: 8.26 mins

In [48]:
final_model_pipeline5_kfold_imputer_strategy = final_model_pipeline5_kfold.stages[0].getStrategy()
final_model_pipeline5_kfold_imputer_strategy

'median'

In [47]:
final_model_pipeline5_kfold_pca_n_components = final_model_pipeline5_kfold.stages[3].getK()
final_model_pipeline5_kfold_pca_n_components

3

In [19]:
final_model_pipeline5_kfold_params = {param.name: value for param, value in final_model_pipeline5_kfold.stages[-1].extractParamMap().items()}
final_model_pipeline5_kfold_params

{'cacheNodeIds': False,
 'checkpointInterval': 10,
 'featuresCol': 'pcaFeatures',
 'impurity': 'variance',
 'labelCol': 'label',
 'leafCol': '',
 'maxBins': 40,
 'maxDepth': 6,
 'maxMemoryInMB': 256,
 'minInfoGain': 0.0,
 'minInstancesPerNode': 10,
 'minWeightFractionPerNode': 0.0,
 'predictionCol': 'prediction',
 'seed': 3529065940959987723}

In [20]:
final_model_pipeline5_kfold_MAE = min(pipeline5_HPO_kfold_fitted.avgMetrics)
final_model_pipeline5_kfold_MAE

368.84319998691024
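Instead of reading the winning values back from the fitted stages, the best param map can also be looked up from the metrics, since `avgMetrics` is aligned with the list of param maps passed to the tuner (`paramGrid` here; recent PySpark versions also expose it through `getEstimatorParamMaps()` on the fitted model). The direction depends on the metric: `RegressionEvaluator.isLargerBetter()` is `False` for MAE. A minimal sketch with a hypothetical helper and hypothetical values:

```python
import numpy as np


def best_param_map(param_maps, metrics, larger_is_better=False):
    """Return the param map with the best metric and that metric (smallest by default, as for MAE)."""
    idx = int(np.argmax(metrics) if larger_is_better else np.argmin(metrics))
    return param_maps[idx], metrics[idx]


# Hypothetical param maps and average metrics, for illustration only.
maps = [{"maxDepth": 2}, {"maxDepth": 6}, {"maxDepth": 10}]
avg_maes = [420.0, 370.0, 390.0]
print(best_param_map(maps, avg_maes))  # ({'maxDepth': 6}, 370.0)
```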

Now we rebuild the pipeline with the hyperparameters selected by the k-fold HPO and compute the cross-validated MAE again, to check whether it matches the one we have just obtained:

In [52]:
imputer = Imputer(inputCols=predictors, outputCols=predictors, strategy=final_model_pipeline5_kfold_imputer_strategy)  
assembler = VectorAssembler(inputCols=predictors, outputCol='features', handleInvalid='keep')
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
pca = PCA(inputCol="scaledFeatures", outputCol="pcaFeatures", k=final_model_pipeline5_kfold_pca_n_components)  
model = DecisionTreeRegressor(featuresCol="pcaFeatures", labelCol='label', 
                              maxDepth=final_model_pipeline5_kfold_params['maxDepth'],
                              maxBins=final_model_pipeline5_kfold_params['maxBins'],
                              minInstancesPerNode=final_model_pipeline5_kfold_params['minInstancesPerNode'])
pipeline5_final = Pipeline(stages=[imputer, assembler, scaler, pca, model])

In [53]:
pipeline5_final_kfold = CrossValidator(estimator=pipeline5_final, 
                                     evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                     numFolds=2,
                                     estimatorParamMaps=ParamGridBuilder().build()
                                    )
pipeline5_final_kfold_fitted = pipeline5_final_kfold.fit(data_train)
pipeline5_final_kfold_MAE = min(pipeline5_final_kfold_fitted.avgMetrics)
pipeline5_final_kfold_MAE

383.82841469017933

### **Pipeline 0**

#### Predefined split

In [25]:
pipeline0_fitted = pipeline0.fit(data_train2)
validate_predictions_pipeline0 = pipeline0_fitted.transform(data_validate).select(['label', 'prediction'])
MAE_pipeline0_predef_split = evaluator.evaluate(validate_predictions_pipeline0)
MAE_pipeline0_predef_split

452.3205499585219

#### Random simple validation

In [40]:
pipeline0_sv = TrainValidationSplit(estimator=pipeline0,
                              evaluator=RegressionEvaluator(metricName="mae"),
                              estimatorParamMaps=ParamGridBuilder().build(),
                              trainRatio=0.75)

pipeline0_sv_fitted = pipeline0_sv.fit(data_train)
MAE_pipeline0_sv = pipeline0_sv_fitted.validationMetrics[0]
MAE_pipeline0_sv

426.48961804895526

#### k-fold cv

In [39]:
pipeline0_kfold = CrossValidator(estimator=pipeline0, 
                                 evaluator=RegressionEvaluator(labelCol="label", 
                                                               predictionCol="prediction", 
                                                               metricName="mae"),
                                 numFolds=3,
                                 estimatorParamMaps=ParamGridBuilder().build()
                                )

pipeline0_kfold_fitted = pipeline0_kfold.fit(data_train)
MAE_pipeline0_kfold = pipeline0_kfold_fitted.avgMetrics[0]
MAE_pipeline0_kfold

434.6859310337352

## **Pipelines comparison**

In this section we are going to compare the different pipelines (from 0 to 4) according to their validation MAE computed by the predefined split.

In [33]:
MAE_arr = np.array([MAE_pipeline0_predef_split, MAE_pipeline1_predef_split, MAE_pipeline2_predef_split, 
                    MAE_pipeline3_predef_split, MAE_pipeline4_predef_split])

MAE_arr

array([452.32054996, 450.58502952, 447.78508611, 515.19349492,
       451.70349097])

In [35]:
pipelines_indices_sorted = np.argsort(MAE_arr)
pipelines_indices_sorted

array([2, 1, 4, 0, 3])

In [54]:
pipelines_indices_sorted[0]

2
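To put this ranking in perspective, the relative gap of each pipeline to the best one can be computed from the same MAE values. Pipelines 0, 1 and 4 are all within about 1% of pipeline 2, while the PCA-only pipeline 3 is about 15% worse, so the choice among the linear pipelines rests on small differences measured on a single validation year.

```python
import numpy as np

# Validation MAE on the predefined split for pipelines 0 to 4 (values from the cells above).
MAE_arr = np.array([452.3205499585219, 450.58502952096546, 447.7850861127979,
                    515.1934949236103, 451.70349096551405])

# Relative gap (%) of each pipeline with respect to the best one.
gap_pct = 100 * (MAE_arr - MAE_arr.min()) / MAE_arr.min()
for i, gap in enumerate(gap_pct):
    print(f"pipeline {i}: +{gap:.2f}%")
```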

Therefore, the "best" of the five compared pipelines is **pipeline 2**: imputer (mean), assembler, feature selector (fwe), Linear Regression. This will be considered the **final pipeline**.

Again, we want to highlight that the extra pipeline (pipeline 5) is excluded from the comparison because it was not evaluated with the predefined split, which is the validation method we have chosen to evaluate all the pipelines.

## **Estimation of future performance for the final pipeline**

In this section we estimate the future performance of the final pipeline (pipeline 2): we refit it on all the training years (2005-2007) and compute its MAE on the test years (2008-2009).

In [36]:
pipeline2_fitted = pipeline2.fit(data_train)
test_predictions_pipeline2 = pipeline2_fitted.transform(data_test).select(['label', 'prediction'])
MAE_future_performance_pipeline2_predef_split = evaluator.evaluate(test_predictions_pipeline2)
MAE_future_performance_pipeline2_predef_split

412.9088216020078

The estimated future performance of the final pipeline is a test MAE of 412.91.

## **Conclusions**


We can draw several conclusions from this project:

- Spark provides a simple and powerful way to carry out Machine Learning tasks.

- Pipelines are one of the most powerful tools Spark provides for Machine Learning. This project and the previous one make it clear that pipelines are a key element of any Machine Learning project, so any good Data Scientist should know how to use them to make ML projects more efficient and robust.

- Feature selection can be an important step in a Machine Learning pipeline, since it can lead to better prediction performance: here, the fwe selector (496 of 555 features) achieved a slightly lower validation MAE (447.79) than using all the features (452.32).