Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions NRedisTimeSeries.Example/MRangeExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,27 @@ public static void MRangeWithLabelsExample()
}
redis.Close();
}

/// <summary>
/// Example for basic usage of RedisTimeSeries RANGE command with "-" and "+" as range boundreis, a filter and a Groupby concept.
/// NRedisTimeSeris MRange is expecting two TimeStamps objects as the range boundries.
/// In this case, the strings are implicitly casted into TimeStamp objects.
/// The TimeSeriesMRange command returns an IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)>collection.
/// </summary>
public static void MRangeWithGroupbyExample()
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
IDatabase db = redis.GetDatabase();
var filter = new List<string> { "MRANGEkey=MRANGEvalue" };
var results = db.TimeSeriesMRange("-", "+", filter, withLabels: true, groupbyTuple: ("labelName", TsReduce.Max));
// Values extraction example.
foreach (var result in results)
{
string group = result.key;
IReadOnlyList<TimeSeriesLabel> labels = result.labels;
IReadOnlyList<TimeSeriesTuple> values = result.values;
}
redis.Close();
}
}
}
22 changes: 22 additions & 0 deletions NRedisTimeSeries.Example/MRangeExampleAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,27 @@ public static async Task MRangeWithLabelsAsyncExample()
}
redis.Close();
}

/// <summary>
/// Example for basic usage of RedisTimeSeries RANGE command with "-" and "+" as range boundreis, a filter and a Groupby concept.
/// NRedisTimeSeris MRange is expecting two TimeStamps objects as the range boundries.
/// In this case, the strings are implicitly casted into TimeStamp objects.
/// The TimeSeriesMRange command returns an IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)>collection.
/// </summary>
public static async Task MRangeWithGroupbyAsyncExample()
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
IDatabase db = redis.GetDatabase();
var filter = new List<string> { "MRANGEkey=MRANGEvalue" };
var results = await db.TimeSeriesMRangeAsync("-", "+", filter, withLabels: true, groupbyTuple: ("labelName", TsReduce.Max));
// Values extraction example.
foreach (var result in results)
{
string group = result.key;
IReadOnlyList<TimeSeriesLabel> labels = result.labels;
IReadOnlyList<TimeSeriesTuple> values = result.values;
}
redis.Close();
}
}
}
46 changes: 46 additions & 0 deletions NRedisTimeSeries.Test/TestAPI/TestMRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,53 @@ public void TestMissingTimeBucket()
var tuples = CreateData(db, 50);
var ex = Assert.Throws<ArgumentException>(() => db.TimeSeriesMRange("-", "+", new List<string> { "key=MissingTimeBucket" }, aggregation: TsAggregation.Avg));
Assert.Equal("RANGE Aggregation should have timeBucket value", ex.Message);
}

[Fact]
public void TestMRangeGroupby()
{
IDatabase db = redisFixture.Redis.GetDatabase();
for(int i = 0; i < keys.Length; i++)
{
var label1 = new TimeSeriesLabel("key", "MRangeGroupby");
var label2 = new TimeSeriesLabel("group", i.ToString());
db.TimeSeriesCreate(keys[i], labels: new List<TimeSeriesLabel> { label1, label2 });
}

var tuples = CreateData(db, 50);
var results = db.TimeSeriesMRange("-", "+", new List<string> { "key=MRangeGroupby" }, withLabels: true, groupbyTuple: ("group", TsReduce.Min));
Assert.Equal(keys.Length, results.Count);
for (int i = 0; i < results.Count; i++)
{
Assert.Equal("group=" + i, results[i].key);
Assert.Equal(new TimeSeriesLabel("group", i.ToString()), results[i].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "min"), results[i].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", keys[i]), results[i].labels[2]);
Assert.Equal(tuples, results[i].values);
}
}

[Fact]
public void TestMRangeReduce()
{
IDatabase db = redisFixture.Redis.GetDatabase();
foreach(var key in keys)
{
var label = new TimeSeriesLabel("key", "MRangeReduce");
db.TimeSeriesCreate(key, labels: new List<TimeSeriesLabel> { label });
}

var tuples = CreateData(db, 50);
var results = db.TimeSeriesMRange("-", "+", new List<string> { "key=MRangeReduce" }, withLabels: true, groupbyTuple: ("key", TsReduce.Sum));
Assert.Equal(1, results.Count);
Assert.Equal("key=MRangeReduce", results[0].key);
Assert.Equal(new TimeSeriesLabel("key", "MRangeReduce"), results[0].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "sum"), results[0].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", string.Join(",", keys)), results[0].labels[2]);
for(int i = 0; i < results[0].values.Count; i++)
{
Assert.Equal(tuples[i].Val * 2, results[0].values[i].Val);
}
}
}
}
49 changes: 49 additions & 0 deletions NRedisTimeSeries.Test/TestAPI/TestMRangeAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,54 @@ await db.TimeSeriesMRangeAsync("-", "+",
});
Assert.Equal("RANGE Aggregation should have timeBucket value", ex.Message);
}

[Fact]
public async Task TestMRangeGroupby()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
for(int i = 0; i < keys.Length; i++)
{
var label1 = new TimeSeriesLabel(keys[0], "value");
var label2 = new TimeSeriesLabel("group", i.ToString());
await db.TimeSeriesCreateAsync(keys[i], labels: new List<TimeSeriesLabel> { label1, label2 });
}

var tuples = await CreateData(db, keys, 50);
var results = await db.TimeSeriesMRangeAsync("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: ("group", TsReduce.Min));
Assert.Equal(keys.Length, results.Count);
for (int i = 0; i < results.Count; i++)
{
Assert.Equal("group=" + i, results[i].key);
Assert.Equal(new TimeSeriesLabel("group", i.ToString()), results[i].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "min"), results[i].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", keys[i]), results[i].labels[2]);
Assert.Equal(tuples, results[i].values);
}
}

[Fact]
public async Task TestMRangeReduce()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
foreach(var key in keys)
{
var label = new TimeSeriesLabel(keys[0], "value");
await db.TimeSeriesCreateAsync(key, labels: new List<TimeSeriesLabel> { label });
}

var tuples = await CreateData(db, keys, 50);
var results = await db.TimeSeriesMRangeAsync("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: (keys[0], TsReduce.Sum));
Assert.Equal(1, results.Count);
Assert.Equal($"{keys[0]}=value", results[0].key);
Assert.Equal(new TimeSeriesLabel(keys[0], "value"), results[0].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "sum"), results[0].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", string.Join(",", keys)), results[0].labels[2]);
for(int i = 0; i < results[0].values.Count; i++)
{
Assert.Equal(tuples[i].Val * 2, results[0].values[i].Val);
}
}
}
}
49 changes: 49 additions & 0 deletions NRedisTimeSeries.Test/TestAPI/TestMRevRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,56 @@ public void TestMissingTimeBucket()
var tuples = CreateData(db, keys, 50);
var ex = Assert.Throws<ArgumentException>(() => db.TimeSeriesMRevRange("-", "+", new List<string> { "key=MissingTimeBucket" }, aggregation: TsAggregation.Avg));
Assert.Equal("RANGE Aggregation should have timeBucket value", ex.Message);
}

[Fact]
public void TestMRevRangeGroupby()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
for(int i = 0; i < keys.Length; i++)
{
var label1 = new TimeSeriesLabel(keys[0], "value");
var label2 = new TimeSeriesLabel("group", i.ToString());
db.TimeSeriesCreate(keys[i], labels: new List<TimeSeriesLabel> { label1, label2 });
}

var tuples = CreateData(db, keys, 50);
var results = db.TimeSeriesMRevRange("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: ("group", TsReduce.Min));
Assert.Equal(keys.Length, results.Count);
for (var i = 0; i < results.Count; i++)
{
Assert.Equal("group=" + i, results[i].key);
Assert.Equal(new TimeSeriesLabel("group", i.ToString()), results[i].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "min"), results[i].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", keys[i]), results[i].labels[2]);
Assert.Equal(ReverseData(tuples), results[i].values);
}
}

[Fact]
public void TestMRevRangeReduce()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
foreach(var key in keys)
{
var label = new TimeSeriesLabel(keys[0], "value");
db.TimeSeriesCreate(key, labels: new List<TimeSeriesLabel> { label });
}

var tuples = CreateData(db, keys, 50);
var results = db.TimeSeriesMRevRange("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: (keys[0], TsReduce.Sum));
Assert.Equal(1, results.Count);
Assert.Equal($"{keys[0]}=value", results[0].key);
Assert.Equal(new TimeSeriesLabel(keys[0], "value"), results[0].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "sum"), results[0].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", string.Join(",", keys)), results[0].labels[2]);
tuples = ReverseData(tuples);
for(int i = 0; i < results[0].values.Count; i++)
{
Assert.Equal(tuples[i].Val * 2, results[0].values[i].Val);
}
}
}
}
50 changes: 50 additions & 0 deletions NRedisTimeSeries.Test/TestAPI/TestMRevRangeAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,55 @@ await db.TimeSeriesMRevRangeAsync("-", "+",
});
Assert.Equal("RANGE Aggregation should have timeBucket value", ex.Message);
}

[Fact]
public async Task TestMRevRangeGroupby()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
for(int i = 0; i < keys.Length; i++)
{
var label1 = new TimeSeriesLabel(keys[0], "value");
var label2 = new TimeSeriesLabel("group", i.ToString());
await db.TimeSeriesCreateAsync(keys[i], labels: new List<TimeSeriesLabel> { label1, label2 });
}

var tuples = await CreateData(db, keys, 50);
var results = await db.TimeSeriesMRevRangeAsync("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: ("group", TsReduce.Min));
Assert.Equal(keys.Length, results.Count);
for (var i = 0; i < results.Count; i++)
{
Assert.Equal("group=" + i, results[i].key);
Assert.Equal(new TimeSeriesLabel("group", i.ToString()), results[i].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "min"), results[i].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", keys[i]), results[i].labels[2]);
Assert.Equal(ReverseData(tuples), results[i].values);
}
}

[Fact]
public async Task TestMRevRangeReduce()
{
var keys = CreateKeyNames(2);
var db = redisFixture.Redis.GetDatabase();
foreach(var key in keys)
{
var label = new TimeSeriesLabel(keys[0], "value");
await db.TimeSeriesCreateAsync(key, labels: new List<TimeSeriesLabel> { label });
}

var tuples = await CreateData(db, keys, 50);
var results = await db.TimeSeriesMRevRangeAsync("-", "+", new List<string> { $"{keys[0]}=value" }, withLabels: true, groupbyTuple: (keys[0], TsReduce.Sum));
Assert.Equal(1, results.Count);
Assert.Equal($"{keys[0]}=value", results[0].key);
Assert.Equal(new TimeSeriesLabel(keys[0], "value"), results[0].labels[0]);
Assert.Equal(new TimeSeriesLabel("__reducer__", "sum"), results[0].labels[1]);
Assert.Equal(new TimeSeriesLabel("__source__", string.Join(",", keys)), results[0].labels[2]);
tuples = ReverseData(tuples);
for(int i = 0; i < results[0].values.Count; i++)
{
Assert.Equal(tuples[i].Val * 2, results[0].values[i].Val);
}
}
}
}
2 changes: 2 additions & 0 deletions NRedisTimeSeries/Commands/CommandArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ internal class CommandArgs
public static string CHUNK_SIZE => "CHUNK_SIZE";
public static string DUPLICATE_POLICY => "DUPLICATE_POLICY";
public static string ON_DUPLICATE => "ON_DUPLICATE";
public static string GROPUBY => "GROUPBY";
public static string REDUCE => "REDUCE";
}
}
23 changes: 23 additions & 0 deletions NRedisTimeSeries/Commands/Enums/Reduce.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace NRedisTimeSeries.Commands.Enums
{
/// <summary>
/// TODO: Add description
/// </summary>
public enum TsReduce
{
/// <summary>
/// A sum of all samples in the group
/// </summary>
Sum,

/// <summary>
/// A minimum sample of all samples in the group
/// </summary>
Min,

/// <summary>
/// A maximum sample of all samples in the group
/// </summary>
Max,
}
}
24 changes: 24 additions & 0 deletions NRedisTimeSeries/Extensions/ReduceExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using NRedisTimeSeries.Commands.Enums;

namespace NRedisTimeSeries.Extensions
{
internal static class ReduceExtensions
{
public static string AsArg(this TsReduce reduce) => reduce switch
{
TsReduce.Sum => "SUM",
TsReduce.Min => "MIN",
TsReduce.Max => "MAX",
_ => throw new ArgumentOutOfRangeException(nameof(reduce), "Invalid Reduce type"),
};

public static TsReduce AsReduce(string reduce) => reduce switch
{
"SUM" => TsReduce.Sum,
"MIN" => TsReduce.Min,
"MAX" => TsReduce.Max,
_ => throw new ArgumentOutOfRangeException(nameof(reduce), $"Invalid Reduce type '{reduce}'"),
};
}
}
33 changes: 26 additions & 7 deletions NRedisTimeSeries/TimeSeriesClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using NRedisTimeSeries.Commands;
using System;
using NRedisTimeSeries.Commands;
using NRedisTimeSeries.Commands.Enums;
using NRedisTimeSeries.DataTypes;
using StackExchange.Redis;
Expand Down Expand Up @@ -226,10 +227,19 @@ public static IReadOnlyList<TimeSeriesTuple> TimeSeriesRevRange(this IDatabase d
/// <param name="aggregation">Optional: Aggregation type</param>
/// <param name="timeBucket">Optional: Time bucket for aggregation in milliseconds</param>
/// <param name="withLabels">Optional: Include in the reply the label-value pairs that represent metadata labels of the time-series</param>
/// <returns>A list of <(key, labels, values)> tuples. Each tuple contains the key name, its labels and the values which satisfies the given range and filters.</returns>
public static IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)> TimeSeriesMRange(this IDatabase db, TimeStamp fromTimeStamp, TimeStamp toTimeStamp, IReadOnlyCollection<string> filter, long? count = null, TsAggregation? aggregation = null, long? timeBucket = null, bool? withLabels = null)
/// <param name="groupbyTuple">Optional: Grouping by fields the results, and applying reducer functions on each group.</param>
/// <returns>A list of (key, labels, values) tuples. Each tuple contains the key name, its labels and the values which satisfies the given range and filters.</returns>
public static IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)> TimeSeriesMRange(this IDatabase db,
TimeStamp fromTimeStamp,
TimeStamp toTimeStamp,
IReadOnlyCollection<string> filter,
long? count = null,
TsAggregation? aggregation = null,
long? timeBucket = null,
bool? withLabels = null,
(string, TsReduce)? groupbyTuple = null)
{
var args = BuildMultiRangeArgs(fromTimeStamp, toTimeStamp, filter, count, aggregation, timeBucket, withLabels);
var args = BuildMultiRangeArgs(fromTimeStamp, toTimeStamp, filter, count, aggregation, timeBucket, withLabels, groupbyTuple);
return ParseMRangeResponse(db.Execute(TS.MRANGE, args));
}

Expand All @@ -244,10 +254,19 @@ public static IReadOnlyList<TimeSeriesTuple> TimeSeriesRevRange(this IDatabase d
/// <param name="aggregation">Optional: Aggregation type</param>
/// <param name="timeBucket">Optional: Time bucket for aggregation in milliseconds</param>
/// <param name="withLabels">Optional: Include in the reply the label-value pairs that represent metadata labels of the time-series</param>
/// <returns>A list of <(key, labels, values)> tuples. Each tuple contains the key name, its labels and the values which satisfies the given range and filters.</returns>
public static IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)> TimeSeriesMRevRange(this IDatabase db, TimeStamp fromTimeStamp, TimeStamp toTimeStamp, IReadOnlyCollection<string> filter, long? count = null, TsAggregation? aggregation = null, long? timeBucket = null, bool? withLabels = null)
/// <param name="groupbyTuple">Optional: Grouping by fields the results, and applying reducer functions on each group.</param>
/// <returns>A list of (key, labels, values) tuples. Each tuple contains the key name, its labels and the values which satisfies the given range and filters.</returns>
public static IReadOnlyList<(string key, IReadOnlyList<TimeSeriesLabel> labels, IReadOnlyList<TimeSeriesTuple> values)> TimeSeriesMRevRange(this IDatabase db,
TimeStamp fromTimeStamp,
TimeStamp toTimeStamp,
IReadOnlyCollection<string> filter,
long? count = null,
TsAggregation? aggregation = null,
long? timeBucket = null,
bool? withLabels = null,
(string, TsReduce)? groupbyTuple = null)
{
var args = BuildMultiRangeArgs(fromTimeStamp, toTimeStamp, filter, count, aggregation, timeBucket, withLabels);
var args = BuildMultiRangeArgs(fromTimeStamp, toTimeStamp, filter, count, aggregation, timeBucket, withLabels, groupbyTuple);
return ParseMRangeResponse(db.Execute(TS.MREVRANGE, args));
}

Expand Down
Loading