Condividi tramite


Informazioni sui moduli WebAssembly (WASM) e sulle definizioni di grafo per i grafici del flusso di dati

I grafici del flusso di dati in Azure IoT Operations elaborano i dati di telemetria alla periferia instradandoli attraverso una serie di operatori, come mappe, filtri e rami. Imballi la tua logica di elaborazione personalizzata come moduli WebAssembly (WASM) e li colleghi tra loro in una definizione di grafo, così puoi trasformare, filtrare e arricchire i dati senza dover scrivere servizi completi.

Questo articolo illustra i tipi di operatore, il modello di flusso di dati tempestivo, la configurazione del modulo, le API host e lo schema WIT che supporta i moduli WASM. Per compilare, testare ed eseguire il debug dei moduli in locale con l'estensione VS Code o l'interfaccia della aio-dataflow riga di comando, vedere Compilare moduli WASM per i flussi di dati.

Operatori e moduli

Gli operatori sono le unità di elaborazione in un grafico del flusso di dati. Ogni tipo ha uno scopo specifico:

Operatore Scopo Tipo restituito
Mappa Trasformare ogni elemento di dati (ad esempio, convertire unità di temperatura) DataModel
Filter Passare o eliminare elementi in base a una condizione bool
Branch Indirizzare gli elementi a due percorsi diversi bool
Accumulare Aggregare gli elementi nelle finestre temporali DataModel
Concatena Unire più flussi mantenendo l'ordine N/A
Delay Avanzare i timestamp per controllare il timing N/A

Un modulo è il file binario WASM che implementa uno o più operatori. Ad esempio, un singolo temperature.wasm modulo può fornire sia un map operatore (per la conversione) che un filter operatore (per il controllo della soglia).

Graph Definition → References Module → Provides Operator → Processes Data
     ↓                    ↓               ↓              ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C

Questa separazione consente di riutilizzare lo stesso modulo con configurazioni del grafo diverse, moduli di versione in modo indipendente e modificare il comportamento tramite parametri di configurazione senza ricompilare.

Modello di flusso di dati tempestivo

I grafici del flusso di dati si basano sul modello computazionale Timely dataflow del progetto Naiad di Microsoft Research. Ogni elemento di dati contiene un timestamp di orologio logico ibrido:

record hybrid-logical-clock {
    timestamp: timespec,  // Wall-clock time (secs + nanos)
    counter: u64,         // Logical ordering for same-time events
    node-id: string,      // Originating node
}

Questo consente l'elaborazione deterministica (lo stesso input produce sempre lo stesso output), la semantica esattamente una volta e il coordinamento distribuito tra i nodi. Per lo schema WIT completo, vedere il repository samples.

Per informazioni su come sviluppare moduli WASM con l'estensione VS Code, vedere Compilare moduli WASM con l'estensione VS Code.

Operatori di scrittura

Operatore Map

Un operatore map trasforma ogni elemento di dati e restituisce una copia modificata. L'esempio di avvio rapido mostra una mappa di base. Ecco un esempio più complesso che usa i parametri di configurazione:

use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;

static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();

fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
    let unit = configuration.properties
        .iter()
        .find(|(k, _)| k == "output_unit")
        .map(|(_, v)| v.clone())
        .unwrap_or_else(|| "celsius".to_string());

    OUTPUT_UNIT.set(unit.clone()).unwrap();
    logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
    true
}

#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
    let DataModel::Message(mut msg) = input else {
        return Err(Error { message: "Expected Message variant".into() });
    };

    let payload = &msg.payload.read();
    let mut data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data["temperature"]["value"].as_f64() {
        let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
        let converted = match unit {
            "kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
            _ => (temp - 32.0) * 5.0 / 9.0, // celsius
        };
        data["temperature"]["value"] = serde_json::json!(converted);
        data["temperature"]["unit"] = serde_json::json!(unit);
        let out = serde_json::to_string(&data).unwrap();
        msg.payload = BufferOrBytes::Bytes(out.into_bytes());
    }

    Ok(DataModel::Message(msg))
}

Operatore di filtro

Un filtro ritorna true per far passare i dati o false per eliminarli.

use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};

const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;

static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();

fn filter_init(configuration: ModuleConfiguration) -> bool {
    for (key, value) in &configuration.properties {
        match key.as_str() {
            "temperature_lower_bound" => {
                if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
            }
            "temperature_upper_bound" => {
                if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
                else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
            }
            _ => {}
        }
    }
    true
}

#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
    let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
    let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);

    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
        Ok(temp >= lower && temp <= upper)
    } else {
        Ok(true) // Pass through non-temperature messages
    }
}

Operatore di ramo

Una diramazione indirizza i dati su due percorsi. Restituire false per il primo braccio, true per il secondo.

use wasm_graph_sdk::macros::branch_operator;

fn branch_init(_configuration: ModuleConfiguration) -> bool { true }

#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
    let DataModel::Message(msg) = &input else { return Ok(true); };
    let payload = &msg.payload.read();
    let data: serde_json::Value = serde_json::from_slice(payload)
        .map_err(|e| Error { message: format!("JSON parse error: {e}") })?;

    // false = first arm (temperature data), true = second arm (everything else)
    Ok(data.get("temperature").is_none())
}

Parametri di configurazione del modulo

Gli operatori possono ricevere parametri di configurazione di runtime tramite la init funzione . In questo modo è possibile personalizzare il comportamento senza ricompilare il modulo.

La init funzione riceve una ModuleConfiguration struct:

record module-configuration {
    properties: list<tuple<string, string>>,   // Key-value pairs from graph definition
    module-schemas: list<module-schema>        // Schema definitions if configured
}

La init funzione viene chiamata una volta al caricamento del modulo. Tornare true per avviare l'elaborazione o false per segnalare un errore di configurazione. Se init restituisce false, l'operatore non elabora dati e il flusso di dati registra un errore.

Importante

Se l'operatore dipende dai parametri di configurazione (ad esempio, limiti di filtro o valori soglia), gestire sempre il caso in cui non vengono forniti. Usare valori predefiniti sensibili o restituire false da init. Non chiamare unwrap() né causare un errore di panico sui parametri mancanti, perché ciò fa arrestare l'operatore in fase di esecuzione senza un messaggio di errore chiaro.

I parametri vengono definiti nella sezione della definizione del moduleConfigurations grafo:

moduleConfigurations:
  - name: module-temperature/filter
    parameters:
      temperature_lower_bound:
        name: temperature_lower_bound
        description: "Minimum valid temperature in Celsius"
      temperature_upper_bound:
        name: temperature_upper_bound
        description: "Maximum valid temperature in Celsius"

Il name campo deve corrispondere al nome dell'operatore nella sezione del operations grafico. Per altre informazioni sulla struttura della definizione del grafo, vedere Configurare le definizioni dei grafici WebAssembly.

Dimensioni e prestazioni del modulo

I moduli WASM vengono eseguiti in un ambiente in modalità sandbox con risorse limitate. Tenere presenti queste linee guida:

  • Ridurre al minimo le dipendenze. Per Rust, usare default-features = false su serde e serde_json per ridurre le dimensioni binarie. Evitare di trascinare grandi casse.
  • Le dimensioni del modulo sono importanti. I moduli più piccoli vengono caricati più velocemente e usano meno memoria. Un convertitore di temperatura tipico è ~2 MB (versione Rust) o ~5 MB (Python). Usare le build di versione per l'ambiente di produzione.
  • Evitare operazioni di blocco. La process funzione deve essere completata rapidamente. Il calcolo intensivo ritarda l'intera pipeline del flusso di dati.
  • Usare wasm-tools per controllare. Eseguire wasm-tools component wit your-module.wasm per verificare che il modulo esporta le interfacce previste prima di eseguire il push in un registro.

Controllo delle versioni e CI/CD

Usare il versionamento semantico per i moduli e le definizioni di grafo. Il grafico del flusso di dati fa riferimento agli artefatti in base al nome e al tag (ad esempio, temperature:1.0.0), in modo da poter aggiornare i moduli senza modificare le definizioni dei grafi eseguendo il push di una nuova versione con lo stesso tag.

Per le compilazioni automatizzate, una pipeline tipica è simile alla seguente:

  1. Compilare il modulo WASM (usare il generatore Docker per coerenza).
  2. Eseguire wasm-tools component wit per verificare le interfacce esportate.
  3. Eseguire unit test sulla logica di base. Per altre informazioni, vedere Testare i moduli WASM.
  4. Eseguire il push nel Registro di sistema con ORAS, contrassegnando con la versione di compilazione.
  5. (Optional) Aggiorna il riferimento dell'artefatto nella definizione del grafo e esegui il push.

Il grafico del flusso di dati seleziona automaticamente le nuove versioni del modulo di cui è stato eseguito il push nello stesso tag senza richiedere una ridistribuzione. Vedere Aggiornare un modulo in un grafico in esecuzione.

API host

I moduli WASM possono usare le API host per la gestione dello stato, la registrazione e le metriche.

Stato store

Rendere persistenti i dati tra process le chiamate usando un archivio di stato distribuito.

use wasm_graph_sdk::state_store;

// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
    conditions: state_store::SetConditions::Unconditional,
    expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);

// Get value
let response = state_store::get(key.as_bytes(), None);

// Delete key
let _ = state_store::del(key.as_bytes(), None, None);

Registrazione

Registrazione dei log strutturata con livelli di gravità

use wasm_graph_sdk::logger::{self, Level};

logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));

Metrics

Metriche compatibili con OpenTelemetry:

use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};

let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));

Inferenza ONNX

Per incorporare ed eseguire modelli ONNX di piccole dimensioni all'interno dei moduli per l'inferenza in banda, vedere Eseguire l'inferenza ONNX nei grafici del flusso di dati WebAssembly.

Informazioni di riferimento sullo schema WIT

Tutti gli operatori implementano interfacce standardizzate definite usando tipi di interfaccia WebAssembly (WIT). Gli schemi completi sono disponibili nel repository samples.

Interfacce operatore

Ogni operatore ha una init funzione per la configurazione e una process funzione per la gestione dei dati:

interface map {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<data-model, error>;
}

interface filter {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(message: data-model) -> result<bool, error>;
}

interface branch {
    use types.{data-model, error, module-configuration};
    use hybrid-logical-clock.{hybrid-logical-clock};
    init: func(configuration: module-configuration) -> bool;
    process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}

interface accumulate {
    use types.{data-model, error, module-configuration};
    init: func(configuration: module-configuration) -> bool;
    process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}

Modello di dati

Da processor.wit (wasm-graph:processor@1.1.0):

record timestamp {
    timestamp: timespec,        // Physical time (seconds + nanoseconds)
    counter: u64,               // Logical counter for ordering
    node-id: buffer-or-string,  // Originating node
}

record message {
    timestamp: timestamp,
    topic: buffer-or-bytes,
    content-type: option<buffer-or-string>,
    payload: buffer-or-bytes,
    properties: message-properties,
    schema: option<message-schema>,
}

variant data-model {
    buffer-or-bytes(buffer-or-bytes),  // Raw byte data
    message(message),                  // MQTT messages (most common)
    snapshot(snapshot),                // Video/image frames
}

Annotazioni

La maggior parte degli operatori funziona con la message variante. Verificare la presenza di questo tipo all'inizio della process funzione. Il payload usa un handle di buffer host (buffer) per le letture zero-copy o i byte di proprietà del modulo (bytes). Chiamare buffer.read() per copiare i byte host nella memoria del modulo.