From 3fd17832f00f201cdf0b4fcaeef836747adcebac Mon Sep 17 00:00:00 2001 From: yaelgoede <44118611+yaelgoede@users.noreply.github.com> Date: Mon, 26 Feb 2024 21:10:13 +0100 Subject: [PATCH] added kafka trigger --- src/app.ts | 5 +++ src/trigger.ts | 9 +++++ types/app.d.ts | 8 ++++ types/index.d.ts | 1 + types/kafka.d.ts | 92 ++++++++++++++++++++++++++++++++++++++++++++++ types/trigger.d.ts | 6 +++ 6 files changed, 121 insertions(+) create mode 100644 types/kafka.d.ts diff --git a/src/app.ts b/src/app.ts index b9d974a..0335700 100644 --- a/src/app.ts +++ b/src/app.ts @@ -12,6 +12,7 @@ import { HttpHandler, HttpMethod, HttpMethodFunctionOptions, + KafkaFunctionOptions, ServiceBusQueueFunctionOptions, ServiceBusTopicFunctionOptions, StorageBlobFunctionOptions, @@ -124,6 +125,10 @@ export function cosmosDB(name: string, options: CosmosDBFunctionOptions): void { generic(name, convertToGenericOptions(options, trigger.cosmosDB)); } +export function kafka(name: string, options: KafkaFunctionOptions): void { + generic(name, convertToGenericOptions(options, trigger.kafka)); +} + export function warmup(name: string, options: WarmupFunctionOptions): void { generic(name, convertToGenericOptions(options, trigger.warmup)); } diff --git a/src/trigger.ts b/src/trigger.ts index d7749fd..8afd3b2 100644 --- a/src/trigger.ts +++ b/src/trigger.ts @@ -12,6 +12,8 @@ import { GenericTriggerOptions, HttpTrigger, HttpTriggerOptions, + KafkaTrigger, + KafkaTriggerOptions, ServiceBusQueueTrigger, ServiceBusQueueTriggerOptions, ServiceBusTopicTrigger, @@ -92,6 +94,13 @@ export function cosmosDB(options: CosmosDBTriggerOptions): CosmosDBTrigger { }); } +export function kafka(options: KafkaTriggerOptions): KafkaTrigger { + return addTriggerBindingName({ + ...options, + type: 'kafkaTrigger', + }); +} + export function warmup(options: WarmupTriggerOptions): WarmupTrigger { return addTriggerBindingName({ ...options, diff --git a/types/app.d.ts b/types/app.d.ts index 8a6a75c..e8da2ae 100644 --- a/types/app.d.ts +++ b/types/app.d.ts @@ -6,6 +6,7 @@ import { EventGridFunctionOptions } from './eventGrid'; import { EventHubFunctionOptions } from './eventHub'; import { GenericFunctionOptions } from './generic'; import { HttpFunctionOptions, HttpHandler, HttpMethodFunctionOptions } from './http'; +import { KafkaFunctionOptions } from './kafka'; import { ServiceBusQueueFunctionOptions, ServiceBusTopicFunctionOptions } from './serviceBus'; import { SetupOptions } from './setup'; import { StorageBlobFunctionOptions, StorageQueueFunctionOptions } from './storage'; @@ -151,6 +152,13 @@ export function eventGrid(name: string, options: EventGridFunctionOptions): void */ export function cosmosDB(name: string, options: CosmosDBFunctionOptions): void; +/** + * Registers a kafka function in your app that will be triggered whenever a message is added to a kafka topic + * @param name The name of the function. The name must be unique within your app and will mostly be used for your own tracking purposes + * @param options Configuration options describing the inputs, outputs, and handler for this function + */ +export function kafka(name: string, options: KafkaFunctionOptions): void; + /** * Registers a function in your app that will be triggered when an instance is added to scale a running function app. * The warmup trigger is only called during scale-out operations, not during restarts or other non-scale startups. diff --git a/types/index.d.ts b/types/index.d.ts index 4a3d990..ccb147b 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -16,6 +16,7 @@ export * from './hooks/invocationHooks'; export * from './http'; export * as input from './input'; export * from './InvocationContext'; +export * from './kafka'; export * as output from './output'; export * from './serviceBus'; export * from './setup'; diff --git a/types/kafka.d.ts b/types/kafka.d.ts new file mode 100644 index 0000000..2e1d548 --- /dev/null +++ b/types/kafka.d.ts @@ -0,0 +1,92 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. + +import { FunctionOptions, FunctionResult, FunctionTrigger, RetryOptions } from './index'; +import { InvocationContext } from './InvocationContext'; + +export type KafkaHandler = (messages: unknown, context: InvocationContext) => FunctionResult; + +export interface KafkaFunctionOptions extends KafkaTriggerOptions, Partial { + handler: KafkaHandler; + + trigger?: KafkaTrigger; + + /** + * An optional retry policy to rerun a failed execution until either successful completion occurs or the maximum number of retries is reached. + * Learn more [here](https://learn.microsoft.com/azure/azure-functions/functions-bindings-error-pages) + */ + retry?: RetryOptions; +} + +export interface KafkaTriggerOptions { + /** + * The list of Kafka brokers monitored by the trigger. + */ + brokerList: []; + + /** + * The topic monitored by the trigger. + */ + topic: string; + + /** + * Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType. + */ + cardinality?: 'ONE' | 'MANY'; + + /** + * Defines how Functions handles the parameter value. By default, the value is obtained as a string and Functions tries to deserialize the string to actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual parameter type byte[]. + */ + dataType?: 'string' | 'binary'; + + /** + * Kafka consumer group used by the trigger. + */ + consumerGroup?: string; + + /** + * Schema of a generic record when using the Avro protocol. + */ + avroSchema?: string; + + /** + * The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, ScramSha512. + */ + authenticationMode?: 'Plain' | 'Gssapi' | 'ScramSha256' | 'ScramSha512'; + + /** + * The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. + */ + username?: string; + + /** + * The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. + */ + password?: string; + + /** + * The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, sasl_ssl. + */ + protocol?: 'plaintext' | 'ssl' | 'sasl_plaintext' | 'sasl_ssl'; + + /** + * Path to CA certificate file for verifying the broker's certificate. + */ + sslCaLocation?: string; + + /** + * Path to client's certificate file. + */ + sslCertificateLocation?: string; + + /** + * Path to client's private key (PEM) used for authentication. + */ + sslKeyLocation?: string; + + /** + * Password for client's certificate. + */ + sslKeyPassword?: string; +} +export type KafkaTrigger = FunctionTrigger & KafkaTriggerOptions; diff --git a/types/trigger.d.ts b/types/trigger.d.ts index bfae17c..c23b6fc 100644 --- a/types/trigger.d.ts +++ b/types/trigger.d.ts @@ -7,6 +7,7 @@ import { EventHubTrigger, EventHubTriggerOptions } from './eventHub'; import { GenericTriggerOptions } from './generic'; import { HttpTrigger, HttpTriggerOptions } from './http'; import { FunctionTrigger } from './index'; +import { KafkaTrigger, KafkaTriggerOptions } from './kafka'; import { ServiceBusQueueTrigger, ServiceBusQueueTriggerOptions, @@ -67,6 +68,11 @@ export function eventGrid(options: EventGridTriggerOptions): EventGridTrigger; */ export function cosmosDB(options: CosmosDBTriggerOptions): CosmosDBTrigger; +/** + * [Link to docs and examples](https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-kafka-trigger&pivots=programming-language-javascript) + */ +export function kafka(options: KafkaTriggerOptions): KafkaTrigger; + /** * [Link to docs and examples](https://learn.microsoft.com/azure/azure-functions/functions-bindings-warmup?tabs=isolated-process&pivots=programming-language-javascript) */