Skip to content

Commit cf3d274

Browse files
committed
feat: add support for Hazelcast ReplicatedMap
1 parent 1567b39 commit cf3d274

19 files changed

+926
-35
lines changed

src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
102102
obj.setTrackingDataHandlingStrategy((String) member.getValue());
103103
}
104104
break;
105+
case "useReplicatedMaps":
106+
if (member.getValue() instanceof Boolean) {
107+
obj.setUseReplicatedMaps((Boolean) member.getValue());
108+
}
109+
break;
105110
case "verticleDeploymentTimeout":
106111
if (member.getValue() instanceof Number) {
107112
obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue());
@@ -155,6 +160,7 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
155160
if (obj.getTrackingDataHandlingStrategy() != null) {
156161
json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy());
157162
}
163+
json.put("useReplicatedMaps", obj.isUseReplicatedMaps());
158164
if (obj.getVerticleDeploymentTimeout() != null) {
159165
json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout());
160166
}

src/main/java/io/neonbee/NeonBee.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import io.neonbee.internal.Registry;
6464
import io.neonbee.internal.ReplyInboundInterceptor;
6565
import io.neonbee.internal.SharedDataAccessor;
66+
import io.neonbee.internal.SharedDataAccessorFactory;
6667
import io.neonbee.internal.WriteSafeRegistry;
6768
import io.neonbee.internal.buffer.ImmutableBuffer;
6869
import io.neonbee.internal.cluster.ClusterHelper;
@@ -75,6 +76,7 @@
7576
import io.neonbee.internal.codec.ImmutableJsonObjectMessageCodec;
7677
import io.neonbee.internal.deploy.Deployable;
7778
import io.neonbee.internal.deploy.Deployables;
79+
import io.neonbee.internal.hazelcast.ReplicatedClusterEntityRegistry;
7880
import io.neonbee.internal.helper.ConfigHelper;
7981
import io.neonbee.internal.helper.FileSystemHelper;
8082
import io.neonbee.internal.job.RedeployEntitiesJob;
@@ -449,7 +451,8 @@ private Future<Void> registerHooks() {
449451
*/
450452
@VisibleForTesting
451453
Future<Void> initializeSharedMaps() {
452-
SharedDataAccessor sharedData = new SharedDataAccessor(vertx, NeonBee.class);
454+
SharedDataAccessor sharedData = new SharedDataAccessorFactory(this)
455+
.getSharedDataAccessor(NeonBee.class);
453456
sharedLocalMap = sharedData.getLocalMap(SHARED_MAP_NAME);
454457
return sharedData.<String, Object>getAsyncMap(SHARED_MAP_NAME).onSuccess(asyncMap -> sharedAsyncMap = asyncMap)
455458
.mapEmpty();
@@ -677,7 +680,12 @@ private Future<Void> deployModules() {
677680
this.healthRegistry = new HealthCheckRegistry(vertx);
678681
this.modelManager = new EntityModelManager(this);
679682
if (vertx.isClustered()) {
680-
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
683+
if (config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) {
684+
this.entityRegistry =
685+
new ReplicatedClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
686+
} else {
687+
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
688+
}
681689
} else {
682690
this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME);
683691
}

src/main/java/io/neonbee/cache/CachingDataVerticle.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.neonbee.data.DataRequest;
2929
import io.neonbee.data.DataVerticle;
3030
import io.neonbee.internal.SharedDataAccessor;
31+
import io.neonbee.internal.SharedDataAccessorFactory;
3132
import io.vertx.core.Context;
3233
import io.vertx.core.Future;
3334
import io.vertx.core.Vertx;
@@ -147,7 +148,8 @@ public void init(Vertx vertx, Context context) {
147148

148149
// we will only need to retrieve locks if we coalesce requests
149150
if (coalescingTimeout > 0) {
150-
sharedDataAccessor = new SharedDataAccessor(vertx, getClass());
151+
sharedDataAccessor = new SharedDataAccessorFactory(vertx)
152+
.getSharedDataAccessor(getClass());
151153
}
152154
}
153155

src/main/java/io/neonbee/config/NeonBeeConfig.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
/**
3737
* In contrast to the {@link NeonBeeOptions} the {@link NeonBeeConfig} is persistent configuration in a file.
38-
*
38+
* <p>
3939
* Whilst the {@link NeonBeeOptions} contain information which is to specify when NeonBee starts, such as the port of
4040
* the server to start on and the cluster to connect to, which potentially could be different across cluster nodes, the
4141
* {@link NeonBeeConfig} contains information which is mostly shared across different cluster nodes or you would like to
@@ -100,6 +100,8 @@ public class NeonBeeConfig {
100100

101101
private int jsonMaxStringSize;
102102

103+
private boolean useReplicatedMaps;
104+
103105
/**
104106
* Are the metrics enabled?
105107
*
@@ -535,4 +537,38 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) {
535537
public int getJsonMaxStringSize() {
536538
return jsonMaxStringSize;
537539
}
540+
541+
/**
542+
* Set the value to enable, disable replicated maps.
543+
* <p>
544+
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
545+
* <p>
546+
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
547+
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
548+
* don't need to partition the data.
549+
* <p>
550+
*
551+
* @return true if the replicated maps should be used, false otherwise
552+
*/
553+
public boolean isUseReplicatedMaps() {
554+
return useReplicatedMaps;
555+
}
556+
557+
/**
558+
* Set the value to enable, disable replicated maps.
559+
* <p>
560+
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
561+
* <p>
562+
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
563+
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
564+
* don't need to partition the data.
565+
* <p>
566+
*
567+
* @param useReplicatedMaps true if the replicated maps should be enabled, false otherwise
568+
* @return the {@linkplain NeonBeeConfig} for fluent use
569+
*/
570+
public NeonBeeConfig setUseReplicatedMaps(boolean useReplicatedMaps) {
571+
this.useReplicatedMaps = useReplicatedMaps;
572+
return this;
573+
}
538574
}

src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.neonbee.endpoint.odatav4.internal.olingo.OlingoEndpointHandler;
3030
import io.neonbee.entity.EntityModel;
3131
import io.neonbee.internal.RegexBlockList;
32-
import io.neonbee.internal.SharedDataAccessor;
32+
import io.neonbee.internal.SharedDataAccessorFactory;
3333
import io.neonbee.logging.LoggingFacade;
3434
import io.vertx.core.Future;
3535
import io.vertx.core.Vertx;
@@ -201,27 +201,35 @@ public Future<Router> createEndpointRouter(Vertx vertx, String basePath, JsonObj
201201
// when NeonBee is started and / or in case the endpoint is not used.
202202
Route initialRoute = router.route();
203203
initialRoute.handler(
204-
routingContext -> new SharedDataAccessor(vertx, ODataV4Endpoint.class).getLocalLock(asyncLock ->
205-
// immediately initialize the router, this will also "arm" the event bus listener
206-
(!initialized.getAndSet(true)
207-
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
208-
: succeededFuture()).onComplete(handler -> {
209-
// wait for the refresh to finish (the result doesn't matter), remove the initial route, as
210-
// this will redirect all requests to the registered service endpoint handlers (if non have
211-
// been registered, e.g. due to a failure in model loading, it'll result in an 404). Could
212-
// have been removed already by refreshRouter, we don't care!
213-
initialRoute.remove();
214-
if (asyncLock.succeeded()) {
215-
// releasing the lock will cause other requests unblock and not call the initial route
216-
asyncLock.result().release();
217-
}
218-
219-
// let the router again handle the context again, now with either all service endpoints
220-
// registered, or none in case there have been a failure while loading the models.
221-
// NOTE: Re-route is the only elegant way I found to restart the current router to take
222-
// the new routes! Might consider checking again with the Vert.x 4.0 release.
223-
routingContext.reroute(routingContext.request().uri());
224-
})));
204+
routingContext -> new SharedDataAccessorFactory(vertx)
205+
.getSharedDataAccessor(ODataV4Endpoint.class)
206+
.getLocalLock(asyncLock ->
207+
// immediately initialize the router, this will also "arm" the event bus listener
208+
(!initialized.getAndSet(true)
209+
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
210+
: succeededFuture()).onComplete(handler -> {
211+
// wait for the refresh to finish (the result doesn't matter), remove the initial
212+
// route, as
213+
// this will redirect all requests to the registered service endpoint handlers (if
214+
// non have
215+
// been registered, e.g. due to a failure in model loading, it'll result in an 404).
216+
// Could
217+
// have been removed already by refreshRouter, we don't care!
218+
initialRoute.remove();
219+
if (asyncLock.succeeded()) {
220+
// releasing the lock will cause other requests unblock and not call the initial
221+
// route
222+
asyncLock.result().release();
223+
}
224+
225+
// let the router again handle the context again, now with either all service
226+
// endpoints
227+
// registered, or none in case there have been a failure while loading the models.
228+
// NOTE: Re-route is the only elegant way I found to restart the current router to
229+
// take
230+
// the new routes! Might consider checking again with the Vert.x 4.0 release.
231+
routingContext.reroute(routingContext.request().uri());
232+
})));
225233

226234
return succeededFuture(router);
227235
}

src/main/java/io/neonbee/entity/EntityModelManager.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import com.google.common.base.Functions;
1717

1818
import io.neonbee.NeonBee;
19-
import io.neonbee.internal.SharedDataAccessor;
19+
import io.neonbee.internal.SharedDataAccessorFactory;
2020
import io.neonbee.logging.LoggingFacade;
2121
import io.vertx.core.Future;
2222
import io.vertx.core.eventbus.DeliveryOptions;
@@ -147,7 +147,10 @@ public Future<Map<String, EntityModel>> getSharedModels() {
147147
}
148148

149149
// if not try to reload the models and return the loaded data model
150-
return new SharedDataAccessor(neonBee.getVertx(), EntityModelManager.class).getLocalLock()
150+
151+
return new SharedDataAccessorFactory(neonBee)
152+
.getSharedDataAccessor(EntityModelManager.class)
153+
.getLocalLock()
151154
.transform(asyncLocalLock -> {
152155
Map<String, EntityModel> retryModels = getBufferedModels();
153156
if (retryModels != null) {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.neonbee.internal;
2+
3+
import io.neonbee.NeonBee;
4+
import io.neonbee.config.NeonBeeConfig;
5+
import io.neonbee.internal.cluster.ClusterHelper;
6+
import io.neonbee.internal.hazelcast.ReplicatedDataAccessor;
7+
import io.vertx.core.Vertx;
8+
9+
/**
10+
* Factory to create a {@link SharedDataAccessor} based on the configuration.
11+
*/
12+
public class SharedDataAccessorFactory {
13+
private final NeonBee neonBee;
14+
15+
/**
16+
* Create a new instance of {@link SharedDataAccessorFactory}.
17+
*/
18+
public SharedDataAccessorFactory() {
19+
this.neonBee = NeonBee.get();
20+
}
21+
22+
/**
23+
* Create a new instance of {@link SharedDataAccessorFactory}.
24+
*
25+
* @param vertx the Vert.x instance
26+
*/
27+
public SharedDataAccessorFactory(Vertx vertx) {
28+
this.neonBee = NeonBee.get(vertx);
29+
}
30+
31+
/**
32+
* Create a new instance of {@link SharedDataAccessorFactory}.
33+
*
34+
* @param neonBee the NeonBee instance
35+
*/
36+
public SharedDataAccessorFactory(NeonBee neonBee) {
37+
this.neonBee = neonBee;
38+
}
39+
40+
private boolean useHazelcastReplicatedMaps() {
41+
NeonBeeConfig config = neonBee.getConfig();
42+
return config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(neonBee.getVertx()).isPresent();
43+
}
44+
45+
/**
46+
* Get a {@link SharedDataAccessor} based on the configuration.
47+
*
48+
* @param accessClass the class to access the shared data
49+
* @return the shared data accessor
50+
*/
51+
public SharedDataAccessor getSharedDataAccessor(Class<?> accessClass) {
52+
if (useHazelcastReplicatedMaps()) {
53+
return new ReplicatedDataAccessor(neonBee.getVertx(), accessClass);
54+
} else {
55+
return new SharedDataAccessor(neonBee.getVertx(), accessClass);
56+
}
57+
}
58+
}

src/main/java/io/neonbee/internal/WriteSafeRegistry.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,25 @@ public class WriteSafeRegistry<T> implements Registry<T> {
2626

2727
private final String registryName;
2828

29+
/**
30+
* Create a new {@link WriteSafeRegistry}.
31+
*
32+
* @param registryName the name of the map registry
33+
* @param sharedDataAccessor the shared data accessor
34+
*/
35+
protected WriteSafeRegistry(String registryName, SharedDataAccessor sharedDataAccessor) {
36+
this.registryName = registryName;
37+
this.sharedDataAccessor = sharedDataAccessor;
38+
}
39+
2940
/**
3041
* Create a new {@link WriteSafeRegistry}.
3142
*
3243
* @param vertx the {@link Vertx} instance
3344
* @param registryName the name of the map registry
3445
*/
3546
public WriteSafeRegistry(Vertx vertx, String registryName) {
36-
this.registryName = registryName;
37-
this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
47+
this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class));
3848
}
3949

4050
/**

src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,32 @@ public class ClusterEntityRegistry implements Registry<String> {
4848

4949
private final WriteSafeRegistry<Object> entityRegistry;
5050

51+
/**
52+
* Create a new instance of {@link ClusterEntityRegistry}.
53+
*
54+
* @param vertx the {@link Vertx} instance
55+
* @param entityRegistry the entity registry
56+
* @param clusteringInformation the clustering information registry
57+
*/
58+
protected ClusterEntityRegistry(
59+
Vertx vertx,
60+
WriteSafeRegistry<Object> entityRegistry,
61+
WriteSafeRegistry<JsonObject> clusteringInformation) {
62+
this.entityRegistry = entityRegistry;
63+
this.clusteringInformation = clusteringInformation;
64+
this.vertx = vertx;
65+
}
66+
5167
/**
5268
* Create a new instance of {@link ClusterEntityRegistry}.
5369
*
5470
* @param vertx the {@link Vertx} instance
5571
* @param registryName the name of the map registry
5672
*/
5773
public ClusterEntityRegistry(Vertx vertx, String registryName) {
58-
this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName);
59-
this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation");
60-
this.vertx = vertx;
74+
this(vertx,
75+
new WriteSafeRegistry<>(vertx, registryName),
76+
new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"));
6177
}
6278

6379
@VisibleForTesting

0 commit comments

Comments
 (0)