Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
I denne artikel får du mere at vide om, hvordan du bruger Apache Spark MLlib til at oprette et program til maskinel indlæring, der håndterer enkel forudsigende analyse på et Azure Open-datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.
Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:
- Klassificering
- Klyngedannelse
- Hypotesetest og beregning af eksempelstatistik
- Regression
- SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
- Emnemodellering
Forstå klassificering og logistisk regression
Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassificeringsalgoritme skal finde ud af, hvordan du tildeler mærkater til de angivne inputdata. En algoritme til maskinel indlæring kan f.eks. acceptere aktieoplysninger som input og opdele bestanden i to kategorier: aktier, som du skal sælge, og aktier, som du skal beholde.
Logistisk regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.
Logistisk regression producerer en logistisk funktion , der kan forudsige sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.
Eksempel på forudsigende analyse af NYC-taxadata
Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.
Resten af denne artikel er afhængig af Apache Spark for først at udføre en analyse af DATAENE for NYC-drikkepenge og derefter udvikle en model for at forudsige, om en bestemt tur indeholder et tip eller ej.
Opret en Apache Spark-model til maskinel indlæring
Opret en PySpark-notesbog. Du kan finde flere oplysninger under Opret en notesbog.
Importér de typer, der kræves til denne notesbog.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorBrug MLflow til at spore dine maskinlæringseksperimenter og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.
import mlflow
Konstruer inputdatarammen
Dette eksempel indlæser dataene i en Pandas-dataramme og konverterer dem derefter til en Apache Spark-dataramme. I det format kan du anvende andre Apache Spark-operationer til at rense og filtrere datasættet.
Indsæt disse linjer i en ny celle, og kør dem for at oprette en Spark DataFrame. Dette trin henter dataene direkte fra Azure Open Datasets lager. Du kan filtrere disse data ned for at undersøge et bestemt datavindue. Kodeeksemplet bruger et filter, der returnerer en enkelt måneds data.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-06")) \ .repartition(20)Denne kode reducerer datasættet til ca. 10.000 rækker. For at fremskynde udviklingen og træningen tager koden for nu datasættet ned.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)Se på dataene med den indbyggede
display()kommando. Med denne kommando kan du nemt se en dataprøve eller grafisk udforske tendenser i dataene.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Forbered dataene
Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det omfatter rengøring, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette kodeeksempel udfører du flere trin til dataforberedelse:
- Filtrer datasættet for at fjerne udenforliggende værdier og forkerte værdier
- Fjern kolonner, der ikke er nødvendige til modeltræning
- Opret nye kolonner ud fra rådata
- Opret en mærkat for at bestemme, om en given taxatur omfatter et tip eller ej
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Derefter skal du foretage endnu et pass over dataene for at tilføje de endelige funktioner.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Opret en logistisk regressionsmodel
Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.
Baseret på de endelige opgavekrav skal du omregne de kategoriske kolonner til tal. Specifikt konverter trafficTimeBins og weekdayString kolonnerne til heltalsrepræsentationer. Mange muligheder er tilgængelige for at imødekomme dette behov. Dette eksempel omfatter tilgangen OneHotEncoder :
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Denne handling resulterer i en ny DataFrame med alle kolonner i det korrekte format for at oplære en model.
Oplær en logistisk regressionsmodel
Den første opgave opdeler datasættet i et oplæringssæt og et test- eller valideringssæt.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Når du har to DataFrames, opret modelformlen og kør den mod træningsdatarammen. Valider derefter mod testdatarammen. Eksperimentér med forskellige versioner af modelformlen for at se effekten af forskellige kombinationer.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("Area under ROC = %s" % auc)
Celleoutputtet:
Area under ROC = 0.9749430523917996
Opret en visuel gengivelse af forudsigelsen
Byg en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve kan vise resultatet.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Relateret indhold
- Brug AI-eksempler til at bygge modeller til maskinel indlæring: Brug AI-eksempler
- Spor kørsler af maskinel indlæring ved hjælp af eksperimenter: Eksperimenter med maskinel indlæring