マッピング データ フローの外部呼び出し変換

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

Data Factory in Microsoft Fabric は、よりシンプルなアーキテクチャ、組み込みの AI、および新機能を備えた次世代のAzure Data Factoryです。 データ統合を初めて使用する場合は、Fabric Data Factory から始めます。 既存の ADF ワークロードをFabricにアップグレードして、データ サイエンス、リアルタイム分析、レポートの新機能にアクセスできます。

データ フローは、Azure Data Factory パイプラインとAzure Synapse Analytics パイプラインの両方で使用できます。 この記事は、マッピング データ フローに適用されます。 変換を初めて使用する場合は、入門記事「 マッピング データ フローを使用したデータの変換」を参照してください。

ヒント

Dataflow Gen2 の同等の変換 (カスタム列) については、データ フロー ユーザーのマッピングに関する Dataflow Gen2 のガイドを参照してください。

外部呼び出し変換を使用すると、データ エンジニアは、カスタム結果をデータ フロー ストリームに追加するために、外部 REST エンドポイントを 1 行ずつ呼び出すことができます。

構成

外部呼び出し変換の構成パネルで、最初に、接続先となる外部エンドポイントの種類を選択します。 次に、受信列をマップします。 最後に、ダウンストリームの変換で使用する出力データ構造を定義します。

外部呼び出し

設定

インライン データセットの種類と、関連するリンクされたサービスを選択します。 現在、REST のみがサポートされています。 ただし、SQL ストアド プロシージャやその他のリンクされたサービスの種類も使用できるようになります。 設定のプロパティの説明については、REST ソースの構成に関するページを参照してください。

マッピング

自動マッピングを選択して、すべての入力列をエンドポイントに渡すことができます。 ここでは、必要に応じて、手動で列を設定し、ターゲット エンドポイントに送信される列の名前を変更することもできます。

出力

ここでは、外部呼び出しの出力のデータ構造を定義します。 本文の構造を定義し、外部呼び出しから返されるヘッダーと状態を格納する方法を選択できます。

本体、ヘッダー、状態を保存することを選択する場合、それらをダウンストリーム データ変換で使用できるようにするために、最初にそれぞれの列名を選択します。

ADF データ フロー構文を使用して、本体データ構造を手動で定義できます。 本文の列名とデータ型を定義するには、[プロジェクションのインポート] を選択し、ADF が外部呼び出しからのスキーマ出力を検出できるようにします。 天気 REST API GET 呼び出しからの出力としてのスキーマ定義構造の例を次に示します。

({@context} as string[],
		geometry as (coordinates as string[][][],
		type as string),
		properties as (elevation as (unitCode as string,
		value as string),
		forecastGenerator as string,
		generatedAt as string,
		periods as (detailedForecast as string, endTime as string, icon as string, isDaytime as string, name as string, number as string, shortForecast as string, startTime as string, temperature as string, temperatureTrend as string, temperatureUnit as string, windDirection as string, windSpeed as string)[],
		units as string,
		updateTime as string,
		updated as string,
		validTimes as string),
		type as string)

データ フロー スクリプトを含むサンプル

外部呼び出しのサンプル

source(output(
		id as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	ignoreNoFilesFound: false) ~> source1
Filter1 call(mapColumn(
		id
	),
	skipDuplicateMapInputs: false,
	skipDuplicateMapOutputs: false,
	output(
		headers as [string,string],
		body as (name as string)
	),
	allowSchemaDrift: true,
	store: 'restservice',
	format: 'rest',
	timeout: 30,
	httpMethod: 'POST',
	entity: 'api/Todo/',
	requestFormat: ['type' -> 'json'],
	responseFormat: ['type' -> 'json', 'documentForm' -> 'documentPerLine']) ~> ExternalCall1
source1 filter(toInteger(id)==1) ~> Filter1
ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1

データ フローのスクリプト

ExternalCall1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	store: 'cache',
	format: 'inline',
	output: false,
	saveOrder: 1) ~> sink1