Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/com/redislabs/redistimeseries/Keyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public enum Keyword implements ProtocolCommand {
UNCOMPRESSED,
CHUNK_SIZE,
DUPLICATE_POLICY,
ON_DUPLICATE;
ON_DUPLICATE,
ALIGN;

private final byte[] raw;

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/redislabs/redistimeseries/RangeParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class RangeParams {

private Integer count;

private byte[] align;

private Aggregation aggregationType;
private long timeBucket;

Expand All @@ -21,6 +23,23 @@ public RangeParams count(int count) {
return this;
}

private RangeParams align(byte[] raw) {
this.align = raw;
return this;
}

public RangeParams align(long timestamp) {
return align(Protocol.toByteArray(timestamp));
}

public RangeParams alignStart() {
return align("start".getBytes());
}

public RangeParams alignEnd() {
return align("end".getBytes());
}

public RangeParams aggregation(Aggregation aggregation, long timeBucket) {
this.aggregationType = aggregation;
this.timeBucket = timeBucket;
Expand All @@ -38,6 +57,11 @@ public byte[][] getByteParams(String key, long from, long to) {
params.add(Protocol.toByteArray(count));
}

if (align != null) {
params.add(Keyword.ALIGN.getRaw());
params.add(align);
}

if (aggregationType != null) {
params.add(Keyword.AGGREGATION.getRaw());
params.add(aggregationType.getRaw());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redislabs.redistimeseries;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -409,6 +410,39 @@ public void testIncrByDecrBy() throws InterruptedException {
}
}

@Test
public void align() {
client.add("align", 1, 10d);
client.add("align", 3, 5d);
client.add("align", 11, 10d);
client.add("align", 25, 11d);

Value[] values =
client.range("align", 1, 30, RangeParams.rangeParams().aggregation(Aggregation.COUNT, 10));
assertArrayEquals(new Value[] {new Value(1, 2), new Value(11, 1), new Value(21, 1)}, values);

values =
client.range(
"align",
1,
30,
RangeParams.rangeParams().alignStart().aggregation(Aggregation.COUNT, 10));
assertArrayEquals(new Value[] {new Value(1, 2), new Value(11, 1), new Value(21, 1)}, values);

values =
client.range(
"align",
1,
30,
RangeParams.rangeParams().alignEnd().aggregation(Aggregation.COUNT, 10));
assertArrayEquals(new Value[] {new Value(1, 2), new Value(11, 1), new Value(21, 1)}, values);

values =
client.range(
"align", 1, 30, RangeParams.rangeParams().align(5).aggregation(Aggregation.COUNT, 10));
assertArrayEquals(new Value[] {new Value(1, 2), new Value(11, 1), new Value(21, 1)}, values);
}

@Test
public void testGet() {

Expand Down