Skip to content

Commit c796eda

Browse files
author
Udo Kohlmeyer
committed
DATAGEODE-236 - Adding RegionEntry event processing and more testing
1 parent d31e6f9 commit c796eda

File tree

6 files changed

+389
-364
lines changed

6 files changed

+389
-364
lines changed

src/main/java/org/springframework/data/gemfire/config/annotation/AsCacheListener.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,18 @@
3636
public @interface AsCacheListener {
3737

3838
/**
39-
* An array of {@link CacheListenerEventType} that control what events need to be observed
39+
* An array of {@link CacheListenerEventType} that control what region CRUD events need to be observed
40+
* {@link CacheListenerEventType} and {@link RegionCacheListenerEventType} cannot be set on the same method. As they
41+
* are mutually exclusive and require that the implementing method uses {@link org.apache.geode.cache.RegionEvent} or
42+
* {@link org.apache.geode.cache.EntryEvent}
4043
*/
4144
CacheListenerEventType[] eventTypes() default {};
4245

4346
/**
44-
* An array of {@link CacheListenerEventType} that control what events need to be observed
47+
* An array of {@link RegionCacheListenerEventType} that control what region events need to be observed
48+
* {@link CacheListenerEventType} and {@link RegionCacheListenerEventType} cannot be set on the same method. As they
49+
* are mutually exclusive and require that the implementing method uses {@link org.apache.geode.cache.RegionEvent} or
50+
* {@link org.apache.geode.cache.EntryEvent}
4551
*/
4652
RegionCacheListenerEventType[] regionEventTypes() default {};
4753

src/main/java/org/springframework/data/gemfire/config/support/CacheListenerPostProcessor.java

+60-45
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,23 @@
1414
* limitations under the License.
1515
*
1616
*/
17+
1718
package org.springframework.data.gemfire.config.support;
1819

20+
import static java.util.Arrays.*;
21+
import static org.springframework.data.gemfire.util.ArrayUtils.*;
22+
import static org.springframework.data.gemfire.util.RuntimeExceptionFactory.*;
23+
24+
import java.lang.annotation.Annotation;
25+
import java.lang.reflect.Method;
26+
import java.lang.reflect.Modifier;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Optional;
30+
31+
import org.apache.geode.cache.EntryEvent;
1932
import org.apache.geode.cache.Region;
33+
import org.apache.geode.cache.RegionEvent;
2034
import org.springframework.beans.BeansException;
2135
import org.springframework.beans.factory.BeanFactory;
2236
import org.springframework.beans.factory.BeanFactoryAware;
@@ -34,17 +48,9 @@
3448
import org.springframework.util.ObjectUtils;
3549
import org.springframework.util.ReflectionUtils;
3650

37-
import java.lang.annotation.Annotation;
38-
import java.lang.reflect.Method;
39-
import java.lang.reflect.Modifier;
40-
import java.util.ArrayList;
41-
import java.util.List;
42-
import java.util.Optional;
43-
44-
import static java.util.Arrays.*;
45-
import static org.springframework.data.gemfire.util.ArrayUtils.*;
46-
import static org.springframework.data.gemfire.util.RuntimeExceptionFactory.*;
47-
51+
/**
52+
*
53+
*/
4854
@Configuration public class CacheListenerPostProcessor implements BeanPostProcessor, BeanFactoryAware {
4955

5056
private ConfigurableListableBeanFactory beanFactory;
@@ -65,44 +71,52 @@ private void registerAnyDeclaredCacheListenerAnnotatedMethods(Object bean) {
6571
if (cacheListenerAnnotation != null) {
6672

6773
Assert.isTrue(Modifier.isPublic(method.getModifiers()), String
68-
.format("The bean [%s] method [%s] annotated with [%s] must be public", bean.getClass().getName(),
69-
method.getName(), AsCacheListener.class.getName()));
74+
.format("The bean [%s] method [%s] annotated with [%s] must be public", bean.getClass().getName(),
75+
method.getName(), AsCacheListener.class.getName()));
7076

7177
AnnotationAttributes cacheListenerAttributes = resolveAnnotationAttributes(cacheListenerAnnotation);
7278

73-
CacheListenerEventType[] eventTypes = (CacheListenerEventType[]) cacheListenerAttributes.get("eventTypes");
79+
CacheListenerEventType[] eventTypes = (CacheListenerEventType[]) cacheListenerAttributes
80+
.get("eventTypes");
7481
RegionCacheListenerEventType[] regionEventTypes = (RegionCacheListenerEventType[]) cacheListenerAttributes
75-
.get("regionEventTypes");
82+
.get("regionEventTypes");
7683

7784
if (regionEventTypes.length > 0 && eventTypes.length > 0) {
7885
throw new IllegalArgumentException(String
79-
.format("Populating both event and regionEvent types is not allowed for bean [%s] method [%s]",
80-
bean.getClass().getName(), method.getName()));
86+
.format("Populating both event and regionEvent types is not allowed for bean [%s] method [%s]",
87+
bean.getClass().getName(), method.getName()));
8188
}
8289

83-
String[] regions = getRegionsForEventRegistration(cacheListenerAttributes.getStringArray("regions"),
84-
getBeanFactory());
85-
86-
if (eventTypes.length > 0) {
87-
registerCacheListenerEvents(bean, method, regions, eventTypes);
88-
}
89-
if (regionEventTypes.length > 0) {
90-
registerRegionCacheListenerEvents(bean, method, regions, regionEventTypes);
91-
}
90+
registerEventHandlerToRegion(bean, method, cacheListenerAttributes, eventTypes, regionEventTypes);
9291
}
9392
});
9493
}
9594

95+
private void registerEventHandlerToRegion(Object bean, Method method, AnnotationAttributes cacheListenerAttributes,
96+
CacheListenerEventType[] eventTypes, RegionCacheListenerEventType[] regionEventTypes) {
97+
String[] regions = getRegionsForEventRegistration(cacheListenerAttributes.getStringArray("regions"),
98+
getBeanFactory());
99+
100+
if (eventTypes.length > 0) {
101+
EventProcessorUtils.validateCacheListenerMethodParameters(method,EntryEvent.class);
102+
registerCacheListenerEvents(bean, method, regions, eventTypes);
103+
}
104+
if (regionEventTypes.length > 0) {
105+
EventProcessorUtils.validateCacheListenerMethodParameters(method, RegionEvent.class);
106+
registerRegionCacheListenerEvents(bean, method, regions, regionEventTypes);
107+
}
108+
}
109+
96110
private void registerCacheListenerEvents(Object bean, Method method, String[] regions,
97-
CacheListenerEventType[] eventTypes) {
111+
CacheListenerEventType[] eventTypes) {
98112

99113
EventProcessorUtils.registerCacheListenerOntoRegions(regions, method, bean, eventTypes, getBeanFactory());
100114
}
101115

102116
private void registerRegionCacheListenerEvents(Object bean, Method method, String[] regions,
103-
RegionCacheListenerEventType[] regionEventTypes) {
117+
RegionCacheListenerEventType[] regionEventTypes) {
104118
EventProcessorUtils
105-
.registerCacheListenerForRegionEventsOntoRegions(regions, method, bean, regionEventTypes, getBeanFactory());
119+
.registerCacheListenerForRegionEventsOntoRegions(regions, method, bean, regionEventTypes, getBeanFactory());
106120
}
107121

108122
private String[] getRegionsForEventRegistration(String[] regions, ConfigurableListableBeanFactory beanFactory) {
@@ -118,7 +132,8 @@ private String[] getRegionsForEventRegistration(String[] regions, ConfigurableLi
118132
String[] tempRegions = new String[regionNames.size()];
119133
regionNames.toArray(tempRegions);
120134
return tempRegions;
121-
} else {
135+
}
136+
else {
122137
return regions;
123138
}
124139
}
@@ -128,33 +143,33 @@ private AnnotationAttributes resolveAnnotationAttributes(Annotation annotation)
128143
return AnnotationAttributes.fromMap(AnnotationUtils.getAnnotationAttributes(annotation, false, true));
129144
}
130145

146+
/**
147+
* Returns a reference to the containing Spring {@link BeanFactory}.
148+
*
149+
* @return a reference to the containing Spring {@link BeanFactory}.
150+
* @throws IllegalStateException if the {@link BeanFactory} was not configured.
151+
* @see org.springframework.beans.factory.BeanFactory
152+
*/
153+
protected ConfigurableListableBeanFactory getBeanFactory() {
154+
return Optional.ofNullable(this.beanFactory)
155+
.orElseThrow(() -> newIllegalStateException("BeanFactory was not properly configured"));
156+
}
157+
131158
/**
132159
* Sets a reference to the configured Spring {@link BeanFactory}.
133160
*
134161
* @param beanFactory configured Spring {@link BeanFactory}.
135162
* @throws IllegalArgumentException if the given {@link BeanFactory} is not an instance of
136-
* {@link ConfigurableListableBeanFactory}.
163+
* {@link ConfigurableListableBeanFactory}.
137164
* @see org.springframework.beans.factory.BeanFactoryAware
138165
* @see org.springframework.beans.factory.BeanFactory
139166
*/
140167
@Override @SuppressWarnings("all") public final void setBeanFactory(BeanFactory beanFactory) throws BeansException {
141168

142169
Assert.isInstanceOf(ConfigurableListableBeanFactory.class, beanFactory, String
143-
.format("BeanFactory [%1$s] must be an instance of %2$s", ObjectUtils.nullSafeClassName(beanFactory),
144-
ConfigurableListableBeanFactory.class.getSimpleName()));
170+
.format("BeanFactory [%1$s] must be an instance of %2$s", ObjectUtils.nullSafeClassName(beanFactory),
171+
ConfigurableListableBeanFactory.class.getSimpleName()));
145172

146173
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
147174
}
148-
149-
/**
150-
* Returns a reference to the containing Spring {@link BeanFactory}.
151-
*
152-
* @return a reference to the containing Spring {@link BeanFactory}.
153-
* @throws IllegalStateException if the {@link BeanFactory} was not configured.
154-
* @see org.springframework.beans.factory.BeanFactory
155-
*/
156-
protected ConfigurableListableBeanFactory getBeanFactory() {
157-
return Optional.ofNullable(this.beanFactory)
158-
.orElseThrow(() -> newIllegalStateException("BeanFactory was not properly configured"));
159-
}
160175
}

src/main/java/org/springframework/data/gemfire/eventing/EventProcessorUtils.java

+26-13
Original file line numberDiff line numberDiff line change
@@ -14,48 +14,61 @@
1414
* limitations under the License.
1515
*
1616
*/
17+
1718
package org.springframework.data.gemfire.eventing;
1819

20+
import static java.util.Arrays.*;
21+
22+
import java.lang.reflect.Method;
23+
import java.util.Optional;
24+
1925
import org.apache.geode.cache.CacheListener;
26+
import org.apache.geode.cache.EntryEvent;
2027
import org.apache.geode.cache.Region;
21-
import org.slf4j.Logger;
22-
import org.slf4j.LoggerFactory;
2328
import org.springframework.beans.factory.BeanFactory;
2429
import org.springframework.data.gemfire.eventing.config.CacheListenerEventType;
2530
import org.springframework.data.gemfire.eventing.config.PojoCacheListenerWrapper;
2631
import org.springframework.data.gemfire.eventing.config.PojoRegionEventCacheListenerWrapper;
2732
import org.springframework.data.gemfire.eventing.config.RegionCacheListenerEventType;
2833

29-
import java.lang.reflect.Method;
30-
import java.util.Optional;
31-
32-
import static java.util.Arrays.*;
33-
3434
public class EventProcessorUtils {
3535

36-
private static transient Logger logger = LoggerFactory.getLogger(EventProcessorUtils.class);
37-
3836
public static void registerCacheListenerOntoRegions(String[] regions, Method method, Object bean,
39-
CacheListenerEventType[] eventTypes, BeanFactory beanFactory) {
37+
CacheListenerEventType[] eventTypes, BeanFactory beanFactory) {
4038
PojoCacheListenerWrapper cacheListenerWrapper = new PojoCacheListenerWrapper(method, bean, eventTypes);
4139

4240
registerCacheListenerToRegions(regions, beanFactory, cacheListenerWrapper);
4341
}
4442

4543
public static void registerCacheListenerForRegionEventsOntoRegions(String[] regions, Method method, Object bean,
46-
RegionCacheListenerEventType[] regionEventTypes, BeanFactory beanFactory) {
44+
RegionCacheListenerEventType[] regionEventTypes, BeanFactory beanFactory) {
4745
PojoRegionEventCacheListenerWrapper regionEventCacheListenerWrapper = new PojoRegionEventCacheListenerWrapper(
48-
method, bean, regionEventTypes);
46+
method, bean, regionEventTypes);
4947

5048
registerCacheListenerToRegions(regions, beanFactory, regionEventCacheListenerWrapper);
5149
}
5250

5351
private static void registerCacheListenerToRegions(String[] regions, BeanFactory beanFactory,
54-
CacheListener cacheListener) {
52+
CacheListener cacheListener) {
5553
stream(regions).forEach(regionName -> {
5654
Optional<Region> regionBeanOptional = Optional.of(beanFactory.getBean(regionName, Region.class));
5755

5856
regionBeanOptional.ifPresent(region -> region.getAttributesMutator().addCacheListener(cacheListener));
5957
});
6058
}
59+
60+
public static void validateCacheListenerMethodParameters(Method method, Class requireParameterType) {
61+
Class<?>[] parameterTypes = method.getParameterTypes();
62+
if (parameterTypes.length != 1) {
63+
throw new IllegalArgumentException(String
64+
.format("CacheListener method: %s does not currently support more than one parameter",
65+
method.getName()));
66+
}
67+
if (!parameterTypes[0].isAssignableFrom(EntryEvent.class)) {
68+
throw new IllegalArgumentException(String
69+
.format("CacheListener method: %s requires an %s parameter type", method.getName(),
70+
requireParameterType.getName()));
71+
}
72+
}
73+
6174
}

0 commit comments

Comments
 (0)