-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Durable backend for Distributed Data collections #2490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
898db53
dedicated serializers for ddata
Horusiath 37bbf4a
minor bug fixes
Horusiath 73e2865
minor bug fixed & richer collections API
Horusiath 9023f54
init ddata durable envelopes
Horusiath 71e2754
replicator support for durable ddata
Horusiath 2705678
durable LMDB storage for ddata
Horusiath 74f2ad4
DurableDataSpec
Horusiath 96fcc50
DurablePrunningSpec
Horusiath 285e210
fixed basic tests for ddata multinode
Horusiath 77a645d
ToString for messages
Horusiath 18d529d
opened ReplicatorSpec
Horusiath 6c65859
repaired a bunch of bugs in order to get ReplicatorSpec passing
Horusiath 3f4bab6
fixed in numeric aggregator reads
Horusiath 60ba8b5
moved LMDB dependency to separate project
Horusiath a2c7141
remaining bug: ReplicatorSpec D12
Horusiath e46b00d
removed replicator messages out of the Replicator class
Horusiath 40e89eb
DSL comments
Horusiath f449f9f
found and fixed bug in akka JVM
Horusiath 545c067
general cleanup of ddata MNTK specs
Horusiath e5afe76
removed unnecessary try-catch
Horusiath 56667a0
more logs for tests
Horusiath 74a312f
muted spec
Horusiath 09f4544
fixed LMDB durable store
Horusiath File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
src/contrib/cluster/Akka.DistributedData.LightningDB/Akka.DistributedData.LightningDB.csproj
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
<?xml version="1.0" encoding="utf-8"?> | ||
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> | ||
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> | ||
<PropertyGroup> | ||
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> | ||
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> | ||
<ProjectGuid>{26103500-0DFE-498D-8FCF-EFE66BB4FD69}</ProjectGuid> | ||
<OutputType>Library</OutputType> | ||
<AppDesignerFolder>Properties</AppDesignerFolder> | ||
<RootNamespace>Akka.DistributedData.LightningDB</RootNamespace> | ||
<AssemblyName>Akka.DistributedData.LightningDB</AssemblyName> | ||
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion> | ||
<FileAlignment>512</FileAlignment> | ||
</PropertyGroup> | ||
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> | ||
<DebugSymbols>true</DebugSymbols> | ||
<DebugType>full</DebugType> | ||
<Optimize>false</Optimize> | ||
<OutputPath>bin\Debug\</OutputPath> | ||
<DefineConstants>DEBUG;TRACE</DefineConstants> | ||
<ErrorReport>prompt</ErrorReport> | ||
<WarningLevel>4</WarningLevel> | ||
</PropertyGroup> | ||
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> | ||
<DebugType>pdbonly</DebugType> | ||
<Optimize>true</Optimize> | ||
<OutputPath>bin\Release\</OutputPath> | ||
<DefineConstants>TRACE</DefineConstants> | ||
<ErrorReport>prompt</ErrorReport> | ||
<WarningLevel>4</WarningLevel> | ||
</PropertyGroup> | ||
<ItemGroup> | ||
<Reference Include="LightningDB, Version=0.9.7.0, Culture=neutral, processorArchitecture=MSIL"> | ||
<HintPath>..\..\..\packages\LightningDB.0.9.7\lib\net45\LightningDB.dll</HintPath> | ||
<Private>True</Private> | ||
</Reference> | ||
<Reference Include="System" /> | ||
<Reference Include="System.Collections.Immutable, Version=1.2.1.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> | ||
<HintPath>..\..\..\packages\System.Collections.Immutable.1.3.1\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> | ||
<Private>True</Private> | ||
</Reference> | ||
<Reference Include="System.Core" /> | ||
<Reference Include="System.Xml.Linq" /> | ||
<Reference Include="System.Data.DataSetExtensions" /> | ||
<Reference Include="Microsoft.CSharp" /> | ||
<Reference Include="System.Data" /> | ||
<Reference Include="System.Net.Http" /> | ||
<Reference Include="System.Xml" /> | ||
</ItemGroup> | ||
<ItemGroup> | ||
<Compile Include="LmdbDurableStore.cs" /> | ||
<Compile Include="Properties\AssemblyInfo.cs" /> | ||
</ItemGroup> | ||
<ItemGroup> | ||
<ProjectReference Include="..\..\..\core\Akka.Cluster\Akka.Cluster.csproj"> | ||
<Project>{6ab00f61-269a-4501-b06a-026707f000a7}</Project> | ||
<Name>Akka.Cluster</Name> | ||
</ProjectReference> | ||
<ProjectReference Include="..\..\..\core\Akka\Akka.csproj"> | ||
<Project>{5deddf90-37f0-48d3-a0b0-a5cbd8a7e377}</Project> | ||
<Name>Akka</Name> | ||
</ProjectReference> | ||
<ProjectReference Include="..\Akka.DistributedData\Akka.DistributedData.csproj"> | ||
<Project>{59cffc88-8a73-445d-b191-281e40be9421}</Project> | ||
<Name>Akka.DistributedData</Name> | ||
</ProjectReference> | ||
</ItemGroup> | ||
<ItemGroup> | ||
<None Include="packages.config" /> | ||
<EmbeddedResource Include="reference.conf" /> | ||
</ItemGroup> | ||
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> | ||
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. | ||
Other similar extension points exist, see Microsoft.Common.targets. | ||
<Target Name="BeforeBuild"> | ||
</Target> | ||
<Target Name="AfterBuild"> | ||
</Target> | ||
--> | ||
</Project> |
229 changes: 229 additions & 0 deletions
229
src/contrib/cluster/Akka.DistributedData.LightningDB/LmdbDurableStore.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
#region copyright | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="LmdbDurableStore.cs" company="Akka.NET project"> | ||
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2017 Akka.NET project <https://github.com/akkadotnet> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
#endregion | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Collections.Immutable; | ||
using System.IO; | ||
using System.Text; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
using Akka.DistributedData.Durable; | ||
using Akka.Event; | ||
using Akka.Serialization; | ||
using LightningDB; | ||
|
||
namespace Akka.DistributedData.LightningDB | ||
{ | ||
/// <summary> | ||
/// An actor implementing the durable store for the Distributed Data <see cref="Replicator"/> | ||
/// has to implement the protocol with the messages defined here. | ||
/// | ||
/// At startup the <see cref="Replicator"/> creates the durable store actor and sends the | ||
/// <see cref="LoadAll"/> message to it. It must then reply with 0 or more <see cref="LoadData"/> messages | ||
/// followed by one <see cref="LoadAllCompleted"/> message to the <see cref="IActorContext.Sender"/> (the <see cref="Replicator"/>). | ||
/// | ||
/// If the <see cref="LoadAll"/> fails it can throw <see cref="LoadFailedException"/> and the <see cref="Replicator"/> supervisor | ||
/// will stop itself and the durable store. | ||
/// | ||
/// When the <see cref="Replicator"/> needs to store a value it sends a <see cref="Store"/> message | ||
/// to the durable store actor, which must then reply with the <see cref="StoreReply.SuccessMessage"/> or | ||
/// <see cref="StoreReply.FailureMessage"/> to the <see cref="StoreReply.ReplyTo"/>. | ||
/// </summary> | ||
public sealed class LmdbDurableStore : ReceiveActor | ||
{ | ||
public const string DatabaseName = "ddata"; | ||
private sealed class WriteBehind | ||
{ | ||
public static readonly WriteBehind Instance = new WriteBehind(); | ||
private WriteBehind() { } | ||
} | ||
|
||
public static Actor.Props Props(Config config) => Actor.Props.Create(() => new LmdbDurableStore(config)); | ||
|
||
private readonly TimeSpan _writeBehindInterval; | ||
private readonly Dictionary<string, DurableDataEnvelope> _pending = new Dictionary<string, DurableDataEnvelope>(); | ||
private readonly LightningEnvironment _environment; | ||
private readonly ILoggingAdapter _log; | ||
private readonly Serializer _serializer; | ||
|
||
public LmdbDurableStore(Config config) | ||
{ | ||
config = config.GetConfig("lmdb"); | ||
if (config == null) throw new ArgumentException("Coudln't find config for LMDB durable store. Default path: `akka.cluster.distributed-data.durable.lmdb`"); | ||
|
||
_log = Context.GetLogger(); | ||
|
||
_writeBehindInterval = config.GetString("write-behind-interval") == "off" | ||
? TimeSpan.Zero : config.GetTimeSpan("write-behind-interval"); | ||
|
||
var mapSize = config.GetByteSize("map-size"); | ||
var dirPath = config.GetString("dir"); | ||
if (dirPath.EndsWith("ddata")) | ||
{ | ||
dirPath = $"path-{Context.System.Name}-{Self.Path.Parent.Name}-{Cluster.Cluster.Get(Context.System).SelfAddress.Port}"; | ||
} | ||
|
||
if (!Directory.Exists(dirPath)) | ||
{ | ||
Directory.CreateDirectory(dirPath); | ||
} | ||
|
||
_environment = new LightningEnvironment(dirPath, new EnvironmentConfiguration | ||
{ | ||
MapSize = mapSize ?? (100 * 1024 * 1024), | ||
MaxDatabases = 1 | ||
}); | ||
_environment.Open(EnvironmentOpenFlags.NoLock); | ||
|
||
using (var tx = _environment.BeginTransaction()) | ||
using (var db = tx.OpenDatabase(configuration: new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create })) | ||
{ | ||
// just create | ||
} | ||
|
||
_serializer = Context.System.Serialization.FindSerializerForType(typeof(DurableDataEnvelope)); | ||
|
||
Init(); | ||
} | ||
|
||
protected override void PostStop() | ||
{ | ||
base.PostStop(); | ||
DoWriteBehind(); | ||
//_db.Dispose(); | ||
_environment.Dispose(); | ||
} | ||
|
||
protected override void PostRestart(Exception reason) | ||
{ | ||
base.PostRestart(reason); | ||
// Load is only done on first start, not on restart | ||
Become(Active); | ||
} | ||
|
||
private void Active() | ||
{ | ||
Receive<Store>(store => | ||
{ | ||
var reply = store.Reply; | ||
|
||
try | ||
{ | ||
if (_writeBehindInterval == TimeSpan.Zero) | ||
{ | ||
using (var tx = _environment.BeginTransaction()) | ||
using (var db = tx.OpenDatabase()) | ||
{ | ||
DbPut(tx, db, store.Key, store.Data); | ||
tx.Commit(); | ||
} | ||
} | ||
else | ||
{ | ||
if (_pending.Count > 0) | ||
Context.System.Scheduler.ScheduleTellOnce(_writeBehindInterval, Self, WriteBehind.Instance, ActorRefs.NoSender); | ||
_pending[store.Key] = store.Data; | ||
} | ||
|
||
reply?.ReplyTo.Tell(reply.SuccessMessage); | ||
} | ||
catch (Exception cause) | ||
{ | ||
_log.Error(cause, "Failed to store [{0}]", store.Key); | ||
reply?.ReplyTo.Tell(reply.FailureMessage); | ||
} | ||
}); | ||
Receive<WriteBehind>(_ => DoWriteBehind()); | ||
} | ||
|
||
private void Init() | ||
{ | ||
Receive<LoadAll>(loadAll => | ||
{ | ||
var t0 = System.DateTime.UtcNow; | ||
using (var tx = _environment.BeginTransaction(TransactionBeginFlags.ReadOnly)) | ||
using (var db = tx.OpenDatabase()) | ||
using (var cursor = tx.CreateCursor(db)) | ||
{ | ||
try | ||
{ | ||
var n = 0; | ||
var builder = ImmutableDictionary<string, DurableDataEnvelope>.Empty.ToBuilder(); | ||
foreach (var entry in cursor) | ||
{ | ||
n++; | ||
var key = Encoding.UTF8.GetString(entry.Key); | ||
var envelope = (DurableDataEnvelope)_serializer.FromBinary(entry.Value, typeof(DurableDataEnvelope)); | ||
builder.Add(new KeyValuePair<string, DurableDataEnvelope>(key, envelope)); | ||
} | ||
|
||
if (builder.Count > 0) | ||
{ | ||
var loadData = new LoadData(builder.ToImmutable()); | ||
Sender.Tell(loadData); | ||
} | ||
|
||
Sender.Tell(LoadAllCompleted.Instance); | ||
|
||
if (_log.IsDebugEnabled) | ||
_log.Debug("Load all of [{0}] entries took [{1}]", n, DateTime.UtcNow - t0); | ||
|
||
Become(Active); | ||
} | ||
catch (Exception e) | ||
{ | ||
throw new LoadFailedException("failed to load durable distributed-data", e); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
private void DbPut(LightningTransaction tx, LightningDatabase db, string key, DurableDataEnvelope data) | ||
{ | ||
var byteKey = Encoding.UTF8.GetBytes(key); | ||
var byteValue = _serializer.ToBinary(data); | ||
tx.Put(db, byteKey, byteValue); | ||
} | ||
|
||
private void DoWriteBehind() | ||
{ | ||
if (_pending.Count > 0) | ||
{ | ||
var t0 = DateTime.UtcNow; | ||
using (var tx = _environment.BeginTransaction()) | ||
using (var db = tx.OpenDatabase()) | ||
{ | ||
try | ||
{ | ||
foreach (var entry in _pending) | ||
{ | ||
DbPut(tx, db, entry.Key, entry.Value); | ||
} | ||
tx.Commit(); | ||
|
||
if (_log.IsDebugEnabled) | ||
{ | ||
_log.Debug("store and commit of [{0}] entries took {1} ms", _pending.Count, (DateTime.UtcNow - t0).TotalMilliseconds); | ||
} | ||
} | ||
catch (Exception cause) | ||
{ | ||
_log.Error(cause, "failed to store [{0}]", string.Join(", ", _pending.Keys)); | ||
tx.Abort(); | ||
} | ||
finally | ||
{ | ||
_pending.Clear(); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JVM version by default uses LMDB backend for storage. I think, it's better to move that dependency to a separate package however.