Skip to content

Commit 2d337d9

Browse files
committed
Add WriteRequest abstraction
Allows to properly handle the first send header request. Fixes #11
1 parent 0386b4e commit 2d337d9

File tree

8 files changed

+126
-36
lines changed

8 files changed

+126
-36
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.nio;
17+
18+
import com.rabbitmq.client.impl.Frame;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.nio.channels.WritableByteChannel;
23+
24+
/**
25+
*
26+
*/
27+
public class FrameWriteRequest implements WriteRequest {
28+
29+
private final Frame frame;
30+
31+
public FrameWriteRequest(Frame frame) {
32+
this.frame = frame;
33+
}
34+
35+
@Override
36+
public void handle(WritableByteChannel writableChannel, ByteBuffer buffer) throws IOException {
37+
frame.writeTo(writableChannel, buffer);
38+
}
39+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.nio;
17+
18+
import com.rabbitmq.client.AMQP;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.nio.channels.WritableByteChannel;
23+
24+
/**
25+
*
26+
*/
27+
public class HeaderWriteRequest implements WriteRequest {
28+
29+
@Override
30+
public void handle(WritableByteChannel writableChannel, ByteBuffer buffer) throws IOException {
31+
buffer.put("AMQP".getBytes("US-ASCII"));
32+
buffer.put((byte) 0);
33+
buffer.put((byte) AMQP.PROTOCOL.MAJOR);
34+
buffer.put((byte) AMQP.PROTOCOL.MINOR);
35+
buffer.put((byte) AMQP.PROTOCOL.REVISION);
36+
buffer.flip();
37+
while(buffer.hasRemaining() && writableChannel.write(buffer) != -1);
38+
buffer.clear();
39+
}
40+
}

src/main/java/com/rabbitmq/client/impl/nio/ReadLoop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public ReadLoop(NioParams nioParams, NioLoopsState readSelectorState) {
4444
public void run() {
4545
final SelectorHolder selectorState = state.readSelectorState;
4646
final Selector selector = selectorState.selector;
47-
Set<SocketChannelRegistration> registrations = selectorState.statesToBeRegistered;
47+
Set<SocketChannelRegistration> registrations = selectorState.registrations;
4848
// FIXME find a better default?
4949
ByteBuffer buffer = ByteBuffer.allocate(nioParams.getReadByteBufferSize());
5050
try {

src/main/java/com/rabbitmq/client/impl/nio/SelectorHolder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ public class SelectorHolder {
2727

2828
final Selector selector;
2929

30-
final Set<SocketChannelRegistration> statesToBeRegistered = Collections
30+
final Set<SocketChannelRegistration> registrations = Collections
3131
.newSetFromMap(new ConcurrentHashMap<SocketChannelRegistration, Boolean>());
3232

3333
SelectorHolder(Selector selector) {
3434
this.selector = selector;
3535
}
3636

3737
public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) {
38-
statesToBeRegistered.add(new SocketChannelRegistration(state, operations));
38+
registrations.add(new SocketChannelRegistration(state, operations));
3939
selector.wakeup();
4040
}
4141
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public int getTimeout() throws SocketException {
7070

7171
@Override
7272
public void sendHeader() throws IOException {
73-
state.setSendHeader(true);
73+
state.sendHeader();
7474
}
7575

7676
@Override

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,10 @@ public class SocketChannelFrameHandlerState {
3737

3838
private final SocketChannel channel;
3939

40-
private final BlockingQueue<Frame> writeQueue;
40+
private final BlockingQueue<WriteRequest> writeQueue;
4141

4242
private volatile AMQConnection connection;
4343

44-
private volatile boolean sendHeader = false;
45-
4644
/** should be used only in the NIO read thread */
4745
private long lastActivity;
4846

@@ -57,32 +55,29 @@ public SocketChannelFrameHandlerState(SocketChannel channel, SelectorHolder read
5755
this.channel = channel;
5856
this.readSelectorState = readSelectorState;
5957
this.writeSelectorState = writeSelectorState;
60-
this.writeQueue = new ArrayBlockingQueue<Frame>(nioParams.getWriteQueueCapacity(), true);
58+
this.writeQueue = new ArrayBlockingQueue<WriteRequest>(nioParams.getWriteQueueCapacity(), true);
6159
this.writeEnqueuingTimeoutInMs = nioParams.getWriteEnqueuingTimeoutInMs();
6260
}
6361

6462
public SocketChannel getChannel() {
6563
return channel;
6664
}
6765

68-
public Queue<Frame> getWriteQueue() {
66+
public Queue<WriteRequest> getWriteQueue() {
6967
return writeQueue;
7068
}
7169

72-
public boolean isSendHeader() {
73-
return sendHeader;
70+
public void sendHeader() throws IOException {
71+
sendWriteRequest(new HeaderWriteRequest());
7472
}
7573

76-
public void setSendHeader(boolean sendHeader) {
77-
this.sendHeader = sendHeader;
78-
if(sendHeader) {
79-
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
80-
}
74+
public void write(Frame frame) throws IOException {
75+
sendWriteRequest(new FrameWriteRequest(frame));
8176
}
8277

83-
public void write(Frame frame) throws IOException {
78+
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
8479
try {
85-
boolean offered = this.writeQueue.offer(frame, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
80+
boolean offered = this.writeQueue.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
8681
if(offered) {
8782
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
8883
} else {

src/main/java/com/rabbitmq/client/impl/nio/WriteLoop.java

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package com.rabbitmq.client.impl.nio;
1717

18-
import com.rabbitmq.client.AMQP;
1918
import com.rabbitmq.client.impl.Frame;
2019
import org.slf4j.Logger;
2120
import org.slf4j.LoggerFactory;
@@ -50,7 +49,7 @@ public void run() {
5049
try {
5150
while(true && !Thread.currentThread().isInterrupted()) {
5251
int select;
53-
if(state.statesToBeRegistered.isEmpty()) {
52+
if(state.registrations.isEmpty()) {
5453
// we can block, registration will call Selector.wakeup()
5554
select = selector.select();
5655
} else {
@@ -61,7 +60,7 @@ public void run() {
6160
// registrations should be done after select,
6261
// once the cancelled keys have been actually removed
6362
SocketChannelRegistration registration;
64-
Iterator<SocketChannelRegistration> registrationIterator = state.statesToBeRegistered.iterator();
63+
Iterator<SocketChannelRegistration> registrationIterator = state.registrations.iterator();
6564
while(registrationIterator.hasNext()) {
6665
registration = registrationIterator.next();
6766
registrationIterator.remove();
@@ -86,23 +85,11 @@ public void run() {
8685
boolean cancelKey = true;
8786
try {
8887
int toBeWritten = state.getWriteQueue().size();
89-
// FIXME property handle header sending request
90-
if(state.isSendHeader()) {
91-
buffer.put("AMQP".getBytes("US-ASCII"));
92-
buffer.put((byte) 0);
93-
buffer.put((byte) AMQP.PROTOCOL.MAJOR);
94-
buffer.put((byte) AMQP.PROTOCOL.MINOR);
95-
buffer.put((byte) AMQP.PROTOCOL.REVISION);
96-
buffer.flip();
97-
while(buffer.hasRemaining() && channel.write(buffer) != -1);
98-
buffer.clear();
99-
state.setSendHeader(false);
100-
}
10188

10289
int written = 0;
103-
Frame frame;
104-
while(written <= toBeWritten && (frame = state.getWriteQueue().poll()) != null) {
105-
frame.writeTo(channel, buffer);
90+
WriteRequest request;
91+
while(written <= toBeWritten && (request = state.getWriteQueue().poll()) != null) {
92+
request.handle(channel, buffer);
10693
written++;
10794
}
10895
Frame.drain(channel, buffer);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.impl.nio;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
import java.nio.channels.WritableByteChannel;
21+
22+
/**
23+
*
24+
*/
25+
public interface WriteRequest {
26+
27+
void handle(WritableByteChannel writableChannel, ByteBuffer buffer) throws IOException;
28+
29+
}

0 commit comments

Comments
 (0)