Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
I grafici del flusso di dati in Azure IoT Operations elaborano i dati di telemetria alla periferia instradandoli attraverso una serie di operatori, come mappe, filtri e rami. Imballi la tua logica di elaborazione personalizzata come moduli WebAssembly (WASM) e li colleghi tra loro in una definizione di grafo, così puoi trasformare, filtrare e arricchire i dati senza dover scrivere servizi completi.
Questo articolo illustra i tipi di operatore, il modello di flusso di dati tempestivo, la configurazione del modulo, le API host e lo schema WIT che supporta i moduli WASM. Per compilare, testare ed eseguire il debug dei moduli in locale con l'estensione VS Code o l'interfaccia della aio-dataflow riga di comando, vedere Compilare moduli WASM per i flussi di dati.
Operatori e moduli
Gli operatori sono le unità di elaborazione in un grafico del flusso di dati. Ogni tipo ha uno scopo specifico:
| Operatore | Scopo | Tipo restituito |
|---|---|---|
| Mappa | Trasformare ogni elemento di dati (ad esempio, convertire unità di temperatura) | DataModel |
| Filter | Passare o eliminare elementi in base a una condizione | bool |
| Branch | Indirizzare gli elementi a due percorsi diversi | bool |
| Accumulare | Aggregare gli elementi nelle finestre temporali | DataModel |
| Concatena | Unire più flussi mantenendo l'ordine | N/A |
| Delay | Avanzare i timestamp per controllare il timing | N/A |
Un modulo è il file binario WASM che implementa uno o più operatori. Ad esempio, un singolo temperature.wasm modulo può fornire sia un map operatore (per la conversione) che un filter operatore (per il controllo della soglia).
Graph Definition → References Module → Provides Operator → Processes Data
↓ ↓ ↓ ↓
"temperature:1.0.0" → temperature.wasm → map function → °F to °C
Questa separazione consente di riutilizzare lo stesso modulo con configurazioni del grafo diverse, moduli di versione in modo indipendente e modificare il comportamento tramite parametri di configurazione senza ricompilare.
Modello di flusso di dati tempestivo
I grafici del flusso di dati si basano sul modello computazionale Timely dataflow del progetto Naiad di Microsoft Research. Ogni elemento di dati contiene un timestamp di orologio logico ibrido:
record hybrid-logical-clock {
timestamp: timespec, // Wall-clock time (secs + nanos)
counter: u64, // Logical ordering for same-time events
node-id: string, // Originating node
}
Questo consente l'elaborazione deterministica (lo stesso input produce sempre lo stesso output), la semantica esattamente una volta e il coordinamento distribuito tra i nodi. Per lo schema WIT completo, vedere il repository samples.
Per informazioni su come sviluppare moduli WASM con l'estensione VS Code, vedere Compilare moduli WASM con l'estensione VS Code.
Operatori di scrittura
Operatore Map
Un operatore map trasforma ogni elemento di dati e restituisce una copia modificata. L'esempio di avvio rapido mostra una mappa di base. Ecco un esempio più complesso che usa i parametri di configurazione:
use std::sync::OnceLock;
use wasm_graph_sdk::logger::{self, Level};
use wasm_graph_sdk::macros::map_operator;
static OUTPUT_UNIT: OnceLock<String> = OnceLock::new();
fn unit_converter_init(configuration: ModuleConfiguration) -> bool {
let unit = configuration.properties
.iter()
.find(|(k, _)| k == "output_unit")
.map(|(_, v)| v.clone())
.unwrap_or_else(|| "celsius".to_string());
OUTPUT_UNIT.set(unit.clone()).unwrap();
logger::log(Level::Info, "converter", &format!("Output unit: {unit}"));
true
}
#[map_operator(init = "unit_converter_init")]
fn convert_temperature(input: DataModel) -> Result<DataModel, Error> {
let DataModel::Message(mut msg) = input else {
return Err(Error { message: "Expected Message variant".into() });
};
let payload = &msg.payload.read();
let mut data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data["temperature"]["value"].as_f64() {
let unit = OUTPUT_UNIT.get().map(|s| s.as_str()).unwrap_or("celsius");
let converted = match unit {
"kelvin" => (temp - 32.0) * 5.0 / 9.0 + 273.15,
_ => (temp - 32.0) * 5.0 / 9.0, // celsius
};
data["temperature"]["value"] = serde_json::json!(converted);
data["temperature"]["unit"] = serde_json::json!(unit);
let out = serde_json::to_string(&data).unwrap();
msg.payload = BufferOrBytes::Bytes(out.into_bytes());
}
Ok(DataModel::Message(msg))
}
Operatore di filtro
Un filtro ritorna true per far passare i dati o false per eliminarli.
use std::sync::OnceLock;
use wasm_graph_sdk::macros::filter_operator;
use wasm_graph_sdk::logger::{self, Level};
const DEFAULT_LOWER: f64 = -40.0;
const DEFAULT_UPPER: f64 = 3422.0;
static LOWER_BOUND: OnceLock<f64> = OnceLock::new();
static UPPER_BOUND: OnceLock<f64> = OnceLock::new();
fn filter_init(configuration: ModuleConfiguration) -> bool {
for (key, value) in &configuration.properties {
match key.as_str() {
"temperature_lower_bound" => {
if let Ok(v) = value.parse::<f64>() { LOWER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid lower bound: {value}")); }
}
"temperature_upper_bound" => {
if let Ok(v) = value.parse::<f64>() { UPPER_BOUND.set(v).ok(); }
else { logger::log(Level::Error, "filter", &format!("Invalid upper bound: {value}")); }
}
_ => {}
}
}
true
}
#[filter_operator(init = "filter_init")]
fn filter_temperature(input: DataModel) -> Result<bool, Error> {
let lower = LOWER_BOUND.get().copied().unwrap_or(DEFAULT_LOWER);
let upper = UPPER_BOUND.get().copied().unwrap_or(DEFAULT_UPPER);
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
if let Some(temp) = data.get("temperature").and_then(|t| t.get("value")).and_then(|v| v.as_f64()) {
Ok(temp >= lower && temp <= upper)
} else {
Ok(true) // Pass through non-temperature messages
}
}
Operatore di ramo
Una diramazione indirizza i dati su due percorsi. Restituire false per il primo braccio, true per il secondo.
use wasm_graph_sdk::macros::branch_operator;
fn branch_init(_configuration: ModuleConfiguration) -> bool { true }
#[branch_operator(init = "branch_init")]
fn branch_by_type(_timestamp: HybridLogicalClock, input: DataModel) -> Result<bool, Error> {
let DataModel::Message(msg) = &input else { return Ok(true); };
let payload = &msg.payload.read();
let data: serde_json::Value = serde_json::from_slice(payload)
.map_err(|e| Error { message: format!("JSON parse error: {e}") })?;
// false = first arm (temperature data), true = second arm (everything else)
Ok(data.get("temperature").is_none())
}
Parametri di configurazione del modulo
Gli operatori possono ricevere parametri di configurazione di runtime tramite la init funzione . In questo modo è possibile personalizzare il comportamento senza ricompilare il modulo.
La init funzione riceve una ModuleConfiguration struct:
record module-configuration {
properties: list<tuple<string, string>>, // Key-value pairs from graph definition
module-schemas: list<module-schema> // Schema definitions if configured
}
La init funzione viene chiamata una volta al caricamento del modulo. Tornare true per avviare l'elaborazione o false per segnalare un errore di configurazione. Se init restituisce false, l'operatore non elabora dati e il flusso di dati registra un errore.
Importante
Se l'operatore dipende dai parametri di configurazione (ad esempio, limiti di filtro o valori soglia), gestire sempre il caso in cui non vengono forniti. Usare valori predefiniti sensibili o restituire false da init. Non chiamare unwrap() né causare un errore di panico sui parametri mancanti, perché ciò fa arrestare l'operatore in fase di esecuzione senza un messaggio di errore chiaro.
I parametri vengono definiti nella sezione della definizione del moduleConfigurations grafo:
moduleConfigurations:
- name: module-temperature/filter
parameters:
temperature_lower_bound:
name: temperature_lower_bound
description: "Minimum valid temperature in Celsius"
temperature_upper_bound:
name: temperature_upper_bound
description: "Maximum valid temperature in Celsius"
Il name campo deve corrispondere al nome dell'operatore nella sezione del operations grafico. Per altre informazioni sulla struttura della definizione del grafo, vedere Configurare le definizioni dei grafici WebAssembly.
Dimensioni e prestazioni del modulo
I moduli WASM vengono eseguiti in un ambiente in modalità sandbox con risorse limitate. Tenere presenti queste linee guida:
- Ridurre al minimo le dipendenze. Per Rust, usare
default-features = falsesuserdeeserde_jsonper ridurre le dimensioni binarie. Evitare di trascinare grandi casse. - Le dimensioni del modulo sono importanti. I moduli più piccoli vengono caricati più velocemente e usano meno memoria. Un convertitore di temperatura tipico è ~2 MB (versione Rust) o ~5 MB (Python). Usare le build di versione per l'ambiente di produzione.
- Evitare operazioni di blocco. La
processfunzione deve essere completata rapidamente. Il calcolo intensivo ritarda l'intera pipeline del flusso di dati. - Usare
wasm-toolsper controllare. Eseguirewasm-tools component wit your-module.wasmper verificare che il modulo esporta le interfacce previste prima di eseguire il push in un registro.
Controllo delle versioni e CI/CD
Usare il versionamento semantico per i moduli e le definizioni di grafo. Il grafico del flusso di dati fa riferimento agli artefatti in base al nome e al tag (ad esempio, temperature:1.0.0), in modo da poter aggiornare i moduli senza modificare le definizioni dei grafi eseguendo il push di una nuova versione con lo stesso tag.
Per le compilazioni automatizzate, una pipeline tipica è simile alla seguente:
- Compilare il modulo WASM (usare il generatore Docker per coerenza).
- Eseguire
wasm-tools component witper verificare le interfacce esportate. - Eseguire unit test sulla logica di base. Per altre informazioni, vedere Testare i moduli WASM.
- Eseguire il push nel Registro di sistema con ORAS, contrassegnando con la versione di compilazione.
- (Optional) Aggiorna il riferimento dell'artefatto nella definizione del grafo e esegui il push.
Il grafico del flusso di dati seleziona automaticamente le nuove versioni del modulo di cui è stato eseguito il push nello stesso tag senza richiedere una ridistribuzione. Vedere Aggiornare un modulo in un grafico in esecuzione.
API host
I moduli WASM possono usare le API host per la gestione dello stato, la registrazione e le metriche.
Stato store
Rendere persistenti i dati tra process le chiamate usando un archivio di stato distribuito.
use wasm_graph_sdk::state_store;
// Set value (fire-and-forget; state_store returns StateStoreError, not types::Error)
let options = state_store::SetOptions {
conditions: state_store::SetConditions::Unconditional,
expires: None,
};
let _ = state_store::set(key.as_bytes(), value.as_bytes(), None, None, options);
// Get value
let response = state_store::get(key.as_bytes(), None);
// Delete key
let _ = state_store::del(key.as_bytes(), None, None);
Registrazione
Registrazione dei log strutturata con livelli di gravità
use wasm_graph_sdk::logger::{self, Level};
logger::log(Level::Info, "my-operator", "Processing started");
logger::log(Level::Error, "my-operator", &format!("Error: {}", error));
Metrics
Metriche compatibili con OpenTelemetry:
use wasm_graph_sdk::metrics::{self, CounterValue, HistogramValue, Label};
let labels = vec![Label { key: "module".to_owned(), value: "my-operator".to_owned() }];
let _ = metrics::add_to_counter("requests_total", CounterValue::U64(1), Some(&labels));
let _ = metrics::record_to_histogram("processing_duration", HistogramValue::F64(duration_ms), Some(&labels));
Inferenza ONNX
Per incorporare ed eseguire modelli ONNX di piccole dimensioni all'interno dei moduli per l'inferenza in banda, vedere Eseguire l'inferenza ONNX nei grafici del flusso di dati WebAssembly.
Informazioni di riferimento sullo schema WIT
Tutti gli operatori implementano interfacce standardizzate definite usando tipi di interfaccia WebAssembly (WIT). Gli schemi completi sono disponibili nel repository samples.
Interfacce operatore
Ogni operatore ha una init funzione per la configurazione e una process funzione per la gestione dei dati:
interface map {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<data-model, error>;
}
interface filter {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(message: data-model) -> result<bool, error>;
}
interface branch {
use types.{data-model, error, module-configuration};
use hybrid-logical-clock.{hybrid-logical-clock};
init: func(configuration: module-configuration) -> bool;
process: func(timestamp: hybrid-logical-clock, message: data-model) -> result<bool, error>;
}
interface accumulate {
use types.{data-model, error, module-configuration};
init: func(configuration: module-configuration) -> bool;
process: func(staged: data-model, message: list<data-model>) -> result<data-model, error>;
}
Modello di dati
Da processor.wit (wasm-graph:processor@1.1.0):
record timestamp {
timestamp: timespec, // Physical time (seconds + nanoseconds)
counter: u64, // Logical counter for ordering
node-id: buffer-or-string, // Originating node
}
record message {
timestamp: timestamp,
topic: buffer-or-bytes,
content-type: option<buffer-or-string>,
payload: buffer-or-bytes,
properties: message-properties,
schema: option<message-schema>,
}
variant data-model {
buffer-or-bytes(buffer-or-bytes), // Raw byte data
message(message), // MQTT messages (most common)
snapshot(snapshot), // Video/image frames
}
Annotazioni
La maggior parte degli operatori funziona con la message variante. Verificare la presenza di questo tipo all'inizio della process funzione. Il payload usa un handle di buffer host (buffer) per le letture zero-copy o i byte di proprietà del modulo (bytes). Chiamare buffer.read() per copiare i byte host nella memoria del modulo.
Contenuti correlati
- Compilare moduli WASM per i flussi di dati
- Creare grafici WASM con stato incorporato utilizzando l'archivio degli stati
- Usare il registro degli schemi con i moduli WASM
- Eseguire il debug di moduli WASM
- Testare i moduli WASM
- Configurare le definizioni dei gragrafi
- Distribuire la definizione dei grafi
- Inferenza ONNX nei moduli WASM
- Usare WASM nei grafici del flusso di dati