Atualizar ou mesclar registros em Banco de Dados SQL do Azure usando Azure Functions

Atualmente, Azure Stream Analytics (ASA) dá suporte apenas à inserção (acréscimo) de linhas em saídas SQL (SQL do Azure Databases e Azure Synapse Analytics). Este artigo discute soluções alternativas para habilitar UPDATE, UPSERT ou MERGE em bancos de dados SQL usando Azure Functions como camada intermediária.

As opções alternativas para Azure Functions são apresentadas no final.

Requisito

Você pode gravar dados em uma tabela usando um dos seguintes modos:

Modo Instrução T-SQL equivalente Requisitos
Acrescentar INSERT Nenhum
Substitua MERGE (UPSERT) Chave exclusiva
Acumular MERGE (UPSERT) utilizando o operador de atribuição composta (, +=...) Chave exclusiva e acumulador

Para ilustrar as diferenças, considere o que acontece ao ingerir os dois registros a seguir:

Hora_de_Chegada ID do Dispositivo Measure_Value
10:00 Um 1
10:05 Um 20

No modo de acréscimo , você insere dois registros. A instrução T-SQL equivalente é:

INSERT INTO [target] VALUES (...);

Resultando em:

Tempo_Modificado Device_Id Measure_Value
10:00 Um 1
10:05 Um 20

No modo de substituição , você obtém apenas o último valor por chave. Aqui você usa Device_Id como a chave. A instrução T-SQL equivalente é:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Tempo_Modificado Device_Key Measure_Value
10:05 Um 20

Por fim, no modo de acumulação , você soma Value com um operador de atribuição composta (+=). Aqui também você usa Device_Id como a chave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Resultando em:

Tempo_Modificado Device_Key Measure_Value
10:05 Um 21

Para considerações sobre desempenho, os adaptadores de saída de banco de dados ASA SQL atualmente dão suporte apenas ao modo acrescentar nativamente. Esses adaptadores usam inserção em massa para maximizar a taxa de transferência e limitar a pressão de retorno.

Este artigo mostra como usar Azure Functions para implementar modos Substituir e Acumular para o ASA. Quando você usa uma função como uma camada intermediária, o desempenho potencial de gravação não afeta o trabalho de streaming. Nesse sentido, usar Azure Functions funciona melhor com o SQL do Azure. Alterar de instruções em massa para instruções linha por linha no SQL do Synapse pode causar maiores problemas de desempenho.

Saída das Azure Functions

Neste trabalho, você substitui a saída do SQL ASA pela saída ASA Azure Functions. A função implementa os recursos UPDATE, UPSERT ou MERGE.

Atualmente, você pode acessar um Banco de Dados SQL em uma função usando duas opções. A primeira opção é a vinculação de saída SQL do Azure. Atualmente, ele está limitado a C#e oferece apenas o modo de substituição. A segunda opção é compor uma consulta SQL para enviar por meio do driver SQL apropriado (Microsoft. Data.SqlClient para .NET).

Ambos os exemplos a seguir pressupõem o esquema de tabela a seguir. A opção de associação requer que uma chave primária seja definida na tabela de destino. Isso não é necessário, mas é recomendado, ao usar um driver de SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Para usar uma função como saída do ASA, a função deve atender às seguintes expectativas:

  • Azure Stream Analytics espera o status HTTP 200 do aplicativo Functions para lotes que ele processa com êxito.
  • Quando o Azure Stream Analytics recebe uma exceção 413 ("HTTP Request Entity Too Large") de uma Azure Function, ele reduz o tamanho dos lotes que envia para a Azure Function.
  • Durante a conexão de teste, o Stream Analytics envia uma solicitação POST com um lote vazio para Azure Functions e espera que o status HTTP 20x volte a validar o teste.

Opção 1: Atualizar por chave com a Vinculação SQL do Azure Functions

Essa opção usa a vinculação de saída SQL do Azure Function. Essa extensão pode substituir um objeto em uma tabela sem que você precise escrever uma instrução SQL. Neste momento, ela não dá suporte a operadores de atribuição composta (acumulações).

Este exemplo foi criado em:

Para entender melhor a abordagem de associação, siga this tutorial.

Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. Use as seguintes informações:

  • Linguagem: C#
  • Runtime: .NET 6 (em função/runtime v4)
  • Modelo: HTTP trigger

Instale a extensão de associação executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Adicione o item SqlConnectionString na seção Values do local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função com seu próprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Atualize o nome da tabela de destino na seção associação:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Atualize a classe Device e seção de mapeamento para corresponder ao seu próprio esquema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

Agora você pode testar a fiação entre a função local e o banco de dados através da depuração (F5 no Visual Studio Code). O banco de dados SQL precisa estar acessível de seu computador. Você pode usar o SSMS para verificar a conectividade. Em seguida, envie solicitações POST para o endpoint local. Uma solicitação com um corpo vazio deve retornar HTTP 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo substituir/atualizar). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Definir uma configuração de aplicativo para SqlConnectionString. O firewall do servidor SQL do Azure deve permitir a entrada dos serviços do Azure para que a funcionalidade ao vivo possa acessá-lo.

Em seguida, você pode definir a função como uma saída no trabalho ASA e usá-la para substituir registros em vez de inseri-los.

Opção 2: mesclar com atribuição composta (acumular) por meio de uma consulta SQL personalizada

Observação

Após reiniciar e recuperar, o ASA poderá reenviar eventos de saída que já emitiu. Esse comportamento pode fazer com que a lógica de acúmulo falhe (duplicando valores individuais). Para evitar esse problema, gere os mesmos dados em uma tabela usando a saída do SQL ASA nativa. Você pode usar essa tabela de controle para detectar problemas e ressincronizar o acúmulo quando necessário.

Essa opção usa Microsoft.Data.SqlClient. Essa biblioteca permite enviar consultas SQL para um Banco de Dados SQL.

Este exemplo foi construído sobre:

Primeiro, crie um aplicativo de função padrão HttpTrigger seguindo este tutorial. As seguintes informações são usadas:

  • Idioma: C#
  • Runtime: .NET 6 (sob a função/tempo de execução v4)
  • Modelo: HTTP trigger

Instale a biblioteca SqlClient executando o seguinte comando em um terminal localizado na pasta do projeto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Adicione o item SqlConnectionString na seção Values do local.settings.json, preenchendo a cadeia de conexão do servidor de destino:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Atualize a seção de criação de comando sqltext para corresponder ao seu próprio esquema (observe como o acúmulo é obtido por meio do operador += na atualização):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

Agora você pode testar a conexão entre a função local e o banco de dados realizando debug (F5 no VS Code). O banco de dados SQL precisa estar acessível de seu computador. Você pode usar o SSMS para verificar a conectividade. Em seguida, envie solicitações POST para o endpoint local. Uma solicitação com um corpo vazio deve retornar HTTP 204. Uma solicitação com um payload real deve ser persistida na tabela de destino (no modo de acumulação/fusão). Aqui está uma amostra de payload correspondente ao esquema usado neste exemplo:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

A função agora pode ser publicada no Azure. Uma configuração de aplicativo deve ser definida para SqlConnectionString. O firewall do servidor do SQL do Azure deve permitir que os serviços do Azure entrem na função ao vivo para alcançá-lo.

A função pode ser definida como saída na tarefa ASA e usada para substituir registros ao invés de inseri-los.

Alternativas

Fora de Azure Functions, vários métodos podem alcançar o resultado esperado. Esta seção descreve alguns desses métodos.

Pós-processamento no Banco de Dados SQL de destino

Uma tarefa em segundo plano funcionará depois que os dados forem inseridos no banco de dados por meio das saídas do SQL ASA padrão.

Para SQL do Azure, use INSTEAD OFDML triggers para interceptar os comandos INSERT que o ASA emite.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Para o SQL do Synapse, o ASA pode inserir em uma tabela intermediária. Uma tarefa recorrente pode então transformar os dados conforme necessário em uma tabela intermediária. Por fim, os dados são movidos para a tabela de produção.

Pré-processamento no Azure Cosmos DB

O Azure Cosmos DB dá suporte ao UPSERT nativamente. Aqui, somente acrescentar ou substituir é possível. Você deve gerenciar acúmulos do lado do cliente no Azure Cosmos DB.

Se os requisitos corresponderem, você poderá substituir o banco de dados SQL de destino por uma instância de Azure Cosmos DB. Essa alteração requer uma alteração importante na arquitetura geral da solução.

Para o SQL do Synapse, você pode usar Azure Cosmos DB como uma camada intermediária por meio de Azure Link do Synapse para Azure Cosmos DB. Use o Azure Link do Synapse para criar um armazenamento analítico. Em seguida, você pode consultar esse armazenamento de dados diretamente no SQL do Synapse.

Comparação das alternativas

Cada abordagem oferece diferentes propostas de valor e funcionalidades:

Tipo Opção Modos Banco de Dados SQL do Azure Azure Synapse Analytics
Pós-processamento
Gatilhos Substituir, Acumular + N/A, os gatilhos não estão disponíveis no SQL do Synapse
Staging Substituir, Acumular + +
Pré-processamento
Azure Functions Substituir, Acumular + – (desempenho de linha por linha)
Substituição do Azure Cosmos DB Substitua N/D N/D
Link do Azure Synapse para Azure Cosmos DB Substitua N/D +

Obtenha suporte

Para obter mais assistência, experimente a página de perguntas do Microsoft Q&A para o Azure Stream Analytics.

Próximas etapas