16
16
17
17
package org .springframework .integration .rsocket ;
18
18
19
- import java .nio .charset .StandardCharsets ;
20
- import java .util .Arrays ;
21
- import java .util .List ;
19
+ import java .util .Map ;
22
20
import java .util .concurrent .atomic .AtomicBoolean ;
23
21
import java .util .function .Function ;
24
22
28
26
import org .springframework .core .io .buffer .DataBufferFactory ;
29
27
import org .springframework .core .io .buffer .DataBufferUtils ;
30
28
import org .springframework .core .io .buffer .NettyDataBuffer ;
31
- import org .springframework .core .io .buffer .NettyDataBufferFactory ;
32
29
import org .springframework .lang .Nullable ;
33
30
import org .springframework .messaging .Message ;
34
31
import org .springframework .messaging .MessageHeaders ;
35
32
import org .springframework .messaging .ReactiveMessageHandler ;
36
33
import org .springframework .messaging .handler .DestinationPatternsMessageCondition ;
37
34
import org .springframework .messaging .handler .invocation .reactive .HandlerMethodReturnValueHandler ;
35
+ import org .springframework .messaging .rsocket .PayloadUtils ;
38
36
import org .springframework .messaging .rsocket .RSocketRequester ;
37
+ import org .springframework .messaging .rsocket .annotation .support .MetadataExtractor ;
39
38
import org .springframework .messaging .rsocket .annotation .support .RSocketPayloadReturnValueHandler ;
40
39
import org .springframework .messaging .rsocket .annotation .support .RSocketRequesterMethodArgumentResolver ;
41
40
import org .springframework .messaging .support .MessageBuilder ;
44
43
import org .springframework .util .MimeType ;
45
44
import org .springframework .util .RouteMatcher ;
46
45
47
- import io .netty .buffer .ByteBuf ;
48
46
import io .rsocket .AbstractRSocket ;
49
47
import io .rsocket .ConnectionSetupPayload ;
50
48
import io .rsocket .Payload ;
51
- import io .rsocket .metadata .CompositeMetadata ;
52
49
import reactor .core .publisher .Flux ;
53
50
import reactor .core .publisher .Mono ;
54
51
import reactor .core .publisher .MonoProcessor ;
@@ -72,11 +69,6 @@ class IntegrationRSocket extends AbstractRSocket {
72
69
73
70
static final MimeType COMPOSITE_METADATA = new MimeType ("message" , "x.rsocket.composite-metadata.v0" );
74
71
75
- static final MimeType ROUTING = new MimeType ("message" , "x.rsocket.routing.v0" );
76
-
77
- static final List <MimeType > METADATA_MIME_TYPES = Arrays .asList (COMPOSITE_METADATA , ROUTING );
78
-
79
-
80
72
private final ReactiveMessageHandler handler ;
81
73
82
74
private final RouteMatcher routeMatcher ;
@@ -89,24 +81,24 @@ class IntegrationRSocket extends AbstractRSocket {
89
81
90
82
private final MimeType metadataMimeType ;
91
83
84
+ private final MetadataExtractor metadataExtractor ;
85
+
92
86
IntegrationRSocket (ReactiveMessageHandler handler , RouteMatcher routeMatcher ,
93
87
RSocketRequester requester , MimeType dataMimeType , MimeType metadataMimeType ,
94
- DataBufferFactory bufferFactory ) {
88
+ MetadataExtractor metadataExtractor , DataBufferFactory bufferFactory ) {
95
89
96
90
Assert .notNull (handler , "'handler' is required" );
97
91
Assert .notNull (routeMatcher , "'routeMatcher' is required" );
98
92
Assert .notNull (requester , "'requester' is required" );
99
93
Assert .notNull (dataMimeType , "'dataMimeType' is required" );
100
94
Assert .notNull (metadataMimeType , "'metadataMimeType' is required" );
101
95
102
- Assert .isTrue (METADATA_MIME_TYPES .contains (metadataMimeType ),
103
- () -> "Unexpected metadatata mime type: '" + metadataMimeType + "'" );
104
-
105
96
this .handler = handler ;
106
97
this .routeMatcher = routeMatcher ;
107
98
this .requester = requester ;
108
99
this .dataMimeType = dataMimeType ;
109
100
this .metadataMimeType = metadataMimeType ;
101
+ this .metadataExtractor = metadataExtractor ;
110
102
this .bufferFactory = bufferFactory ;
111
103
}
112
104
@@ -163,8 +155,7 @@ public Mono<Void> metadataPush(Payload payload) {
163
155
164
156
165
157
private Mono <Void > handle (Payload payload ) {
166
- String destination = getDestination (payload );
167
- MessageHeaders headers = createHeaders (destination , null );
158
+ MessageHeaders headers = createHeaders (payload , null );
168
159
DataBuffer dataBuffer = retainDataAndReleasePayload (payload );
169
160
int refCount = refCount (dataBuffer );
170
161
Message <?> message = MessageBuilder .createMessage (dataBuffer , headers );
@@ -176,15 +167,9 @@ private Mono<Void> handle(Payload payload) {
176
167
});
177
168
}
178
169
179
- static int refCount (DataBuffer dataBuffer ) {
180
- return dataBuffer instanceof NettyDataBuffer ?
181
- ((NettyDataBuffer ) dataBuffer ).getNativeBuffer ().refCnt () : 1 ;
182
- }
183
-
184
170
private Flux <Payload > handleAndReply (Payload firstPayload , Flux <Payload > payloads ) {
185
171
MonoProcessor <Flux <Payload >> replyMono = MonoProcessor .create ();
186
- String destination = getDestination (firstPayload );
187
- MessageHeaders headers = createHeaders (destination , replyMono );
172
+ MessageHeaders headers = createHeaders (firstPayload , replyMono );
188
173
189
174
AtomicBoolean read = new AtomicBoolean ();
190
175
Flux <DataBuffer > buffers =
@@ -206,57 +191,48 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
206
191
}
207
192
208
193
String getDestination (Payload payload ) {
209
- if (this .metadataMimeType .equals (COMPOSITE_METADATA )) {
210
- CompositeMetadata metadata = new CompositeMetadata (payload .metadata (), false );
211
- for (CompositeMetadata .Entry entry : metadata ) {
212
- String mimeType = entry .getMimeType ();
213
- if (ROUTING .toString ().equals (mimeType )) {
214
- return entry .getContent ().toString (StandardCharsets .UTF_8 );
215
- }
216
- }
217
- return "" ;
194
+ Map <String , Object > metadataValues = this .metadataExtractor .extract (payload , this .metadataMimeType );
195
+ Object routingKey = metadataValues .get (MetadataExtractor .ROUTE_KEY );
196
+ if (routingKey != null ) {
197
+ RouteMatcher .Route route = this .routeMatcher .parseRoute (routingKey .toString ());
198
+ return route .value ();
218
199
}
219
- else if ( this . metadataMimeType . equals ( ROUTING )) {
220
- return payload . getMetadataUtf8 () ;
200
+ else {
201
+ return "" ;
221
202
}
222
- // Should not happen (given constructor assertions)
223
- throw new IllegalArgumentException ("Unexpected metadata MimeType" );
224
203
}
225
204
226
205
private DataBuffer retainDataAndReleasePayload (Payload payload ) {
227
- return payloadToDataBuffer (payload , this .bufferFactory );
206
+ payload .retain ();
207
+ return PayloadUtils .retainDataAndReleasePayload (payload , this .bufferFactory );
228
208
}
229
209
230
- private MessageHeaders createHeaders (String destination , @ Nullable MonoProcessor <?> replyMono ) {
210
+ private MessageHeaders createHeaders (Payload payload , @ Nullable MonoProcessor <?> replyMono ) {
231
211
MessageHeaderAccessor headers = new MessageHeaderAccessor ();
232
212
headers .setLeaveMutable (true );
233
- RouteMatcher .Route route = this .routeMatcher .parseRoute (destination );
234
- headers .setHeader (DestinationPatternsMessageCondition .LOOKUP_DESTINATION_HEADER , route );
213
+
214
+ Map <String , Object > metadataValues = this .metadataExtractor .extract (payload , this .metadataMimeType );
215
+ metadataValues .putIfAbsent (MetadataExtractor .ROUTE_KEY , "" );
216
+ for (Map .Entry <String , Object > entry : metadataValues .entrySet ()) {
217
+ if (entry .getKey ().equals (MetadataExtractor .ROUTE_KEY )) {
218
+ RouteMatcher .Route route = this .routeMatcher .parseRoute ((String ) entry .getValue ());
219
+ headers .setHeader (DestinationPatternsMessageCondition .LOOKUP_DESTINATION_HEADER , route );
220
+ }
221
+ else {
222
+ headers .setHeader (entry .getKey (), entry .getValue ());
223
+ }
224
+ }
225
+
235
226
headers .setContentType (this .dataMimeType );
236
227
headers .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , this .requester );
237
- if (replyMono != null ) {
238
- headers .setHeader (RSocketPayloadReturnValueHandler .RESPONSE_HEADER , replyMono );
239
- }
240
228
headers .setHeader (HandlerMethodReturnValueHandler .DATA_BUFFER_FACTORY_HEADER , this .bufferFactory );
229
+ headers .setHeader (RSocketPayloadReturnValueHandler .RESPONSE_HEADER , replyMono );
230
+
241
231
return headers .getMessageHeaders ();
242
232
}
243
233
244
- static DataBuffer payloadToDataBuffer (Payload payload , DataBufferFactory bufferFactory ) {
245
- payload .retain ();
246
- try {
247
- if (bufferFactory instanceof NettyDataBufferFactory ) {
248
- ByteBuf byteBuf = payload .sliceData ().retain ();
249
- return ((NettyDataBufferFactory ) bufferFactory ).wrap (byteBuf );
250
- }
251
- else {
252
- return bufferFactory .wrap (payload .getData ());
253
- }
254
- }
255
- finally {
256
- if (payload .refCnt () > 0 ) {
257
- payload .release ();
258
- }
259
- }
234
+ private static int refCount (DataBuffer dataBuffer ) {
235
+ return dataBuffer instanceof NettyDataBuffer ? ((NettyDataBuffer ) dataBuffer ).getNativeBuffer ().refCnt () : 1 ;
260
236
}
261
237
262
238
}
0 commit comments