組み込みのインスタンス管理 API を使用して、永続的なワークフローでオーケストレーション インスタンスの開始、クエリ、終了、中断、再開、および消去を行います。
Durable Functions では、orchestration クライアント バインドはこれらの API を公開します。
Durable Task SDK では、DurableTaskClient クラスを通じて同じ操作を使用できます。 この記事では、両方のプラットフォームのコード例を使用して各インスタンス管理操作を実行する方法について説明します。
ヒント
Azure Durable Task Scheduler は、Durable Functions SDK と Durable Task SDK の両方に推奨されるバックエンドであり、永続的なワークフローを大規模に実行するためのフル マネージドのサーバーレス エクスペリエンスを提供します。
インスタンスを開始する
オーケストレーション クライアント の start-new (または schedule-new) メソッドは、新しいオーケストレーション インスタンスを開始します。 内部的には、このメソッドは構成されたバックエンド (Durable Task Scheduler や Azure Storage など) にメッセージを書き込み、返します。 このメッセージは、指定した名前でオーケストレーションの開始を非同期的にトリガーします。
新しいオーケストレーション インスタンスを開始するためのパラメーターを次に示します。
-
名前: スケジュールするオーケストレーター関数の名前。
-
入力: オーケストレーター関数への入力として渡す必要がある JSON シリアル化可能なデータ。
-
InstanceId: (省略可能) インスタンスの一意の ID。 このパラメーターを指定しない場合、メソッドはランダム ID を使用します。
ヒント
可能な限り、インスタンス ID にランダムな識別子を使用します。 ランダム インスタンス ID は、複数の VM 間でオーケストレーター関数をスケーリングするときに、均等な負荷分散を実現するのに役立ちます。 非ランダム インスタンス ID を使用する適切なタイミングは、ID が外部ソースから取得されたとき、または シングルトン オーケストレーター パターンを実装する場合です。
-
名前: スケジュールするオーケストレーションの名前。
-
入力: オーケストレーションへの入力として渡す必要がある JSON シリアル化可能なデータ。
-
InstanceId: (省略可能) インスタンスの一意の ID。 このパラメーターを指定しない場合、メソッドはランダム ID を使用します。
ヒント
可能な限り、インスタンス ID にランダムな識別子を使用します。 ランダム インスタンス ID は、複数の VM 間でオーケストレーションをスケーリングするときに、均等な負荷分散を実現するのに役立ちます。 非ランダム インスタンス ID を使用する適切なタイミングは、ID が外部ソースから取得されたとき、または シングルトン オーケストレーター パターンを実装する場合です。
次の例の関数は、新しいオーケストレーション インスタンスを開始します。
[FunctionName("HelloWorldQueueTrigger")]
public static async Task Run(
[QueueTrigger("start-queue")] string input,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
string instanceId = await starter.StartNewAsync("HelloWorld", input);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
特に指定がない限り、このページの例では、次の function.jsonで HTTP トリガーを使用します。
function.json
{
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注
この例では、Durable Functions バージョン 2.x を対象とします。 バージョン 1.x では、orchestrationClientの代わりに durableClient を使用します。
index.js
const df = require("durable-functions");
module.exports = async function(context, input) {
const client = df.getClient(context);
const instanceId = await client.startNew("HelloWorld", undefined, input);
context.log(`Started orchestration with ID = ${instanceId}.`);
};
特に指定がない限り、このページの例では、次の function.jsonで HTTP トリガーを使用します。
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注
この例では、Durable Functions バージョン 2.x を対象とします。 バージョン 1.x では、orchestrationClientの代わりに durableClient を使用します。
__init__.py
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new('HelloWorld', None, None)
logging.log(f"Started orchestration with ID = ${instance_id}.")
特に指定がない限り、このページの例では、次の function.jsonで HTTP トリガーを使用します。
function.json
{
"bindings": [
{
"name": "Request",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "Response",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注
この例では、Durable Functions バージョン 2.x を対象とします。 バージョン 1.x では、orchestrationClientの代わりに durableClient を使用します。
run.ps1
param($Request, $TriggerMetadata)
$InstanceId = Start-DurableOrchestration -FunctionName 'HelloWorld'
Write-Host "Started orchestration with ID = '$InstanceId'"
@FunctionName("HelloWorldQueueTrigger")
public void helloWorldQueueTrigger(
@QueueTrigger(name = "input", queueName = "start-queue", connection = "Storage") String input,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceID = client.scheduleNewOrchestrationInstance("HelloWorld");
context.getLogger().info("Scheduled orchestration with ID = " + instanceID);
}
オーケストレーターが関数から戻るまで待機するには、waitForInstanceStart() メソッドを使用します。
// wait up to 30 seconds for the scheduled orchestration to enter the "Running" state
client.waitForInstanceStart(instanceID, Duration.ofSeconds(30));
Important
現在、PowerShell Durable Task SDK は使用できません。
次のコードは、Durable Task SDK を使用して新しいオーケストレーション インスタンスを開始する方法を示しています。
using Microsoft.DurableTask.Client;
// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("HelloWorld", input);
Console.WriteLine($"Started orchestration with ID = '{instanceId}'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = await client.WaitForInstanceStartAsync(instanceId, timeout: TimeSpan.FromSeconds(30));
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Schedule a new orchestration instance
instance_id = client.schedule_new_orchestration(hello_world, input=input_data)
print(f"Started orchestration with ID = '{instance_id}'.")
# Optionally, wait for the orchestration to start
state = client.wait_for_orchestration_start(instance_id, timeout=30)
import com.microsoft.durabletask.DurableTaskClient;
// Schedule a new orchestration instance
String instanceId = client.scheduleNewOrchestrationInstance("HelloWorld", input);
System.out.println("Started orchestration with ID = '" + instanceId + "'.");
// Optionally, wait for the orchestration to start
OrchestrationMetadata metadata = client.waitForInstanceStart(instanceId, Duration.ofSeconds(30), false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Schedule a new orchestration instance
const instanceId = await client.scheduleNewOrchestration("HelloWorld", input);
console.log(`Started orchestration with ID = '${instanceId}'.`);
// Optionally, wait for the orchestration to start
const state = await client.waitForOrchestrationStart(instanceId, false, 30);
クエリ インスタンス
新しいオーケストレーション インスタンスを開始した後は、ほとんどの場合、ランタイムの状態を照会して、それらが実行されているか、完了しているか、失敗しているかを確認する必要があります。
オーケストレーション クライアント の get-status メソッドは、オーケストレーション インスタンスの状態を返します。
パラメーターとして、 instanceId (必須)、 showHistory (省略可能)、 showHistoryOutput (省略可能)、および showInput (省略可能) を受け取ります。
-
showHistory: trueに設定されている場合、応答には実行履歴が含まれます。
-
showHistoryOutput: trueに設定すると、実行履歴にアクティビティの出力が含まれます。
-
showInput: falseに設定されている場合、応答には関数の入力は含まれません。 既定値は true です。
このメソッドは、次のプロパティを持つオブジェクトを返します。
-
名前: オーケストレーター関数の名前。
-
InstanceId: オーケストレーションのインスタンス ID (
instanceId 入力と同じである必要があります)。
-
CreatedTime: オーケストレーター関数の実行を開始する時刻。
-
LastUpdatedTime: オーケストレーションが最後にチェックポイントした時刻。
-
入力: JSON 値としての関数の入力。
showInputがfalseになっている場合、このフィールドは設定されません。
-
CustomStatus: JSON 形式のカスタム オーケストレーションの状態。
-
出力: JSON 値としての関数の出力 (関数が完了した場合)。 オーケストレーター関数が失敗した場合、このプロパティにはエラーの詳細が含まれます。 オーケストレーター関数が中断または終了された場合、このプロパティには中断または終了の理由が含まれます (ある場合)。
-
RuntimeStatus: 次のいずれかの値。
-
保留中: インスタンスはスケジュールされていますが、まだ実行を開始していません。
-
実行中: インスタンスが実行されています。
-
完了: インスタンスは正常に完了しました。
-
ContinuedAsNew: インスタンスは、新しい履歴で自分自身を再起動しました。 この状態は一時的な状態です。
-
失敗: インスタンスがエラーで失敗しました。
-
終了: インスタンスが突然停止しました。
-
中断: インスタンスは中断され、後で再開できます。
-
履歴: オーケストレーションの実行履歴。 このフィールドは、
showHistory が true に設定されている場合にのみ設定されます。
-
showHistory: trueに設定されている場合、応答には実行履歴が含まれます。
-
showHistoryOutput: trueに設定すると、実行履歴にアクティビティの出力が含まれます。
-
showInput: falseに設定されている場合、応答にはオーケストレーションの入力は含まれません。 既定値は true です。
このメソッドは、次のプロパティを持つオブジェクトを返します。
-
名前: オーケストレーションの名前。
-
InstanceId: オーケストレーションのインスタンス ID (
instanceId 入力と同じである必要があります)。
-
CreatedTime: オーケストレーションが実行を開始する時刻。
-
LastUpdatedTime: オーケストレーションが最後にチェックポイントした時刻。
-
入力: JSON 値としてのオーケストレーションの入力。
showInputがfalseになっている場合、このフィールドは設定されません。
-
CustomStatus: JSON 形式のカスタム オーケストレーションの状態。
-
出力: オーケストレーションの JSON 値としての出力 (オーケストレーションが完了した場合)。 オーケストレーションが失敗した場合、このプロパティにはエラーの詳細が含まれます。 オーケストレーションが中断または終了された場合、このプロパティには中断または終了の理由が含まれます (ある場合)。
-
RuntimeStatus: 次のいずれかの値。
-
保留中: インスタンスはスケジュールされていますが、まだ実行を開始していません。
-
実行中: インスタンスが実行されています。
-
完了: インスタンスは正常に完了しました。
-
ContinuedAsNew: インスタンスは、新しい履歴で自分自身を再起動しました。 この状態は一時的な状態です。
-
失敗: インスタンスがエラーで失敗しました。
-
終了: インスタンスが突然停止しました。
-
中断: インスタンスは中断され、後で再開できます。
-
履歴: オーケストレーションの実行履歴。 このフィールドは、
showHistory が true に設定されている場合にのみ設定されます。
注
オーケストレーターは、スケジュールされたすべてのタスクが完了Completedオーケストレーターが戻るまで、としてマークされません。 つまり、オーケストレーターが return とマークされるためには、Completed ステートメントに到達するだけでは不十分です。 これは、WhenAny が使用される場合に特に関連しています。これらのオーケストレーターは、多くの場合、スケジュールされたすべてのタスクが実行される前に return を行います。
このメソッドは、インスタンスが存在しない場合は、 null (.NET と Java)、 undefined (JavaScript)、または None (Python) を返します。
[FunctionName("GetStatus")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("check-status-queue")] string instanceId)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
// do something based on the current status.
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const status = await client.getStatus(instanceId);
// do something based on the current status.
// example: if status.runtimeStatus === df.OrchestrationRuntimeStatus.Running: ...
}
function.json 構成については、「インスタンスの開始」を参照してください。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
status = await client.get_status(instance_id)
# do something based on the current status
# example: if (existing_instance.runtime_status is df.OrchestrationRuntimeStatus.Running) { ...
param($Request, $TriggerMetadata)
# Get instanceid from body
$InstanceId = $Request.Body.InstanceId
$Status = Get-DurableStatus -InstanceId $InstanceId -ShowHistory -ShowHistoryOutput -ShowInput
Write-Host "Status: $($Status | ConvertTo-Json)"
# Do something based on status
# example: if ($Status.runtimeStatus -eq 'Running') { ... }
@FunctionName("GetStatus")
public void getStatus(
@QueueTrigger(name = "instanceID", queueName = "check-status-queue", connection = "Storage") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceID, false);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
switch (status) {
// do something based on the current status
}
}
}
using Microsoft.DurableTask.Client;
// Get the status of an orchestration instance
OrchestrationMetadata? metadata = await client.GetInstanceAsync(instanceId, getInputsAndOutputs: true);
if (metadata != null)
{
OrchestrationRuntimeStatus status = metadata.RuntimeStatus;
// do something based on the current status
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Get the status of an orchestration instance
state = client.get_orchestration_state(instance_id, fetch_payloads=True)
if state is not None:
status = state.runtime_status
# do something based on the current status
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
// Get the status of an orchestration instance
OrchestrationMetadata metadata = client.getInstanceMetadata(instanceId, true);
if (metadata != null) {
OrchestrationRuntimeStatus status = metadata.getRuntimeStatus();
// do something based on the current status
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Get the status of an orchestration instance
const state = await client.getOrchestrationState(instanceId, true);
if (state) {
const status = state.runtimeStatus;
// do something based on the current status
}
すべてのオーケストレーション インスタンスに対してクエリを実行する
言語 SDK の API を使用して、 タスク ハブ内のすべてのオーケストレーション インスタンスの状態を照会できます。 この "list-instances" または "get-status" API は、クエリ パラメーターに一致するオーケストレーション インスタンスを表すオブジェクトの一覧を返します。
[FunctionName("GetAllStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
var noFilter = new OrchestrationStatusQueryCondition();
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
noFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
// Note: ListInstancesAsync only returns the first page of results.
// To request additional pages provide the result.ContinuationToken
// to the OrchestrationStatusQueryCondition's ContinuationToken property.
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const instances = await client.getStatusAll();
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
function.json 構成については、「 インスタンスの開始」を参照してください。
import logging
import json
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instances = await client.get_status_all()
for instance in instances:
logging.log(json.dumps(instance))
function.json 構成については、「 インスタンスの開始」を参照してください。
@FunctionName("GetAllStatus")
public String getAllStatus(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery noFilter = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(noFilter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Query all orchestration instances
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(new OrchestrationQuery());
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine(instance.InstanceId);
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Query all orchestration instances
instances = client.list_orchestrations()
for instance in instances:
print(instance.instance_id)
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationStatusQueryResult;
// Query all orchestration instances
OrchestrationStatusQuery query = new OrchestrationStatusQuery();
OrchestrationStatusQueryResult result = client.queryInstances(query);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Query all orchestration instances
const instances = client.getAllInstances();
for await (const instance of instances) {
console.log(instance.instanceId);
}
フィルターを使用してオーケストレーション インスタンスにクエリを実行する
標準インスタンス クエリで提供されるすべての情報が必要ない場合はどうしますか? たとえば、オーケストレーションの作成時刻またはオーケストレーション ランタイムの状態だけを探している場合はどうなりますか。 フィルターを適用してクエリを絞り込みます。
[FunctionName("QueryStatus")]
public static async Task Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
// Get the first 100 running or pending instances that were created between 7 and 1 days ago
var queryFilter = new OrchestrationStatusQueryCondition
{
RuntimeStatus = new[]
{
OrchestrationRuntimeStatus.Pending,
OrchestrationRuntimeStatus.Running,
},
CreatedTimeFrom = DateTime.UtcNow.Subtract(TimeSpan.FromDays(7)),
CreatedTimeTo = DateTime.UtcNow.Subtract(TimeSpan.FromDays(1)),
PageSize = 100,
};
OrchestrationStatusQueryResult result = await client.ListInstancesAsync(
queryFilter,
CancellationToken.None);
foreach (DurableOrchestrationStatus instance in result.DurableOrchestrationState)
{
log.LogInformation(JsonConvert.SerializeObject(instance));
}
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, req) {
const client = df.getClient(context);
const runtimeStatus = [
df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Running,
];
const instances = await client.getStatusBy(
new Date(2021, 3, 10, 10, 1, 0),
new Date(2021, 3, 10, 10, 23, 59),
runtimeStatus
);
instances.forEach((instance) => {
context.log(JSON.stringify(instance));
});
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
import logging
from datetime import datetime
import json
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
runtime_status = [OrchestrationRuntimeStatus.Completed, OrchestrationRuntimeStatus.Running]
instances = await client.get_status_by(
datetime(2021, 3, 10, 10, 1, 0),
datetime(2021, 3, 10, 10, 23, 59),
runtime_status
)
for instance in instances:
logging.log(json.dumps(instance))
@FunctionName("GetRunningInstances")
public String getRunningInstances(
@HttpTrigger(name = "req", methods = {HttpMethod.GET}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
return "Found " + result.getOrchestrationState().size() + " orchestrations.";
}
using Microsoft.DurableTask.Client;
// Get running or pending instances created in the last 7 days
var query = new OrchestrationQuery
{
Statuses = new[] { OrchestrationRuntimeStatus.Running, OrchestrationRuntimeStatus.Pending },
CreatedFrom = DateTime.UtcNow.AddDays(-7),
CreatedTo = DateTime.UtcNow.AddDays(-1),
PageSize = 100
};
AsyncPageable<OrchestrationMetadata> instances = client.GetAllInstancesAsync(query);
await foreach (OrchestrationMetadata instance in instances)
{
Console.WriteLine($"{instance.InstanceId}: {instance.RuntimeStatus}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Get running or pending instances created in the last 7 days
instances = client.list_orchestrations(
created_time_from=datetime.utcnow() - timedelta(days=7),
created_time_to=datetime.utcnow() - timedelta(days=1),
runtime_status=['RUNNING', 'PENDING']
)
for instance in instances:
print(f"{instance.instance_id}: {instance.runtime_status}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationStatusQuery;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Get running or pending instances created in the last 7 days
OrchestrationStatusQuery filter = new OrchestrationStatusQuery()
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.PENDING, OrchestrationRuntimeStatus.RUNNING))
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(7)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(1)));
OrchestrationStatusQueryResult result = client.queryInstances(filter);
for (OrchestrationMetadata instance : result.getOrchestrationState()) {
System.out.println(instance.getInstanceId() + ": " + instance.getRuntimeStatus());
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Get running or pending instances created in the last 7 days
const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const instances = client.getAllInstances({
statuses: [OrchestrationStatus.RUNNING, OrchestrationStatus.PENDING],
createdFrom: sevenDaysAgo,
createdTo: oneDayAgo,
});
for await (const instance of instances) {
console.log(`${instance.instanceId}: ${instance.runtimeStatus}`);
}
オーケストレーション インスタンスを終了する
実行に時間がかかりすぎるオーケストレーション インスタンスがある場合、または何らかの理由で完了する前に停止する必要がある場合は、終了できます。
terminate API の 2 つのパラメーターは 、インスタンス ID と 理由 文字列であり、ログとインスタンスの状態に書き込まれます。
[FunctionName("TerminateInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("terminate-queue")] string instanceId)
{
string reason = "Found a bug";
return client.TerminateAsync(instanceId, reason);
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Found a bug";
return client.terminate(instanceId, reason);
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
reason = "Found a bug"
return await client.terminate(instance_id, reason)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$Reason = 'Found a bug'
Stop-DurableOrchestration -InstanceId $InstanceId -Reason $Reason
@FunctionName("TerminateInstance")
public void terminateInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String reason = "Found a bug";
durableContext.getClient().terminate(instanceID, reason);
}
using Microsoft.DurableTask.Client;
string reason = "Found a bug";
await client.TerminateInstanceAsync(instanceId, reason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
reason = "Found a bug"
client.terminate_orchestration(instance_id, reason=reason)
import com.microsoft.durabletask.DurableTaskClient;
String reason = "Found a bug";
client.terminate(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Found a bug";
await client.terminateOrchestration(instanceId, reason);
終了したインスタンスは、最終的に Terminated 状態に遷移します。 ただし、この移行はすぐには行われません。 代わりに、終了操作は、そのインスタンスの他の操作と共にタスク ハブにキューに入れられます。
インスタンス クエリ API を使用して、終了したインスタンスが実際にTerminated状態に達したタイミングを把握できます。
注
インスタンスの終了は現在伝達されません。 アクティビティ関数とサブオーケストレーションは、呼び出したオーケストレーション インスタンスを終了するかどうかに関係なく、完了まで実行されます。
オーケストレーション インスタンスの中断と再開
オーケストレーションを中断すると、実行中のオーケストレーションを停止することができます。 オーケストレーションの終了とは異なり、中断されたオーケストレーターを後で再開することができます。
suspend API の 2 つのパラメーターは、インスタンス ID と理由文字列であり、ログとインスタンスの状態に書き込まれます。
[FunctionName("SuspendResumeInstance")]
public static async Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("suspend-resume-queue")] string instanceId)
{
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeAsync(instanceId, resumeReason);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
// To suspend an orchestration
const suspendReason = "Need to pause workflow";
await client.suspend(instanceId, suspendReason);
// To resume an orchestration
const resumeReason = "Continue workflow";
await client.resume(instanceId, resumeReason);
};
import azure.functions as func
import azure.durable_functions as df
from datetime import timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str):
client = df.DurableOrchestrationClient(starter)
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
await client.suspend(instance_id, suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
await client.resume(instance_id, resume_reason)
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
# To suspend an orchestration
$SuspendReason = 'Need to pause workflow'
Suspend-DurableOrchestration -InstanceId $InstanceId -Reason $SuspendReason
# To resume an orchestration
$ResumeReason = 'Continue workflow'
Resume-DurableOrchestration -InstanceId $InstanceId -Reason $ResumeReason
@FunctionName("SuspendResumeInstance")
public void suspendResumeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
DurableTaskClient client = durableContext.getClient();
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceID, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceID, resumeReason);
}
using Microsoft.DurableTask.Client;
// To suspend an orchestration
string suspendReason = "Need to pause workflow";
await client.SuspendInstanceAsync(instanceId, suspendReason);
// To resume an orchestration
string resumeReason = "Continue workflow";
await client.ResumeInstanceAsync(instanceId, resumeReason);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# To suspend an orchestration
suspend_reason = "Need to pause workflow"
client.suspend_orchestration(instance_id, reason=suspend_reason)
# To resume an orchestration
resume_reason = "Continue workflow"
client.resume_orchestration(instance_id, reason=resume_reason)
import com.microsoft.durabletask.DurableTaskClient;
// To suspend an orchestration
String suspendReason = "Need to pause workflow";
client.suspendInstance(instanceId, suspendReason);
// To resume an orchestration
String resumeReason = "Continue workflow";
client.resumeInstance(instanceId, resumeReason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// To suspend an orchestration
await client.suspendOrchestration(instanceId);
// To resume an orchestration
await client.resumeOrchestration(instanceId);
中断されたインスタンスは、最終的に Suspended 状態に遷移します。 ただし、この移行はすぐには行われません。 代わりに、中断操作は、そのインスタンスの他の操作と共にタスク ハブにキューに入れられます。 インスタンス クエリ API を使用して、実行中のインスタンスが実際に Suspended 状態に達したことを確認します。
中断されたオーケストレーターが再開されると、その状態が Runningに戻ります。
インスタンスにイベントを送信する
一部のシナリオでは、オーケストレーター関数が外部イベントを待ち、受信する必要があります。 このアプローチが役に立つ例としては、 監視 と 人間の相互作用 のシナリオがあります。
一部のシナリオでは、オーケストレーションが外部イベントを待機してリッスンする必要があります。 このアプローチが役に立つ例としては、 監視 と 人間の相互作用 のシナリオがあります。
オーケストレーション クライアントの raise イベント API を使用して、実行中のインスタンスに イベント 通知を送信できます。 オーケストレーションは、外部イベントの待機オーケストレーターAPIを使用して、これらのイベントを待ち受けて応答できます。
raise イベントのパラメーターは次のとおりです。
-
インスタンス ID: インスタンスの一意の ID。
-
イベント名: 送信するイベントの名前。
-
イベント データ: インスタンスに送信する JSON シリアル化可能なペイロード。
[FunctionName("RaiseEvent")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("event-queue")] string instanceId)
{
int[] eventData = new int[] { 1, 2, 3 };
return client.RaiseEventAsync(instanceId, "MyEvent", eventData);
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const eventData = [ 1, 2, 3 ];
return client.raiseEvent(instanceId, "MyEvent", eventData);
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
event_data = [1, 2 ,3]
return await client.raise_event(instance_id, 'MyEvent', event_data)
param($Request, $TriggerMetadata)
# Get instance id from body
$InstanceId = $Request.Body.InstanceId
$EventName = 'MyEvent'
$EventData = @(1,2,3)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName $EventName -EventData $EventData
@FunctionName("RaiseEvent")
public void raiseEvent(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}) HttpRequestMessage<String> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
String instanceID = req.getBody();
String eventName = "MyEvent";
int[] eventData = { 1, 2, 3 };
durableContext.getClient().raiseEvent(instanceID, eventName, eventData);
}
using Microsoft.DurableTask.Client;
int[] eventData = new int[] { 1, 2, 3 };
await client.RaiseEventAsync(instanceId, "MyEvent", eventData);
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
event_data = [1, 2, 3]
client.raise_orchestration_event(instance_id, "MyEvent", event_data)
import com.microsoft.durabletask.DurableTaskClient;
int[] eventData = { 1, 2, 3 };
client.raiseEvent(instanceId, "MyEvent", eventData);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const eventData = [1, 2, 3];
await client.raiseOrchestrationEvent(instanceId, "MyEvent", eventData);
注
指定したインスタンス ID を持つオーケストレーション インスタンスがない場合、イベント メッセージは破棄されます。 インスタンスが存在するが、まだイベントを待機していない場合、イベントは受信して処理する準備ができるまでインスタンス状態に格納されます。
オーケストレーションの完了を待つ
実行時間の長いオーケストレーションでは、オーケストレーションの結果を待機して取得することが必要な場合があります。 このような場合は、オーケストレーションのタイムアウト期間を定義することも役立ちます。 タイムアウトを超えた場合は、結果ではなくオーケストレーションの状態が返されます。
オーケストレーション インスタンスから実際の出力を同期的に取得するには、 "完了の待機または状態の確認応答の作成" API を使用します。 既定では、このメソッドのタイムアウトは 10 秒で、ポーリング間隔は 1 秒です。
この API の使用方法を示す HTTP トリガー関数の例を次に示します。
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
namespace VSSample
{
public static class HttpSyncStart
{
private const string Timeout = "timeout";
private const string RetryInterval = "retryInterval";
[FunctionName("HttpSyncStart")]
public static async Task<HttpResponseMessage> Run(
[HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}/wait")]
HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
string functionName,
ILogger log)
{
// Function input comes from the request content.
object eventData = await req.Content.ReadAsAsync<object>();
string instanceId = await starter.StartNewAsync(functionName, eventData);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
TimeSpan timeout = GetTimeSpan(req, Timeout) ?? TimeSpan.FromSeconds(30);
TimeSpan retryInterval = GetTimeSpan(req, RetryInterval) ?? TimeSpan.FromSeconds(1);
return await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(
req,
instanceId,
timeout,
retryInterval);
}
private static TimeSpan? GetTimeSpan(HttpRequestMessage request, string queryParameterName)
{
string queryParameterStringValue = request.RequestUri.ParseQueryString()[queryParameterName];
if (string.IsNullOrEmpty(queryParameterStringValue))
{
return null;
}
return TimeSpan.FromSeconds(double.Parse(queryParameterStringValue));
}
}
}
const df = require("durable-functions");
const timeout = "timeout";
const retryInterval = "retryInterval";
module.exports = async function (context, req) {
const client = df.getClient(context);
const instanceId = await client.startNew(req.params.functionName, undefined, req.body);
context.log(`Started orchestration with ID = '${instanceId}'.`);
const timeoutInMilliseconds = getTimeInSeconds(req, timeout) || 30000;
const retryIntervalInMilliseconds = getTimeInSeconds(req, retryInterval) || 1000;
const response = client.waitForCompletionOrCreateCheckStatusResponse(
context.bindingData.req,
instanceId,
timeoutInMilliseconds,
retryIntervalInMilliseconds
);
return response;
};
function getTimeInSeconds(req, queryParameterName) {
const queryValue = req.query[queryParameterName];
return queryValue
? queryValue * 1000 // expected to be in seconds
: undefined;
}
function.json 構成については、「 インスタンスの開始」 を参照してください。
import logging
import azure.functions as func
import azure.durable_functions as df
timeout = "timeout"
retry_interval = "retryInterval"
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params['functionName'], None, req.get_body())
logging.log(f"Started orchestration with ID = '${instance_id}'.")
timeout_in_milliseconds = get_time_in_seconds(req, timeout)
timeout_in_milliseconds = timeout_in_milliseconds if timeout_in_milliseconds != None else 30000
retry_interval_in_milliseconds = get_time_in_seconds(req, retry_interval)
retry_interval_in_milliseconds = retry_interval_in_milliseconds if retry_interval_in_milliseconds != None else 1000
return await client.wait_for_completion_or_create_check_status_response(
req,
instance_id,
timeout_in_milliseconds,
retry_interval_in_milliseconds
)
def get_time_in_seconds(req: func.HttpRequest, query_parameter_name: str):
query_value = req.params.get(query_parameter_name)
return query_value if query_value != None else 1000
注
現在、PowerShell には、このシナリオ用の組み込みコマンドはありません。
Java現在、このシナリオには 1 つのメソッドはありませんが、追加のコード行をいくつか使用して実装できます。
@FunctionName("HttpStartAndWait")
public HttpResponseMessage httpStartAndWait(
@HttpTrigger(name = "req", route = "orchestrators/{functionName}/wait", methods = {HttpMethod.POST}) HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("functionName") String functionName,
final ExecutionContext context) {
DurableTaskClient client = durableContext.getClient();
String instanceId = client.scheduleNewOrchestrationInstance(functionName);
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
try {
String timeoutString = req.getQueryParameters().get("timeout");
Integer timeoutInSeconds = Integer.parseInt(timeoutString);
OrchestrationMetadata orchestration = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(timeoutInSeconds),
true /* getInputsAndOutputs */);
return req.createResponseBuilder(HttpStatus.OK)
.body(orchestration.getSerializedOutput())
.header("Content-Type", "application/json")
.build();
} catch (TimeoutException timeoutEx) {
// timeout expired - return a 202 response
return durableContext.createCheckStatusResponse(req, instanceId);
}
}
Durable Task SDK には、オーケストレーションが同期的に完了するまで待機するメソッドが用意されています。
using Microsoft.DurableTask.Client;
// Wait for orchestration to complete with a timeout
OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(
instanceId,
timeout: TimeSpan.FromSeconds(30),
getInputsAndOutputs: true);
if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
Console.WriteLine($"Output: {metadata.SerializedOutput}");
}
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Wait for orchestration to complete with a timeout
state = client.wait_for_orchestration_completion(
instance_id,
timeout=30,
fetch_payloads=True)
if state.runtime_status == 'COMPLETED':
print(f"Output: {state.serialized_output}")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.OrchestrationMetadata;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
// Wait for orchestration to complete with a timeout
try {
OrchestrationMetadata metadata = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true /* getInputsAndOutputs */);
System.out.println("Output: " + metadata.getSerializedOutput());
} catch (TimeoutException e) {
System.out.println("Orchestration did not complete within timeout");
}
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Wait for orchestration to complete with a timeout
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
if (state?.runtimeStatus === OrchestrationStatus.COMPLETED) {
console.log(`Output: ${state.serializedOutput}`);
}
次の行で関数を呼び出します。 タイムアウトには 2 秒、再試行間隔には 0.5 秒を使用します。
curl -X POST "http://localhost:7071/orchestrators/E1_HelloSequence/wait?timeout=2&retryInterval=0.5"
注
上記の cURL コマンドは、プロジェクトに E1_HelloSequence という名前のオーケストレーター関数があることを前提としています。 HTTP トリガー関数の記述方法により、プロジェクト内のオーケストレーター関数の名前に置き換えることができます。
オーケストレーション インスタンスから応答を取得するために必要な時間に応じて、次の 2 つのケースが存在します。
- オーケストレーション インスタンスは、定義されたタイムアウト (この場合は 2 秒) 以内に終了し、応答は実際のオーケストレーション インスタンスの出力であり、同期的に配信されます。
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:14:29 GMT
Transfer-Encoding: chunked
[
"Hello Tokyo!",
"Hello Seattle!",
"Hello London!"
]
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Thu, 14 Dec 2021 06:13:51 GMT
Location: http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}
Retry-After: 10
Transfer-Encoding: chunked
{
"id": "d3b72dddefce4e758d92f4d411567177",
"sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/raiseEvent/{eventName}?taskHub={taskHub}&connection={connection}&code={systemKey}",
"statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177?taskHub={taskHub}&connection={connection}&code={systemKey}",
"terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/terminate?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"suspendPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/suspend?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}",
"resumePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/d3b72dddefce4e758d92f4d411567177/resume?reason={text}&taskHub={taskHub}&connection={connection}&code={systemKey}"
}
注
Webhook URL の形式は、実行するAzure Functions ホストのバージョンによって異なる場合があります。 上記の例は、Azure Functions 3.0 ホスト用です。
オーケストレーション インスタンスの HTTP 管理 Webhook URL を取得する
オーケストレーションに対するイベントを監視または発生させるには、外部システムを使用します。 外部システムは、HTTP API URL 検出で説明されている既定の応答の一部である webhook URL を介してDurable Functionsと通信します。 Webhook URL には、 オーケストレーション クライアント バインドを使用してプログラムでアクセスすることもできます。 具体的には、 HTTP 管理ペイロードの作成 API は、これらの Webhook URL を含むシリアル化可能なオブジェクトを取得します。
HTTP 管理ペイロードの作成 API には、次の 1 つのパラメーターがあります。
-
インスタンス ID: インスタンスの一意の ID。
メソッドは、次の文字列プロパティを持つオブジェクトを返します。
-
Id: オーケストレーションのインスタンス ID (
InstanceId 入力と同じである必要があります)。
-
StatusQueryGetUri: オーケストレーション インスタンスの状態 URL。
-
SendEventPostUri: オーケストレーション インスタンスの "イベントの発生" URL。
-
TerminatePostUri: オーケストレーション インスタンスの "terminate" URL。
-
PurgeHistoryDeleteUri: オーケストレーション インスタンスの "消去履歴" URL。
-
SuspendPostUri: オーケストレーション インスタンスの "一時停止" URL。
-
ResumePostUri: オーケストレーション インスタンスの "resume" URL。
関数は、次の例に示すように、これらのオブジェクトのインスタンスを外部システムに送信して、対応するオーケストレーションのイベントを監視または発生させます。
[FunctionName("SendInstanceInfo")]
public static void SendInstanceInfo(
[ActivityTrigger] IDurableActivityContext ctx,
[DurableClient] IDurableOrchestrationClient client,
[CosmosDB(
databaseName: "MonitorDB",
containerName: "HttpManagementPayloads",
Connection = "CosmosDBConnectionSetting")]out dynamic document)
{
HttpManagementPayload payload = client.CreateHttpManagementPayload(ctx.InstanceId);
// send the payload to Azure Cosmos DB
document = new { Payload = payload, id = ctx.InstanceId };
}
注
前の C# コードでは、 IDurableOrchestrationClient と IDurableActivityContextを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
modules.exports = async function(context, ctx) {
const client = df.getClient(context);
const payload = client.createHttpManagementPayload(ctx.instanceId);
// send the payload to Azure Cosmos DB
context.bindings.document = JSON.stringify({
id: ctx.instanceId,
payload,
});
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.cosmosdb.cdb.Document:
client = df.DurableOrchestrationClient(starter)
payload = client.create_check_status_response(req, instance_id).get_body().decode()
return func.cosmosdb.CosmosDBConverter.encode({
id: instance_id,
payload: payload
})
using namespace System.Net
param($Request, $TriggerMetadata)
$InstanceId = $Request.Body.InstanceId
$Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $Response
注
Javaは現在、この機能をサポートしていません。
オーケストレーション インスタンスの巻き戻し
予期しない理由でオーケストレーションエラーが発生した場合は、その目的のために構築された API を使用して、インスタンスを以前は正常な状態に 巻き戻 します。
注
この API は、適切なエラー処理と再試行ポリシーに代わるものではありません。 代わりに、予期しない理由でオーケストレーション インスタンスが失敗した場合にのみ使用することを目的としています。
Failed以外の状態のオーケストレーション (Running、Pending、Terminated、Completedなど) は"巻き戻し" できません。 エラー処理と再試行ポリシーの詳細については、 エラー処理 に関する記事を参照してください。
オーケストレーションを RewindAsync 状態に戻すには、rewindの (.NET) または (JavaScript) メソッドを使用します。 このメソッドは、オーケストレーションエラーの原因となったアクティビティまたはサブ割り当て実行エラーも再実行します。
たとえば、一連の 人間による承認を含むワークフローがあるとします。 承認が必要であることを他のユーザーに通知し、リアルタイムの応答を待機する一連のアクティビティ関数があるとします。 すべての承認アクティビティが応答またはタイムアウトを受け取った後、無効なデータベース connection stringなど、アプリケーションの構成ミスが原因で別のアクティビティが失敗したとします。 その結果、ワークフローの深部でオーケストレーションエラーが発生します。
RewindAsync (.NET) または rewind (JavaScript) API を使用すると、アプリケーション管理者は構成エラーを修正し、失敗したオーケストレーションをエラーの直前の状態に戻すことができます。 人間とのやり取りの手順を再承認する必要はありません。これでオーケストレーションを正常に完了できます。
注
巻き戻し機能では、永続的タイマーを使用するオーケストレーション インスタンスの巻き戻しはサポートされていません。
[FunctionName("RewindInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("rewind-queue")] string instanceId)
{
string reason = "Orchestrator failed and needs to be revived.";
return client.RewindAsync(instanceId, reason);
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
const reason = "Orchestrator failed and needs to be revived.";
return client.rewind(instanceId, reason);
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
注
Pythonは現在、この機能をサポートしていません。
注
現在、PowerShell ではこの機能はサポートされていません。
注
Javaは現在、この機能をサポートしていません。
using Microsoft.DurableTask.Client;
string reason = "Orchestrator failed and needs to be revived.";
await client.RewindInstanceAsync(instanceId, reason);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
const reason = "Orchestrator failed and needs to be revived.";
await client.rewindInstance(instanceId, reason);
このサンプルは、.NETと JavaScript についてのみ示されています。
このサンプルは、.NETと JavaScript についてのみ示されています。
このサンプルは、.NETと JavaScript についてのみ示されています。
オーケストレーション インスタンスを再起動する
オーケストレーションを再起動すると、以前に実行されたインスタンスの履歴を使用して新しいインスタンスが作成されます。 この機能は、同じ入力とインスタンス ID パターンでオーケストレーションを再実行し、元のパターンに基づいて新しい実行を作成する場合に便利です。
[FunctionName("RestartInstance")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("restart-queue")] string instanceId)
{
return client.RestartAsync(instanceId, restartWithNewInstanceId: true);
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
注
現在、JavaScript ではこの機能はサポートされていません。
注
Pythonは現在、この機能をサポートしていません。
注
現在、PowerShell ではこの機能はサポートされていません。
注
この機能は現在、Javaではサポートされていません。
using Microsoft.DurableTask.Client;
// Restart an orchestration with a new instance ID
string newInstanceId = await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: true);
Console.WriteLine($"Restarted as new instance: {newInstanceId}");
// Restart an orchestration keeping the same instance ID
await client.RestartInstanceAsync(instanceId, restartWithNewInstanceId: false);
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Restart an orchestration with a new instance ID
const newInstanceId = await client.restartOrchestration(instanceId, true);
console.log(`Restarted as new instance: ${newInstanceId}`);
// Restart an orchestration keeping the same instance ID
await client.restartOrchestration(instanceId, false);
このサンプルは、.NETと JavaScript についてのみ示されています。
このサンプルは、.NETと JavaScript についてのみ示されています。
このサンプルは、.NETと JavaScript についてのみ示されています。
オーケストレーション インスタンスの履歴を消去する
オーケストレーションに関連付けられているすべてのデータを削除するには、インスタンス履歴を消去します。 たとえば、完了したインスタンスに関連付けられているストレージ リソースを削除します。
オーケストレーション クライアントによって定義された消去インスタンス API を使用します。
次の例は、1 つのオーケストレーション インスタンスを消去する方法を示しています。
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[QueueTrigger("purge-queue")] string instanceId)
{
return client.PurgeInstanceHistoryAsync(instanceId);
}
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
return client.purgeInstanceHistory(instanceId);
};
function.json 構成については、「 インスタンスの開始」 を参照してください。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
return await client.purge_instance_history(instance_id)
@FunctionName("PurgeInstance")
public HttpResponseMessage purgeInstance(
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, route = "purge/{instanceID}") HttpRequestMessage<?> req,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
@BindingName("instanceID") String instanceID) {
PurgeResult result = durableContext.getClient().purgeInstance(instanceID);
if (result.getDeletedInstanceCount() == 0) {
return req.createResponseBuilder(HttpStatus.NOT_FOUND)
.body("No completed instance with ID '" + instanceID + "' was found!")
.build();
} else {
return req.createResponseBuilder(HttpStatus.OK)
.body("Successfully purged data for " + instanceID)
.build();
}
}
using Microsoft.DurableTask.Client;
// Purge a single orchestration instance
PurgeResult result = await client.PurgeInstanceAsync(instanceId);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
# Purge a single orchestration instance
result = client.purge_orchestration(instance_id)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeResult;
// Purge a single orchestration instance
PurgeResult result = client.purgeInstance(instanceId);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
const client = createAzureManagedClient(connectionString);
// Purge a single orchestration instance
const result = await client.purgeOrchestration(instanceId);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
次の例は、指定した時間間隔後に完了したすべてのオーケストレーション インスタンスの履歴を消去するタイマーによってトリガーされる関数を示しています。 この場合、30 日以上前に完了したすべてのインスタンスのデータが削除されます。 この関数例は、1 日に 1 回、午後 12 時 (UTC) に実行するようにスケジュールされています。
[FunctionName("PurgeInstanceHistory")]
public static Task Run(
[DurableClient] IDurableOrchestrationClient client,
[TimerTrigger("0 0 12 * * *")] TimerInfo myTimer)
{
return client.PurgeInstanceHistoryAsync(
DateTime.MinValue,
DateTime.UtcNow.AddDays(-30),
new List<OrchestrationStatus>
{
OrchestrationStatus.Completed
});
}
注
前の C# コードでは、 IDurableOrchestrationClientを含むインプロセス モデルが使用されています。これは、Durable Functions 拡張機能の新しいバージョンでは古いものとしてマークされています。 新しい .NET プロジェクトの場合は、を使用することを検討してください。 詳細については、 Durable Functions のバージョンに関する記事を 参照してください。
purgeInstanceHistoryBy メソッドを使用すると、複数のインスタンスのインスタンス履歴を条件付きで消去できます。
function.json
{
"bindings": [
{
"schedule": "0 0 12 * * *",
"name": "myTimer",
"type": "timerTrigger",
"direction": "in"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}
注
この例では、Durable Functions バージョン 2.x を対象とします。 バージョン 1.x では、orchestrationClientの代わりに durableClient を使用します。
index.js
const df = require("durable-functions");
module.exports = async function (context, myTimer) {
const client = df.getClient(context);
const createdTimeFrom = new Date(0);
const createdTimeTo = new Date().setDate(today.getDate() - 30);
const runtimeStatuses = [ df.OrchestrationRuntimeStatus.Completed ];
return client.purgeInstanceHistoryBy(createdTimeFrom, createdTimeTo, runtimeStatuses);
};
import azure.functions as func
import azure.durable_functions as df
from azure.durable_functions.models.DurableOrchestrationStatus import OrchestrationRuntimeStatus
from datetime import datetime, timedelta
async def main(req: func.HttpRequest, starter: str, instance_id: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
created_time_from = datetime.min
created_time_to = datetime.today() + timedelta(days = -30)
runtime_statuses = [OrchestrationRuntimeStatus.Completed]
return await client.purge_instance_history_by(created_time_from, created_time_to, runtime_statuses)
@FunctionName("PurgeInstances")
public void purgeInstances(
@TimerTrigger(name = "purgeTimer", schedule = "0 0 12 * * *") String timerInfo,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
ExecutionContext context) throws TimeoutException {
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeFrom(Instant.now().minus(Duration.ofDays(60)))
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = durableContext.getClient().purgeInstances(criteria);
context.getLogger().info(String.format("Purged %d instance(s)", result.getDeletedInstanceCount()));
}
using Microsoft.DurableTask.Client;
// Purge completed instances older than 30 days
var filter = new PurgeInstancesFilter(
CreatedFrom: DateTime.MinValue,
CreatedTo: DateTime.UtcNow.AddDays(-30),
Statuses: new[] { OrchestrationRuntimeStatus.Completed });
PurgeResult result = await client.PurgeAllInstancesAsync(filter);
Console.WriteLine($"Purged {result.PurgedInstanceCount} instance(s).");
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from datetime import datetime, timedelta
# Purge completed instances older than 30 days
result = client.purge_orchestrations(
created_time_from=datetime.min,
created_time_to=datetime.utcnow() - timedelta(days=30),
runtime_status=['COMPLETED']
)
print(f"Purged {result.deleted_instance_count} instance(s).")
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.PurgeInstanceCriteria;
import com.microsoft.durabletask.PurgeResult;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
// Purge completed instances older than 30 days
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria()
.setCreatedTimeTo(Instant.now().minus(Duration.ofDays(30)))
.setRuntimeStatusList(List.of(OrchestrationRuntimeStatus.COMPLETED));
PurgeResult result = client.purgeInstances(criteria);
System.out.println("Purged " + result.getDeletedInstanceCount() + " instance(s).");
import { createAzureManagedClient } from "@microsoft/durabletask-js-azuremanaged";
import { OrchestrationStatus, PurgeInstanceCriteria } from "@microsoft/durabletask-js";
const client = createAzureManagedClient(connectionString);
// Purge completed instances older than 30 days
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
const criteria = new PurgeInstanceCriteria();
criteria.setCreatedTimeTo(thirtyDaysAgo);
criteria.setRuntimeStatusList([OrchestrationStatus.COMPLETED]);
const result = await client.purgeOrchestration(criteria);
console.log(`Purged ${result?.deletedInstanceCount ?? 0} instance(s).`);
注
消去履歴操作を成功させるには、ターゲット インスタンスのランタイム状態が [完了]、[ 終了]、または [失敗] である必要があります。
次のステップ