Azure Stream Analyticsで JSON データと Avro データを解析する

Azure Stream Analytics サービスでは、CSV、JSON、および Avro データ形式でのイベントの処理がサポートされています。 JSON データと Avro データのどちらも、入れ子になったオブジェクト (レコード) や配列などの複合型を含む構造にすることができます。

レコード データ型

レコード データ型は、対応する形式が入力データ ストリームで使用される場合に、JSON 配列と Avro の配列を表すために使用されます。 これらの例は、JSON 形式の入力イベントを読み取るサンプル センサーを示しています。 1 つのイベントの例を以下に示します。

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

既知のスキーマの入れ子のフィールドにアクセスする

クエリから入れ子になったフィールドに直接アクセスするには、ドット表記 (.) を使用します。 たとえば、このクエリでは、上記の JSON データの Location プロパティの緯度と経度の座標が選択されます。 次のスニペットに示すように、ドット表記を使用して複数のレベル間を移動します。

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

結果は次のとおりです。

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

すべてのプロパティを選択する

入れ子になったレコードのすべてのプロパティは、 * ワイルドカードを使用して選択できます。 次の例を確認してください。

SELECT
    DeviceID,
    Location.*
FROM input

結果は次のとおりです。

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

プロパティ名が変数であるときに入れ子のフィールドにアクセスする

プロパティ名が変数の場合は 、GetRecordPropertyValue 関数を使用します。 この関数は、プロパティ名をハードコーディングせずに動的クエリを作成するのに役立ちます。

たとえば、サンプル データ ストリームを、各デバイス センサーのしきい値を含む 参照データと結合 する必要がある場合を考えてみましょう。 そのような参照データのスニペットを次のスニペットに示します。

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

目標は、記事の上部にあるサンプル データセットをその参照データに結合し、しきい値を超えるセンサー メジャーごとに 1 つのイベントを出力することです。 この結合は、複数のセンサーがそれぞれのしきい値を超えている場合に、1 つのイベントで複数の出力イベントを生成できることを意味します。 結合せずに同様の結果を得るには、以下の例を参照してください。

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue は、参照データから取得されるプロパティ名と一致する SensorReadings のプロパティを選択します。 次に、 SensorReadings から関連付けられている値を抽出します。

結果は次のとおりです。

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

レコード フィールドを個々のイベントに変換する

レコード フィールドを個別のイベントに変換するには、 APPLY 演算子を GetRecordProperties 関数と共に使用します。

元のサンプル データを使用すると、次のクエリを使用して、さまざまなイベントにプロパティを抽出できます。

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

結果は次のとおりです。

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

WITH を使用すると、これらのイベントをさまざまな宛先にルーティングできます。

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

SQL 参照データの JSON レコードを解析する

ジョブで参照データとしてAzure SQL Databaseを使用する場合は、JSON 形式のデータを含む列を含めることができます。 次の例は、この形式を示しています。

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

単純な JavaScript ユーザー定義関数を記述することで、 データ 列の JSON レコードを解析できます。

function parseJson(string) {
return JSON.parse(string);
}

JSON レコードのフィールドにアクセスするには、次の例に示すように、Stream Analytics クエリにステップを作成します。

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

配列データ型

配列データ型は、順序が付けられた値のコレクションです。 このセクションでは、配列値に対する一般的な操作について詳しく説明します。 これらの例では、 GetArrayElementGetArrayElementsGetArrayLength、および APPLY 演算子の関数を使用します。

イベントの例を次に示します。 CustomSensor03SensorMetadataはどちらも配列型です。

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

特定の配列要素を操作する

指定したインデックス位置にある配列要素を選択します (最初の配列要素を選択します)。

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

結果は次のとおりです。

|firstElement|
|-|
|12|

配列の長さを選択する

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

結果は次のとおりです。

|arrayLength|
|-|
|3|

配列要素を個々のイベントに変換する

すべての配列要素を個々のイベントとして選択します。 APPLY 演算子と GetArrayElements 組み込み関数は、すべての配列要素を個々のイベントとして抽出します。

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

結果は次のとおりです。

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

結果は次のとおりです。

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

抽出されたフィールドを列に表示するには、 WITH 構文と JOIN 操作を使用してデータセットをピボットします。 この結合には、重複を防ぐ 時間境界 条件が必要です。

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

結果は次のとおりです。

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Azure Stream Analytics のデータ型