Skip to content

Commit 184e7c7

Browse files
Horusiathmarcpiechura
authored andcommitted
StreamRefs (#3321)
* initial commit for StreamRefs * more work over Stream-Ref serializer * fixes in serializer and config * fixes in stream-refs and tests * StreamRefs docs * added defaults for StreamRefsSettings * StreamRefs approvals API * fixed missing impl + added animation gifs to docs * applied docs, fixed namespaces * fixed stream ref serializer namespaces
1 parent f1de07d commit 184e7c7

File tree

20 files changed

+3364
-29
lines changed

20 files changed

+3364
-29
lines changed

build.fsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,8 @@ Target "Protobuf" <| fun _ ->
444444
("DistributedPubSubMessages.proto", "/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/Proto/");
445445
("ClusterShardingMessages.proto", "/src/contrib/cluster/Akka.Cluster.Sharding/Serialization/Proto/");
446446
("TestConductorProtocol.proto", "/src/core/Akka.Remote.TestKit/Proto/");
447-
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/") ]
447+
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/");
448+
("StreamRefMessages.proto", "/src/core/Akka.Streams/Serialization/Proto/")]
448449

449450
printfn "Using proto.exe: %s" protocPath
450451

docs/articles/streams/streamrefs.md

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
---
2+
uid: stream-ref
3+
title: StreamRefs - Reactive Streams over the network
4+
---
5+
6+
> **Warning**
7+
This module is currently marked as may change in the sense of being the subject of active research. This means that API or semantics can change without warning or deprecation period and it is not recommended to use this module in production just yet—you have been warned.
8+
9+
Stream references, or “stream refs” for short, allow running Akka Streams across multiple nodes within an Akka Remote boundaries.
10+
11+
Unlike heavier “streaming data processing” frameworks, Akka Streams are not “deployed” nor automatically distributed. Akka stream refs are, as the name implies, references to existing parts of a stream, and can be used to create a distributed processing framework or introduce such capabilities in specific parts of your application.
12+
13+
Stream refs are trivial to make use of in existing clustered Akka applications, and require no additional configuration or setup. They automatically maintain flow-control / back-pressure over the network, and employ Akka’s failure detection mechanisms to fail-fast (“let it crash!”) in the case of failures of remote nodes. They can be seen as an implementation of the Work Pulling Pattern, which one would otherwise implement manually.
14+
15+
> **Note**
16+
A useful way to think about stream refs is: “like an `IActorRef`, but for Akka Streams’s `Source` and `Sink`”.
17+
Stream refs refer to an already existing, possibly remote, `Sink` or `Source`. This is not to be mistaken with deploying streams remotely, which this feature is not intended for.
18+
19+
## Stream References
20+
21+
The prime use case for stream refs is to replace raw actor or HTTP messaging between systems where a long running stream of data is expected between two entities. Often times, they can be used to effectively achieve point to point streaming without the need of setting up additional message brokers or similar secondary clusters.
22+
23+
Stream refs are well suited for any system in which you need to send messages between nodes and need to do so in a flow-controlled fashion. Typical examples include sending work requests to worker nodes, as fast as possible, but not faster than the worker node can process them, or sending data elements which the downstream may be slow at processing. It is recommended to mix and introduce stream refs in Actor messaging based systems, where the actor messaging is used to orchestrate and prepare such message flows, and later the stream refs are used to do the flow-controlled message transfer.
24+
25+
Stream refs are not persistent, however it is simple to build a recover-able stream by introducing such protocol on the actor messaging layer. Stream refs are absolutely expected to be sent over Akka remoting to other nodes within a cluster, and as such, complement and do not compete with plain Actor messaging. Actors would usually be used to establish the stream, by means of some initial message saying “I want to offer you many log elements (the stream ref)”, or alternatively in the opposite way “If you need to send me much data, here is the stream ref you can use to do so”.
26+
27+
Since the two sides (“local” and “remote”) of each reference may be confusing to simply refer to as “remote” and “local” – since either side can be seen as “local” or “remote” depending how we look at it – we propose to use the terminology “origin” and “target”, which is defined by where the stream ref was created. For SourceRefs, the “origin” is the side which has the data that it is going to stream out. For SinkRefs the “origin” side is the actor system that is ready to receive the data and has allocated the ref. Those two may be seen as duals of each other, however to explain patterns about sharing references, we found this wording to be rather useful.
28+
29+
### Source Refs - offering streaming data to a remote system
30+
31+
A `SourceRef` can be offered to a remote actor system in order for it to consume some source of data that we have prepared locally.
32+
33+
In order to share a `Source` with a remote endpoint you need to materialize it by running it into the `StreamRefs.SourceRef`. That sink materializes the `ISourceRef<T>` that you can then send to other nodes. Please note that it materializes into a Task so you will have to use the continuation (either `PipeTo` or async/await pattern).
34+
35+
[!code-csharp[StreamRefsDocTests.cs](../../examples/DocsExamples/Streams/StreamRefsDocTests.cs?name=data-source-actor)]
36+
37+
The origin actor which creates and owns the `Source` could also perform some validation or additional setup when preparing the source. Once it has handed out the `ISourceRef<T>` the remote side can run it like this:
38+
39+
[!code-csharp[StreamRefsDocTests.cs](../../examples/DocsExamples/Streams/StreamRefsDocTests.cs?name=source-ref-materialization)]
40+
41+
The process of preparing and running a `ISourceRef<T>` powered distributed stream is shown by the animation below:
42+
43+
![source ref](/images/source-ref-animation.gif)
44+
45+
> **Warning**
46+
A `ISourceRef<T>` is by design “single-shot”. i.e. it may only be materialized once. This is in order to not complicate the mental model what materializing such value would mean.
47+
While stream refs are designed to be single shot, you may use them to mimic multicast scenarios, simply by starting a `Broadcast` stage once, and attaching multiple new streams to it, for each emitting a new stream ref. This way each output of the broadcast is by itself an unique single-shot reference, however they can all be powered using a single `Source` – located before the `Broadcast` stage.
48+
49+
### Sink Refs - offering to receive streaming data from a remote system
50+
51+
They can be used to offer the other side the capability to send to the origin side data in a streaming, flow-controlled fashion. The origin here allocates a Sink, which could be as simple as a `Sink.ForEach` or as advanced as a complex sink which streams the incoming data into various other systems (e.g. any of the Alpakka provided Sinks).
52+
53+
> **Note**
54+
To form a good mental model of `SinkRef`s, you can think of them as being similar to “passive mode” in FTP.
55+
56+
[!code-csharp[StreamRefsDocTests.cs](../../examples/DocsExamples/Streams/StreamRefsDocTests.cs?name=data-sink-actor)]
57+
58+
Using the offered `ISinkRef<>` to send data to the origin of the `Sink` is also simple, as we can treat the `ISinkRef<>` just as any other sink and directly runWith or run with it.
59+
60+
[!code-csharp[StreamRefsDocTests.cs](../../examples/DocsExamples/Streams/StreamRefsDocTests.cs?name=sink-ref-materialization)]
61+
62+
The process of preparing and running a `ISinkRef<>` powered distributed stream is shown by the animation below:
63+
64+
![sink ref](/images/sink-ref-animation.gif)
65+
66+
> **Warning**
67+
A `ISinkRef<>` is *by design* “single-shot”. i.e. it may only be materialized once. This is in order to not complicate the mental model what materializing such value would mean. If you have an use case for building a fan-in operation accepting writes from multiple remote nodes, you can build your `Sink` and prepend it with a `Merge` stage, each time materializing a new `ISinkRef<>` targeting that `Merge`. This has the added benefit of giving you full control how to merge these streams (i.e. by using “merge preferred” or any other variation of the fan-in stages).
68+
69+
## Configuration
70+
71+
### Stream reference subscription timeouts
72+
73+
All stream references have a subscription timeout, which is intended to prevent resource leaks in situations in which a remote node would requests the allocation of many streams yet never actually run them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which the origin will abort the stream offer if the target has not materialized the stream ref in time. After the timeout has triggered, materialization of the target side will fail pointing out that the origin is missing.
74+
75+
Since these timeouts are often very different based on the kind of stream offered, and there can be many different kinds of them in the same application, it is possible to not only configure this setting globally (`akka.stream.materializer.stream-ref.subscription-timeout`), but also via attributes:
76+
77+
```csharp
78+
Source.Repeat("hello")
79+
.RunWith(StreamRefs.SourceRef<string>()
80+
.AddAttributes(StreamRefAttributes
81+
.SubscriptionTimeout(TimeSpan.FromSeconds(5)))
82+
, materializer);
83+
84+
StreamRefs.SinkRef<string>()
85+
.AddAttributes(StreamRefAttributes
86+
.SubscriptionTimeout(TimeSpan.FromSeconds(5)))
87+
.RunWith(Sink.Ignore<string>(), materializer);
88+
```
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Threading.Tasks;
5+
using Akka;
6+
using Akka.Streams;
7+
using Akka.Streams.Dsl;
8+
using Akka.TestKit.Xunit2;
9+
using Xunit;
10+
using Xunit.Abstractions;
11+
using Akka.Actor;
12+
using Akka.IO;
13+
using Akka.Util;
14+
using System.Linq;
15+
16+
namespace DocsExamples.Streams
17+
{
18+
public class StreamRefsDocTests : TestKit
19+
{
20+
#region data-source-actor
21+
public sealed class RequestLogs
22+
{
23+
public int StreamId { get; }
24+
25+
public RequestLogs(int streamId)
26+
{
27+
StreamId = streamId;
28+
}
29+
}
30+
31+
public sealed class LogsOffer
32+
{
33+
public int StreamId { get; }
34+
public ISourceRef<string> SourceRef { get; }
35+
36+
public LogsOffer(int streamId, ISourceRef<string> sourceRef)
37+
{
38+
StreamId = streamId;
39+
SourceRef = sourceRef;
40+
}
41+
}
42+
43+
public class DataSource : ReceiveActor
44+
{
45+
public DataSource()
46+
{
47+
Receive<RequestLogs>(request =>
48+
{
49+
// create a source
50+
StreamLogs(request.StreamId)
51+
// materialize it using stream refs
52+
.RunWith(StreamRefs.SourceRef<string>(), Context.System.Materializer())
53+
// and send to sender
54+
.PipeTo(Sender, success: sourceRef => new LogsOffer(request.StreamId, sourceRef));
55+
});
56+
}
57+
58+
private Source<string, NotUsed> StreamLogs(int streamId) =>
59+
Source.From(Enumerable.Range(1, 100)).Select(i => i.ToString());
60+
}
61+
#endregion
62+
63+
#region data-sink-actor
64+
public sealed class PrepareUpload
65+
{
66+
public string Id { get; }
67+
public PrepareUpload(string id)
68+
{
69+
Id = id;
70+
}
71+
}
72+
73+
public sealed class MeasurementsSinkReady
74+
{
75+
public string Id { get; }
76+
public ISinkRef<string> SinkRef { get; }
77+
public MeasurementsSinkReady(string id, ISinkRef<string> sinkRef)
78+
{
79+
Id = id;
80+
SinkRef = sinkRef;
81+
}
82+
}
83+
84+
class DataReceiver : ReceiveActor
85+
{
86+
public DataReceiver()
87+
{
88+
Receive<PrepareUpload>(prepare =>
89+
{
90+
// obtain a source you want to offer
91+
var sink = LogsSinksFor(prepare.Id);
92+
93+
// materialize sink ref (remote is source data for us)
94+
StreamRefs.SinkRef<string>()
95+
.To(sink)
96+
.Run(Context.System.Materializer())
97+
.PipeTo(Sender, success: sinkRef => new MeasurementsSinkReady(prepare.Id, sinkRef));
98+
});
99+
}
100+
101+
private Sink<string, Task> LogsSinksFor(string id) =>
102+
Sink.ForEach<string>(Console.WriteLine);
103+
}
104+
#endregion
105+
106+
private ActorMaterializer Materializer { get; }
107+
108+
public StreamRefsDocTests(ITestOutputHelper output)
109+
: base("", output)
110+
{
111+
Materializer = Sys.Materializer();
112+
}
113+
114+
[Fact]
115+
public async Task SourceRef_must_propagate_source_from_another_system()
116+
{
117+
#region source-ref-materialization
118+
var sourceActor = Sys.ActorOf(Props.Create<DataSource>(), "dataSource");
119+
120+
var offer = await sourceActor.Ask<LogsOffer>(new RequestLogs(1337));
121+
await offer.SourceRef.Source.RunForeach(Console.WriteLine, Materializer);
122+
#endregion
123+
}
124+
125+
[Fact]
126+
public async Task SinkRef_must_receive_messages_from_another_system()
127+
{
128+
#region sink-ref-materialization
129+
var receiver = Sys.ActorOf(Props.Create<DataReceiver>(), "receiver");
130+
131+
var ready = await receiver.Ask<MeasurementsSinkReady>(new PrepareUpload("id"), timeout: TimeSpan.FromSeconds(30));
132+
133+
// stream local metrics to Sink's origin:
134+
Source.From(Enumerable.Range(1, 100))
135+
.Select(i => i.ToString())
136+
.RunWith(ready.SinkRef.Sink, Materializer);
137+
#endregion
138+
}
139+
}
140+
}

docs/images/sink-ref-animation.gif

94.6 KB
Loading

docs/images/source-ref-animation.gif

94.6 KB
Loading

src/Akka.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Protobuf", "Protobuf", "{98
5757
protobuf\SystemMessageFormats.proto = protobuf\SystemMessageFormats.proto
5858
protobuf\TestConductorProtocol.proto = protobuf\TestConductorProtocol.proto
5959
protobuf\WireFormats.proto = protobuf\WireFormats.proto
60+
protobuf\StreamRefMessages.proto = protobuf\StreamRefMessages.proto
6061
EndProjectSection
6162
EndProject
6263
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Build", "Build", "{10C5B1E8-15B5-4EB3-81AE-1FC054FE1305}"

0 commit comments

Comments
 (0)