Skip to content

Migrating data to newUserFunctionId tables as part of Consumption plan support #1071

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 29, 2024
2 changes: 2 additions & 0 deletions src/SqlBindingConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ internal static class SqlBindingConstants
{
public const string ISO_8061_DATETIME_FORMAT = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fffZ";

public const string WebsiteName = "WEBSITE_SITE_NAME";

/// <summary>
/// Sql Server Edition of the target server, list consolidated from
/// https://learn.microsoft.com/en-us/sql/t-sql/functions/serverproperty-transact-sql?view=sql-server-ver16
Expand Down
9 changes: 9 additions & 0 deletions src/SqlBindingUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public static string GetConnectionString(string connectionStringSetting, IConfig
return connectionString;
}

public static string GetWebSiteName(IConfiguration configuration)
{
if (configuration == null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens for local development? Is website site name set?

Copy link
Contributor Author

@MaddyDev MaddyDev May 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WEBSITE_SITE_NAME value is null for local development, it creates the hex value for userfunctionId only using the functionName for local development

{
throw new ArgumentNullException(nameof(configuration));
}
return configuration.GetConnectionStringOrSetting(SqlBindingConstants.WebsiteName);
}

/// <summary>
/// Parses the parameter string into a list of parameters, where each parameter is separated by "," and has the form
/// "@param1=param2". "@param1" is the parameter name to be used in the query or stored procedure, and param1 is the
Expand Down
1 change: 1 addition & 0 deletions src/Telemetry/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ public enum TelemetryErrorName
UpsertRollback,
GetServerTelemetryProperties,
GetLeaseLockedRowCount,
OldUserFunctionIdRecordsExist,
}

internal class ServerProperties
Expand Down
31 changes: 28 additions & 3 deletions src/TriggerBinding/SqlTriggerBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ public async Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
{
_ = context ?? throw new ArgumentNullException(nameof(context), "Missing listener context");

string userFunctionId = await this.GetUserFunctionIdAsync();
return new SqlTriggerListener<T>(this._connectionString, this._tableName, this._leasesTableName, userFunctionId, context.Executor, this._sqlOptions, this._logger, this._configuration);
string userFunctionId = this.GetUserFunctionIdAsync();
string oldUserFunctionId = await this.GetOldUserFunctionIdAsync();
return new SqlTriggerListener<T>(this._connectionString, this._tableName, this._leasesTableName, userFunctionId, oldUserFunctionId, context.Executor, this._sqlOptions, this._logger, this._configuration);
}

public ParameterDescriptor ToParameterDescriptor()
Expand All @@ -95,14 +96,37 @@ public ParameterDescriptor ToParameterDescriptor()
/// <summary>
/// Returns an ID that uniquely identifies the user function.
///
/// We call the WEBSITE_SITE_NAME from the configuration and use that to create the hash of the
/// user function id. Appending another hash of class+method in here ensures that if there
/// are multiple user functions within the same process and tracking the same SQL table, then each one of them
/// gets a separate view of the table changes.
/// </summary>
private string GetUserFunctionIdAsync()
{
// Using read-only App name for the hash https://learn.microsoft.com/en-us/azure/app-service/reference-app-settings?tabs=kudu%2Cdotnet#app-environment
string websiteName = SqlBindingUtilities.GetWebSiteName(this._configuration);

var methodInfo = (MethodInfo)this._parameter.Member;
string functionName = $"{methodInfo.DeclaringType.FullName}.{methodInfo.Name}";

using (var sha256 = SHA256.Create())
{
byte[] hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(websiteName + functionName));
return new Guid(hash.Take(16).ToArray()).ToString("N").Substring(0, 16);
}
}

/// <summary>
/// Returns the deprecated ID that was used to identify the user function.
///
/// We call the WebJobs SDK library method to generate the host ID. The host ID is essentially a hash of the
/// assembly name containing the user function(s). This ensures that if the user ever updates their application,
/// unless the assembly name is modified, the new application version will be able to resume from the point
/// where the previous version had left. Appending another hash of class+method in here ensures that if there
/// are multiple user functions within the same process and tracking the same SQL table, then each one of them
/// gets a separate view of the table changes.
/// </summary>
private async Task<string> GetUserFunctionIdAsync()
private async Task<string> GetOldUserFunctionIdAsync()
{
string hostId = await this._hostIdProvider.GetHostIdAsync(CancellationToken.None);

Expand All @@ -115,5 +139,6 @@ private async Task<string> GetUserFunctionIdAsync()
return new Guid(hash.Take(16).ToArray()).ToString("N").Substring(0, 16);
}
}

}
}
95 changes: 89 additions & 6 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal sealed class SqlTriggerListener<T> : IListener, IScaleMonitorProvider,
private readonly string _connectionString;
private readonly string _userDefinedLeasesTableName;
private readonly string _userFunctionId;
private readonly string _oldUserFunctionId;
private readonly ITriggeredFunctionExecutor _executor;
private readonly SqlOptions _sqlOptions;
private readonly ILogger _logger;
Expand All @@ -59,16 +60,18 @@ internal sealed class SqlTriggerListener<T> : IListener, IScaleMonitorProvider,
/// <param name="tableName">Name of the user table</param>
/// <param name="userDefinedLeasesTableName">Optional - Name of the leases table</param>
/// <param name="userFunctionId">Unique identifier for the user function</param>
/// <param name="oldUserFunctionId">deprecated user function id value created using hostId for the user function</param>
/// <param name="executor">Defines contract for triggering user function</param>
/// <param name="sqlOptions"></param>
/// <param name="logger">Facilitates logging of messages</param>
/// <param name="configuration">Provides configuration values</param>
public SqlTriggerListener(string connectionString, string tableName, string userDefinedLeasesTableName, string userFunctionId, ITriggeredFunctionExecutor executor, SqlOptions sqlOptions, ILogger logger, IConfiguration configuration)
public SqlTriggerListener(string connectionString, string tableName, string userDefinedLeasesTableName, string userFunctionId, string oldUserFunctionId, ITriggeredFunctionExecutor executor, SqlOptions sqlOptions, ILogger logger, IConfiguration configuration)
{
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
this._userTable = !string.IsNullOrEmpty(tableName) ? new SqlObject(tableName) : throw new ArgumentNullException(nameof(tableName));
this._userDefinedLeasesTableName = userDefinedLeasesTableName;
this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
this._oldUserFunctionId = oldUserFunctionId;
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._sqlOptions = sqlOptions ?? throw new ArgumentNullException(nameof(sqlOptions));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -389,16 +392,30 @@ private async Task<long> InsertGlobalStateTableRowAsync(SqlConnection connection
}
}

string insertRowGlobalStateTableQuery = $@"
bool migrateOldUserFunctionIdData = await this.OldUserFunctionIdRecordExists(connection, transaction, userTableId, cancellationToken);

string insertRowGlobalStateTableQuery = migrateOldUserFunctionIdData ? $@"
{AppLockStatements}

-- Migrate LastSyncVersion from oldUserFunctionId if it exists and delete the record
Declare @lastSyncVersion bigint;
Select @lastSyncVersion = LastSyncVersion from az_func.GlobalState where UserFunctionID = '{this._oldUserFunctionId}' AND UserTableID = {userTableId}
IF @lastSyncVersion is NULL
SET @lastSyncVersion = {(long)minValidVersion};
ELSE
Delete from az_func.GlobalState where UserFunctionID = '{this._oldUserFunctionId}' AND UserTableID = {userTableId}

INSERT INTO {GlobalStateTableName}
VALUES ('{this._userFunctionId}', {userTableId}, @lastSyncVersion, GETUTCDATE());
" :
$@"{AppLockStatements}

IF NOT EXISTS (
SELECT * FROM {GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {userTableId}
)
INSERT INTO {GlobalStateTableName}
VALUES ('{this._userFunctionId}', {userTableId}, {(long)minValidVersion}, GETUTCDATE());
";
VALUES ('{this._userFunctionId}', {userTableId}, {(long)minValidVersion}, GETUTCDATE());";

using (var insertRowGlobalStateTableCommand = new SqlCommand(insertRowGlobalStateTableQuery, connection, transaction))
{
Expand Down Expand Up @@ -426,18 +443,41 @@ private async Task<long> CreateLeasesTableAsync(
{
string primaryKeysWithTypes = string.Join(", ", primaryKeyColumns.Select(col => $"{col.name.AsBracketQuotedString()} {col.type}"));
string primaryKeys = string.Join(", ", primaryKeyColumns.Select(col => col.name.AsBracketQuotedString()));
string OldLeasesTableName = leasesTableName.Contains(this._userFunctionId) ? leasesTableName.Replace(this._userFunctionId, this._oldUserFunctionId) : string.Empty;

string createLeasesTableQuery = $@"
string createLeasesTableQuery = string.IsNullOrEmpty(OldLeasesTableName) ? $@"
{AppLockStatements}

IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL
CREATE TABLE {leasesTableName} (
{primaryKeysWithTypes},
{LeasesTableChangeVersionColumnName} bigint NOT NULL,
{LeasesTableAttemptCountColumnName} int NOT NULL,
{LeasesTableLeaseExpirationTimeColumnName} datetime2,
PRIMARY KEY ({primaryKeys})
);
" : $@"
{AppLockStatements}

IF OBJECT_ID(N'{leasesTableName}', 'U') IS NULL
Begin
CREATE TABLE {leasesTableName} (
{primaryKeysWithTypes},
{LeasesTableChangeVersionColumnName} bigint NOT NULL,
{LeasesTableAttemptCountColumnName} int NOT NULL,
{LeasesTableLeaseExpirationTimeColumnName} datetime2,
PRIMARY KEY ({primaryKeys})
);

-- Migrate all data from OldLeasesTable and delete it.
IF OBJECT_ID(N'{OldLeasesTableName}', 'U') IS NOT NULL
Begin
Insert into {leasesTableName}
Select * from {OldLeasesTableName};

Drop Table {OldLeasesTableName};
END
End
";

using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction))
Expand All @@ -456,7 +496,7 @@ PRIMARY KEY ({primaryKeys})
// This generally shouldn't happen since we check for its existence in the statement but occasionally
// a race condition can make it so that multiple instances will try and create the schema at once.
// In that case we can just ignore the error since all we care about is that the schema exists at all.
this._logger.LogWarning($"Failed to create global state table '{leasesTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal.");
this._logger.LogWarning($"Failed to create leases table '{leasesTableName}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal.");
}
else
{
Expand All @@ -468,6 +508,49 @@ PRIMARY KEY ({primaryKeys})
}
}

/// <summary>
/// Returns if oldUserFunctionId records exist.
/// </summary>
/// <param name="connection">The already-opened connection to use for executing the command</param>
/// <param name="transaction">The transaction wrapping this command</param>
/// <param name="userTableId">The ID of the table being watched</param>
/// <param name="cancellationToken">Cancellation token to pass to the command</param>
/// <returns>The time taken in ms to execute the command</returns>
private async Task<bool> OldUserFunctionIdRecordExists(
SqlConnection connection,
SqlTransaction transaction,
int userTableId,
CancellationToken cancellationToken)
{
string getRecordsWithOldUserFunctionId = $@"
Select Count(*) from az_func.GlobalState where UserFunctionID = '{this._oldUserFunctionId}' AND UserTableID = {userTableId}
";
long rowsExist = 0;
using (var getRecordsWithOldUserFunctionIdCommand = new SqlCommand(getRecordsWithOldUserFunctionId, connection, transaction))
{
try
{
rowsExist = (long)await getRecordsWithOldUserFunctionIdCommand.ExecuteScalarAsyncWithLogging(this._logger, cancellationToken);
}
catch (Exception ex)
{
TelemetryInstance.TrackException(TelemetryErrorName.OldUserFunctionIdRecordsExist, ex, this._telemetryProps);
var sqlEx = ex as SqlException;
if (sqlEx?.Number == ObjectAlreadyExistsErrorNumber)
{
// This generally shouldn't happen since we check for its existence in the statement but occasionally
// a race condition can make it so that multiple instances will try and create the schema at once.
// In that case we can just ignore the error since all we care about is that the schema exists at all.
this._logger.LogWarning($"Failed to check global state table for the deprecated functionId '{this._oldUserFunctionId}'. Exception message: {ex.Message} This is informational only, function startup will continue as normal.");
}
else
{
throw;
}
}
return rowsExist > 0;
}
}
public IScaleMonitor GetMonitor()
{
return this._scaleMonitor;
Expand Down
4 changes: 2 additions & 2 deletions test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public async void GetMetricsTest()
this.SetChangeTrackingForTable("Products");
string userFunctionId = "func-id";
IConfiguration configuration = new ConfigurationBuilder().Build();
var listener = new SqlTriggerListener<Product>(this.DbConnectionString, "dbo.Products", "", userFunctionId, Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), Mock.Of<ILogger>(), configuration);
var listener = new SqlTriggerListener<Product>(this.DbConnectionString, "dbo.Products", "", userFunctionId, "", Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), Mock.Of<ILogger>(), configuration);
await listener.StartAsync(CancellationToken.None);
// Cancel immediately so the listener doesn't start processing the changes
await listener.StopAsync(CancellationToken.None);
Expand Down Expand Up @@ -639,7 +639,7 @@ public async void LastAccessTimeColumn_Created_OnStartup()
this.SetChangeTrackingForTable("Products");
string userFunctionId = "func-id";
IConfiguration configuration = new ConfigurationBuilder().Build();
var listener = new SqlTriggerListener<Product>(this.DbConnectionString, "dbo.Products", "", userFunctionId, Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), Mock.Of<ILogger>(), configuration);
var listener = new SqlTriggerListener<Product>(this.DbConnectionString, "dbo.Products", "", userFunctionId, "", Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), Mock.Of<ILogger>(), configuration);
await listener.StartAsync(CancellationToken.None);
// Cancel immediately so the listener doesn't start processing the changes
await listener.StopAsync(CancellationToken.None);
Expand Down
2 changes: 1 addition & 1 deletion test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void InvalidUserConfiguredMaxChangesPerWorker(string maxChangesPerWorker)
(Mock<ILogger> mockLogger, List<string> logMessages) = CreateMockLogger();
Mock<IConfiguration> mockConfiguration = CreateMockConfiguration(maxChangesPerWorker);

Assert.Throws<InvalidOperationException>(() => new SqlTriggerListener<object>("testConnectionString", "testTableName", "", "testUserFunctionId", Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), mockLogger.Object, mockConfiguration.Object));
Assert.Throws<InvalidOperationException>(() => new SqlTriggerListener<object>("testConnectionString", "testTableName", "", "testUserFunctionId", "", Mock.Of<ITriggeredFunctionExecutor>(), Mock.Of<SqlOptions>(), mockLogger.Object, mockConfiguration.Object));
}

private static IScaleMonitor<SqlTriggerMetrics> GetScaleMonitor(string tableName, string userFunctionId)
Expand Down