Skip to content

Time-series insertion filters that allow ignoring close-by samples #303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 5, 2024
96 changes: 96 additions & 0 deletions src/NRedisStack/TimeSeries/DataTypes/TSParameters.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using NRedisStack.DataTypes;
using NRedisStack.Literals.Enums;

namespace NRedisStack
{
public class TsBaseParams
{
protected IList<object> parameters;

internal TsBaseParams()
{
this.parameters = new List<object>();
}

internal TsBaseParams(IList<object> parameters)
{
this.parameters = parameters;
}

internal object[] ToArray(string key)
{
parameters.Insert(0, key);
return parameters.ToArray();
}
}

public class TsCreateParams : TsBaseParams
{
internal TsCreateParams(IList<object> parameters) : base(parameters) { }

internal TsCreateParams(long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed,
long? chunkSizeBytes, TsDuplicatePolicy? policy)
{
parameters.AddRetentionTime(retentionTime);
parameters.AddChunkSize(chunkSizeBytes);
parameters.AddLabels(labels);
parameters.AddUncompressed(uncompressed);
parameters.AddDuplicatePolicy(policy);
}
}

public class TsAlterParams : TsBaseParams
{
internal TsAlterParams(IList<object> parameters) : base(parameters) { }

internal TsAlterParams(long? retentionTime, long? chunkSizeBytes, TsDuplicatePolicy? policy, IReadOnlyCollection<TimeSeriesLabel>? labels)
{
parameters.AddRetentionTime(retentionTime);
parameters.AddChunkSize(chunkSizeBytes);
parameters.AddDuplicatePolicy(policy);
parameters.AddLabels(labels);
}
}

public class TsAddParams : TsBaseParams
{
internal TsAddParams(IList<object> parameters) : base(parameters) { }

internal TsAddParams(TimeStamp timestamp, double value, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes, TsDuplicatePolicy? policy)
{
parameters.Add(timestamp.Value);
parameters.Add(value);
parameters.AddRetentionTime(retentionTime);
parameters.AddChunkSize(chunkSizeBytes);
parameters.AddLabels(labels);
parameters.AddUncompressed(uncompressed);
parameters.AddOnDuplicate(policy);
}
}

public class TsIncrByParams : TsBaseParams
{
internal TsIncrByParams(IList<object> parameters) : base(parameters) { }

internal TsIncrByParams(double value, TimeStamp? timestampMaybe, long? retentionTime,
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
{
parameters.Add(value);
if (timestampMaybe is { } timestamp) parameters.AddTimeStamp(timestamp);
parameters.AddRetentionTime(retentionTime);
parameters.AddChunkSize(chunkSizeBytes);
if (labels != null) parameters.AddLabels(labels);
parameters.AddUncompressed(uncompressed);
}
}

public class TsDecrByParams : TsIncrByParams
{
internal TsDecrByParams(IList<object> parameters) : base(parameters) { }

internal TsDecrByParams(double value, TimeStamp? timestampMaybe, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
: base(value, timestampMaybe, retentionTime, labels, uncompressed, chunkSizeBytes)
{ }
}

}
1 change: 1 addition & 0 deletions src/NRedisStack/TimeSeries/Literals/CommandArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ internal class TimeSeriesArgs
public const string DEBUG = "DEBUG";
public const string BUCKETTIMESTAMP = "BUCKETTIMESTAMP";
public const string EMPTY = "EMPTY";
public const String IGNORE = "IGNORE";
}
}
105 changes: 0 additions & 105 deletions src/NRedisStack/TimeSeries/TimeSeriesAux.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,6 @@ public static void AddLatest(this IList<object> args, bool latest)
if (latest) args.Add(TimeSeriesArgs.LATEST);
}

public static void AddRetentionTime(this IList<object> args, long? retentionTime)
{
if (retentionTime.HasValue)
{
args.Add(TimeSeriesArgs.RETENTION);
args.Add(retentionTime);
}
}

public static void AddChunkSize(this IList<object> args, long? chunkSize)
{
if (chunkSize.HasValue)
{
args.Add(TimeSeriesArgs.CHUNK_SIZE);
args.Add(chunkSize);
}
}

public static void AddLabels(this IList<object> args, IReadOnlyCollection<TimeSeriesLabel>? labels)
{
if (labels != null)
{
args.Add(TimeSeriesArgs.LABELS);
foreach (var label in labels)
{
args.Add(label.Key);
args.Add(label.Value);
}
}
}

public static void AddUncompressed(this IList<object> args, bool? uncompressed)
{
if (uncompressed.HasValue)
{
args.Add(TimeSeriesArgs.UNCOMPRESSED);
}
}

public static void AddCount(this IList<object> args, long? count)
{
if (count.HasValue)
Expand All @@ -60,25 +21,6 @@ public static void AddCount(this IList<object> args, long? count)
}
}

public static void AddDuplicatePolicy(this IList<object> args, TsDuplicatePolicy? policy)
{
if (policy.HasValue)
{
args.Add(TimeSeriesArgs.DUPLICATE_POLICY);
args.Add(policy.Value.AsArg());
}
}


public static void AddOnDuplicate(this IList<object> args, TsDuplicatePolicy? policy)
{
if (policy.HasValue)
{
args.Add(TimeSeriesArgs.ON_DUPLICATE);
args.Add(policy.Value.AsArg());
}
}

public static void AddAlign(this IList<object> args, TimeStamp? alignMaybe)
{
if (alignMaybe is { } align)
Expand Down Expand Up @@ -212,53 +154,6 @@ public static void AddRule(this IList<object> args, TimeSeriesRule rule)
args.Add(rule.TimeBucket);
}

public static List<object> BuildTsCreateArgs(string key, long? retentionTime, IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed,
long? chunkSizeBytes, TsDuplicatePolicy? policy)
{
var args = new List<object> { key };
args.AddRetentionTime(retentionTime);
args.AddChunkSize(chunkSizeBytes);
args.AddLabels(labels);
args.AddUncompressed(uncompressed);
args.AddDuplicatePolicy(policy);
return args;
}

public static List<object> BuildTsAlterArgs(string key, long? retentionTime, long? chunkSizeBytes,
TsDuplicatePolicy? policy, IReadOnlyCollection<TimeSeriesLabel>? labels)
{
var args = new List<object> { key };
args.AddRetentionTime(retentionTime);
args.AddChunkSize(chunkSizeBytes);
args.AddDuplicatePolicy(policy);
args.AddLabels(labels);
return args;
}

public static List<object> BuildTsAddArgs(string key, TimeStamp timestamp, double value, long? retentionTime,
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes, TsDuplicatePolicy? policy)
{
var args = new List<object> { key, timestamp.Value, value };
args.AddRetentionTime(retentionTime);
args.AddChunkSize(chunkSizeBytes);
args.AddLabels(labels);
args.AddUncompressed(uncompressed);
args.AddOnDuplicate(policy);
return args;
}

public static List<object> BuildTsIncrDecrByArgs(string key, double value, TimeStamp? timestampMaybe, long? retentionTime,
IReadOnlyCollection<TimeSeriesLabel>? labels, bool? uncompressed, long? chunkSizeBytes)
{
var args = new List<object> { key, value };
if (timestampMaybe is { } timestamp) args.AddTimeStamp(timestamp);
args.AddRetentionTime(retentionTime);
args.AddChunkSize(chunkSizeBytes);
if (labels != null) args.AddLabels(labels);
args.AddUncompressed(uncompressed);
return args;
}

public static List<object> BuildTsDelArgs(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
{
var args = new List<object>
Expand Down
19 changes: 19 additions & 0 deletions src/NRedisStack/TimeSeries/TimeSeriesCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,33 @@ public TimeSeriesCommands(IDatabase db) : base(db)
#region Create

/// <inheritdoc/>
[Obsolete("Please use the other method with TsCreateParams and check related builder TsCreateParamsBuilder to build parameters.")]
public bool Create(string key, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
{
return _db.Execute(TimeSeriesCommandsBuilder.Create(key, retentionTime, labels,
uncompressed, chunkSizeBytes,
duplicatePolicy)).OKtoBoolean();
}

/// <inheritdoc/>
public bool Create(string key, TsCreateParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Create(key, parameters)).OKtoBoolean();

#endregion

#region Update

/// <inheritdoc/>
[Obsolete("Please use the other method with TsAlterParams and check related builder TsAlterParamsBuilder to build parameters.")]
public bool Alter(string key, long? retentionTime = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null)
{
return _db.Execute(TimeSeriesCommandsBuilder.Alter(key, retentionTime, chunkSizeBytes, duplicatePolicy, labels)).OKtoBoolean();
}

/// <inheritdoc/>
public bool Alter(string key, TsAlterParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Alter(key, parameters)).OKtoBoolean();

/// <inheritdoc/>
[Obsolete("Please use the other method with TsAddParams and check related builder TsAddParamsBuilder to build parameters.")]
public TimeStamp Add(string key, TimeStamp timestamp, double value, long? retentionTime = null,
IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null,
long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
Expand All @@ -41,26 +49,37 @@ public TimeStamp Add(string key, TimeStamp timestamp, double value, long? retent
uncompressed, chunkSizeBytes, duplicatePolicy)).ToTimeStamp();
}

/// <inheritdoc/>
public TimeStamp Add(string key, TsAddParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.Add(key, parameters)).ToTimeStamp();

/// <inheritdoc/>
public IReadOnlyList<TimeStamp> MAdd(IReadOnlyCollection<(string key, TimeStamp timestamp, double value)> sequence)
{
return _db.Execute(TimeSeriesCommandsBuilder.MAdd(sequence)).ToTimeStampArray()!;
}

/// <inheritdoc/>
[Obsolete("Please use the other method with TsIncrByParams and check related builder TsIncryByParamsBuilder to build parameters.")]
public TimeStamp IncrBy(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
{
return _db.Execute(TimeSeriesCommandsBuilder.IncrBy(key, value, timestamp, retentionTime,
labels, uncompressed, chunkSizeBytes)).ToTimeStamp();
}

/// <inheritdoc/>
public TimeStamp IncrBy(string key, TsIncrByParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.IncrBy(key, parameters)).ToTimeStamp();

/// <inheritdoc/>
[Obsolete("Please use the other method with TsDecrByParams and check related builder TsDecryByParamsBuilder to build parameters.")]
public TimeStamp DecrBy(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
{
return _db.Execute(TimeSeriesCommandsBuilder.DecrBy(key, value, timestamp, retentionTime,
labels, uncompressed, chunkSizeBytes)).ToTimeStamp();
}

/// <inheritdoc/>
public TimeStamp DecrBy(string key, TsDecrByParams parameters) => _db.Execute(TimeSeriesCommandsBuilder.DecrBy(key, parameters)).ToTimeStamp();

/// <inheritdoc/>
public long Del(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
{
Expand Down
22 changes: 22 additions & 0 deletions src/NRedisStack/TimeSeries/TimeSeriesCommandsAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,42 @@ public TimeSeriesCommandsAsync(IDatabaseAsync db)
#region Create

/// <inheritdoc/>
[Obsolete("Please use the other method with TsCreateParams and check related builder TsCreateParamsBuilder to build parameters.")]
public async Task<bool> CreateAsync(string key, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
{
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Create(key, retentionTime, labels,
uncompressed, chunkSizeBytes,
duplicatePolicy))).OKtoBoolean();
}

/// <inheritdoc/>
public async Task<bool> CreateAsync(string key, TsCreateParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Create(key, parameters))).OKtoBoolean();

#endregion

#region Update

/// <inheritdoc/>
[Obsolete("Please use the other method with TsAlterParams and check related builder TsAlterParamsBuilder to build parameters.")]
public async Task<bool> AlterAsync(string key, long? retentionTime = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null)
{
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Alter(key, retentionTime, chunkSizeBytes, duplicatePolicy, labels))).OKtoBoolean();
}

/// <inheritdoc/>
public async Task<bool> AlterAsync(string key, TsAlterParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Alter(key, parameters))).OKtoBoolean();

/// <inheritdoc/>
[Obsolete("Please use the other method with TsAddParams and check related builder TsAddParamsBuilder to build parameters.")]
public async Task<TimeStamp> AddAsync(string key, TimeStamp timestamp, double value, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null, TsDuplicatePolicy? duplicatePolicy = null)
{
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Add(key, timestamp, value, retentionTime, labels,
uncompressed, chunkSizeBytes, duplicatePolicy))).ToTimeStamp();
}

/// <inheritdoc/>
public async Task<TimeStamp> AddAsync(string key, TsAddParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.Add(key, parameters))).ToTimeStamp();

/// <inheritdoc/>
public async Task<IReadOnlyList<TimeStamp>> MAddAsync(IReadOnlyCollection<(string key, TimeStamp timestamp, double value)> sequence)
{
Expand All @@ -51,13 +63,23 @@ public async Task<TimeStamp> IncrByAsync(string key, double value, TimeStamp? ti
labels, uncompressed, chunkSizeBytes))).ToTimeStamp();
}


/// <inheritdoc/>
[Obsolete("Please use the other method with TsIncrByParams and check related builder TsIncryByParamsBuilder to build parameters.")]
public async Task<TimeStamp> IncrByAsync(string key, TsIncrByParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.IncrBy(key, parameters))).ToTimeStamp();


/// <inheritdoc/>
public async Task<TimeStamp> DecrByAsync(string key, double value, TimeStamp? timestamp = null, long? retentionTime = null, IReadOnlyCollection<TimeSeriesLabel>? labels = null, bool? uncompressed = null, long? chunkSizeBytes = null)
{
return (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.DecrBy(key, value, timestamp, retentionTime,
labels, uncompressed, chunkSizeBytes))).ToTimeStamp();
}

/// <inheritdoc/>
[Obsolete("Please use the other method with TsDecrByParams and check related builder TsDecryByParamsBuilder to build parameters.")]
public async Task<TimeStamp> DecrByAsync(string key, TsDecrByParams parameters) => (await _db.ExecuteAsync(TimeSeriesCommandsBuilder.DecrBy(key, parameters))).ToTimeStamp();

/// <inheritdoc/>
public async Task<long> DelAsync(string key, TimeStamp fromTimeStamp, TimeStamp toTimeStamp)
{
Expand Down
Loading
Loading