Del via


Byg en model til maskinel indlæring med Apache Spark MLlib

I denne artikel får du mere at vide om, hvordan du bruger Apache Spark MLlib til at oprette et program til maskinel indlæring, der håndterer enkel forudsigende analyse på et Azure Open-datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.

Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:

  • Klassificering
  • Klyngedannelse
  • Hypotesetest og beregning af eksempelstatistik
  • Regression
  • SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
  • Emnemodellering

Forstå klassificering og logistisk regression

Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassificeringsalgoritme skal finde ud af, hvordan du tildeler mærkater til de angivne inputdata. En algoritme til maskinel indlæring kan f.eks. acceptere aktieoplysninger som input og opdele bestanden i to kategorier: aktier, som du skal sælge, og aktier, som du skal beholde.

Logistisk regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.

Logistisk regression producerer en logistisk funktion , der kan forudsige sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.

Eksempel på forudsigende analyse af NYC-taxadata

Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.

Resten af denne artikel er afhængig af Apache Spark for først at udføre en analyse af DATAENE for NYC-drikkepenge og derefter udvikle en model for at forudsige, om en bestemt tur indeholder et tip eller ej.

Opret en Apache Spark-model til maskinel indlæring

  1. Opret en PySpark-notesbog. Du kan finde flere oplysninger under Opret en notesbog.

  2. Importér de typer, der kræves til denne notesbog.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Brug MLflow til at spore dine maskinlæringseksperimenter og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.

    import mlflow
    

Konstruer inputdatarammen

Dette eksempel indlæser dataene i en Pandas-dataramme og konverterer dem derefter til en Apache Spark-dataramme. I det format kan du anvende andre Apache Spark-operationer til at rense og filtrere datasættet.

  1. Indsæt disse linjer i en ny celle, og kør dem for at oprette en Spark DataFrame. Dette trin henter dataene direkte fra Azure Open Datasets lager. Du kan filtrere disse data ned for at undersøge et bestemt datavindue. Kodeeksemplet bruger et filter, der returnerer en enkelt måneds data.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-06")) \
        .repartition(20)
    
  2. Denne kode reducerer datasættet til ca. 10.000 rækker. For at fremskynde udviklingen og træningen tager koden for nu datasættet ned.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Se på dataene med den indbyggede display() kommando. Med denne kommando kan du nemt se en dataprøve eller grafisk udforske tendenser i dataene.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Forbered dataene

Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det omfatter rengøring, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette kodeeksempel udfører du flere trin til dataforberedelse:

  • Filtrer datasættet for at fjerne udenforliggende værdier og forkerte værdier
  • Fjern kolonner, der ikke er nødvendige til modeltræning
  • Opret nye kolonner ud fra rådata
  • Opret en mærkat for at bestemme, om en given taxatur omfatter et tip eller ej
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Derefter skal du foretage endnu et pass over dataene for at tilføje de endelige funktioner.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opret en logistisk regressionsmodel

Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.

Baseret på de endelige opgavekrav skal du omregne de kategoriske kolonner til tal. Specifikt konverter trafficTimeBins og weekdayString kolonnerne til heltalsrepræsentationer. Mange muligheder er tilgængelige for at imødekomme dette behov. Dette eksempel omfatter tilgangen OneHotEncoder :

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handling resulterer i en ny DataFrame med alle kolonner i det korrekte format for at oplære en model.

Oplær en logistisk regressionsmodel

Den første opgave opdeler datasættet i et oplæringssæt og et test- eller valideringssæt.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Når du har to DataFrames, opret modelformlen og kør den mod træningsdatarammen. Valider derefter mod testdatarammen. Eksperimentér med forskellige versioner af modelformlen for at se effekten af forskellige kombinationer.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %s" % auc)

Celleoutputtet:

Area under ROC = 0.9749430523917996

Opret en visuel gengivelse af forudsigelsen

Byg en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve kan vise resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf, der viser ROC-kurven for logistisk regression i tipmodellen.