Skip to content

Commit 9a2b75b

Browse files
garyrussellartembilan
authored andcommitted
Add simple Avro transformers
Unsophisticated Avro transformers for `SpecificRecord` implementations. * * Fix DSL Factory - transformer was changed to `<? extends GenericContainer>`; change the DSL to match * * Restore test log4j * * Revert to supporting only SpecificRecord * * Fix assert * * Add multi-type deserialization with fluent API * Polishing for PR comments * * Remove type mappings; Exxpression now returns the type (or class name) * Cache types created from class names * Add type header in "toAvro" transformer * * Remove the type cache (already handled by the class loader) * * Don't convert the class to String in the "toAvro" transformer
1 parent 9115644 commit 9a2b75b

File tree

12 files changed

+1097
-2
lines changed

12 files changed

+1097
-2
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ subprojects { subproject ->
101101
ext {
102102
activeMqVersion = '5.15.9'
103103
apacheSshdVersion = '2.2.0'
104+
avroVersion = '1.8.2'
104105
aspectjVersion = '1.9.4'
105106
assertjVersion = '3.12.2'
106107
assertkVersion = '0.17'
@@ -397,6 +398,7 @@ project('spring-integration-core') {
397398
compile("com.esotericsoftware:kryo-shaded:$kryoShadedVersion", optional)
398399
compile("io.micrometer:micrometer-core:$micrometerVersion", optional)
399400
compile("io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion", optional)
401+
compile("org.apache.avro:avro:$avroVersion", optional)
400402

401403
testCompile ("org.aspectj:aspectjweaver:$aspectjVersion")
402404
testCompile "io.projectreactor:reactor-test:$reactorVersion"

spring-integration-core/src/main/java/org/springframework/integration/transformer/AbstractTransformer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public final Message<?> transform(Message<?> message) {
3535
return null;
3636
}
3737
return (result instanceof Message) ? (Message<?>) result
38-
: this.getMessageBuilderFactory().withPayload(result).copyHeaders(message.getHeaders()).build();
38+
: getMessageBuilderFactory().withPayload(result).copyHeaders(message.getHeaders()).build();
3939
}
4040
catch (MessageTransformationException e) { // NOSONAR - catch and throw
4141
throw e;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.transformer;
18+
19+
import java.io.IOException;
20+
import java.io.UncheckedIOException;
21+
22+
import org.apache.avro.io.DatumReader;
23+
import org.apache.avro.io.DecoderFactory;
24+
import org.apache.avro.specific.SpecificDatumReader;
25+
import org.apache.avro.specific.SpecificRecord;
26+
27+
import org.springframework.beans.factory.BeanClassLoaderAware;
28+
import org.springframework.expression.EvaluationContext;
29+
import org.springframework.expression.Expression;
30+
import org.springframework.integration.context.IntegrationContextUtils;
31+
import org.springframework.integration.expression.FunctionExpression;
32+
import org.springframework.integration.transformer.support.AvroHeaders;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.util.Assert;
35+
import org.springframework.util.ClassUtils;
36+
37+
/**
38+
* An Apache Avro transformer to create generated {@link SpecificRecord} objects
39+
* from {@code byte[]}.
40+
*
41+
* @author Gary Russell
42+
* @since 5.2
43+
*
44+
*/
45+
public class SimpleFromAvroTransformer extends AbstractTransformer implements BeanClassLoaderAware {
46+
47+
private final Class<? extends SpecificRecord> defaultType;
48+
49+
private final DecoderFactory decoderFactory = new DecoderFactory();
50+
51+
private Expression typeIdExpression = new FunctionExpression<Message<?>>(
52+
msg -> msg.getHeaders().get(AvroHeaders.TYPE));
53+
54+
private EvaluationContext evaluationContext;
55+
56+
private ClassLoader beanClassLoader;
57+
58+
/**
59+
* Construct an instance with the supplied default type to create.
60+
* @param defaultType the type.
61+
*/
62+
public SimpleFromAvroTransformer(Class<? extends SpecificRecord> defaultType) {
63+
Assert.notNull(defaultType, "'defaultType' must not be null");
64+
this.defaultType = defaultType;
65+
}
66+
67+
@Override
68+
public void setBeanClassLoader(ClassLoader classLoader) {
69+
this.beanClassLoader = classLoader;
70+
}
71+
72+
/**
73+
* Set the expression to evaluate against the message to determine the type.
74+
* Default {@code headers['avro_type']}.
75+
* @param expression the expression.
76+
* @return the transformer
77+
*/
78+
public SimpleFromAvroTransformer typeExpression(Expression expression) {
79+
Assert.notNull(expression, "'expression' must not be null");
80+
this.typeIdExpression = expression;
81+
return this;
82+
}
83+
84+
/**
85+
* Set the expression to evaluate against the message to determine the type id.
86+
* Default {@code headers['avro_type']}.
87+
* @param expression the expression.
88+
* @return the transformer
89+
*/
90+
public SimpleFromAvroTransformer typeExpression(String expression) {
91+
Assert.notNull(expression, "'expression' must not be null");
92+
this.typeIdExpression = EXPRESSION_PARSER.parseExpression(expression);
93+
return this;
94+
}
95+
96+
/**
97+
* Set the expression to evaluate against the message to determine the type.
98+
* Default {@code headers['avro_type']}.
99+
* @param expression the expression.
100+
*/
101+
public void setTypeExpression(Expression expression) {
102+
Assert.notNull(expression, "'expression' must not be null");
103+
this.typeIdExpression = expression;
104+
}
105+
106+
/**
107+
* Set the expression to evaluate against the message to determine the type id.
108+
* Default {@code headers['avro_type']}.
109+
* @param expression the expression.
110+
*/
111+
public void setTypeExpression(String expression) {
112+
Assert.notNull(expression, "'expression' must not be null");
113+
this.typeIdExpression = EXPRESSION_PARSER.parseExpression(expression);
114+
}
115+
116+
@Override
117+
protected void onInit() {
118+
this.evaluationContext = IntegrationContextUtils.getEvaluationContext(getBeanFactory());
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
@Override
123+
protected Object doTransform(Message<?> message) {
124+
Assert.state(message.getPayload() instanceof byte[], "Payload must be a byte[]");
125+
Class<? extends SpecificRecord> type = null;
126+
Object value = this.typeIdExpression.getValue(this.evaluationContext, message);
127+
if (value instanceof Class) {
128+
type = (Class<? extends SpecificRecord>) value;
129+
}
130+
else if (value instanceof String) {
131+
try {
132+
type = (Class<? extends SpecificRecord>) ClassUtils.forName((String) value, this.beanClassLoader);
133+
}
134+
catch (ClassNotFoundException | LinkageError e) {
135+
throw new IllegalStateException(e);
136+
}
137+
}
138+
if (type == null) {
139+
type = this.defaultType;
140+
}
141+
DatumReader<?> reader = new SpecificDatumReader<>(type);
142+
try {
143+
return reader.read(null, this.decoderFactory.binaryDecoder((byte[]) message.getPayload(), null));
144+
}
145+
catch (IOException e) {
146+
throw new UncheckedIOException(e);
147+
}
148+
}
149+
150+
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.transformer;
18+
19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.io.UncheckedIOException;
22+
23+
import org.apache.avro.io.BinaryEncoder;
24+
import org.apache.avro.io.DatumWriter;
25+
import org.apache.avro.io.EncoderFactory;
26+
import org.apache.avro.specific.SpecificDatumWriter;
27+
import org.apache.avro.specific.SpecificRecord;
28+
29+
import org.springframework.integration.transformer.support.AvroHeaders;
30+
import org.springframework.messaging.Message;
31+
import org.springframework.util.Assert;
32+
33+
/**
34+
* An Apache Avro transformer for generated {@link SpecificRecord} objects.
35+
*
36+
* @author Gary Russell
37+
* @since 5.2
38+
*
39+
*/
40+
public class SimpleToAvroTransformer extends AbstractTransformer {
41+
42+
private final EncoderFactory encoderFactory = new EncoderFactory();
43+
44+
@Override
45+
protected Object doTransform(Message<?> message) {
46+
Assert.state(message.getPayload() instanceof SpecificRecord,
47+
"Payload must be an implementation of 'SpecificRecord'");
48+
SpecificRecord specific = (SpecificRecord) message.getPayload();
49+
ByteArrayOutputStream out = new ByteArrayOutputStream();
50+
BinaryEncoder encoder = this.encoderFactory.directBinaryEncoder(out, null);
51+
DatumWriter<Object> writer = new SpecificDatumWriter<>(specific.getSchema());
52+
try {
53+
writer.write(specific, encoder);
54+
encoder.flush();
55+
}
56+
catch (IOException e) {
57+
throw new UncheckedIOException(e);
58+
}
59+
return getMessageBuilderFactory().withPayload(out.toByteArray())
60+
.copyHeaders(message.getHeaders())
61+
.setHeader(AvroHeaders.TYPE, specific.getClass())
62+
.build();
63+
}
64+
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.transformer.support;
18+
19+
/**
20+
* Pre-defined names and prefixes for Apache Avro related headers.
21+
*
22+
* @author Gary Russell
23+
* @since 5.2
24+
*/
25+
public final class AvroHeaders {
26+
27+
private AvroHeaders() {
28+
super();
29+
}
30+
31+
public static final String PREFIX = "avro";
32+
33+
/**
34+
* The {@code SpecificRecord} type.
35+
*/
36+
public static final String TYPE = PREFIX + "_type";
37+
38+
}

0 commit comments

Comments
 (0)