mirror of https://github.com/apache/activemq.git
added a default implementation of BufferPool and a default replay strategy
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383918 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fcd06b2077
commit
1c5ecb36d3
|
@ -20,6 +20,7 @@ import org.apache.activemq.Service;
|
|||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.openwire.BooleanStream;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -39,35 +40,36 @@ import java.nio.channels.Channels;
|
|||
public class CommandChannel implements Service {
|
||||
|
||||
private static final Log log = LogFactory.getLog(CommandChannel.class);
|
||||
|
||||
|
||||
private ByteChannel channel;
|
||||
private OpenWireFormat wireFormat;
|
||||
private ByteBufferPool bufferPool;
|
||||
private int datagramSize = 4 * 1024;
|
||||
private DatagramReplayStrategy replayStrategy;
|
||||
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
|
||||
|
||||
|
||||
// reading
|
||||
private ByteBuffer readBuffer;
|
||||
private DataInputStream dataIn;
|
||||
private CommandReadBuffer readStack;
|
||||
|
||||
|
||||
// writing
|
||||
private ByteBuffer writeBuffer;
|
||||
private BooleanStream bs = new BooleanStream();
|
||||
private BooleanStream bs = new BooleanStream();
|
||||
private DataOutputStream dataOut;
|
||||
private int largeMessageBufferSize = 128 * 1024;
|
||||
private DatagramHeader header = new DatagramHeader();
|
||||
|
||||
|
||||
public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize) {
|
||||
public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
|
||||
this.channel = channel;
|
||||
this.wireFormat = wireFormat;
|
||||
this.bufferPool = bufferPool;
|
||||
this.datagramSize = datagramSize;
|
||||
this.replayStrategy = replayStrategy;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
readStack = new CommandReadBuffer(wireFormat);
|
||||
readStack = new CommandReadBuffer(wireFormat, replayStrategy);
|
||||
bufferPool.setDefaultSize(datagramSize);
|
||||
bufferPool.start();
|
||||
readBuffer = bufferPool.borrowBuffer();
|
||||
|
@ -79,7 +81,7 @@ public class CommandChannel implements Service {
|
|||
public void stop() throws Exception {
|
||||
bufferPool.stop();
|
||||
}
|
||||
|
||||
|
||||
public synchronized Command read() throws IOException {
|
||||
readBuffer.clear();
|
||||
int read = channel.read(readBuffer);
|
||||
|
@ -109,10 +111,11 @@ public class CommandChannel implements Service {
|
|||
public synchronized void write(Command command) throws IOException {
|
||||
header.incrementCounter();
|
||||
int size = wireFormat.tightMarshalNestedObject1(command, bs);
|
||||
if (size < datagramSize ) {
|
||||
if (size < datagramSize) {
|
||||
header.setPartial(false);
|
||||
header.setDataSize(size);
|
||||
writeBuffer.rewind();
|
||||
headerMarshaller.writeHeader(header, writeBuffer);
|
||||
wireFormat.marshal(command, dataOut);
|
||||
dataOut.flush();
|
||||
channel.write(writeBuffer);
|
||||
|
@ -120,15 +123,15 @@ public class CommandChannel implements Service {
|
|||
else {
|
||||
header.setPartial(true);
|
||||
header.setComplete(false);
|
||||
|
||||
|
||||
// lets split the command up into chunks
|
||||
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
|
||||
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
|
||||
|
||||
|
||||
byte[] data = largeBuffer.toByteArray();
|
||||
int offset = 0;
|
||||
boolean lastFragment = false;
|
||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++ ) {
|
||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
|
||||
// write the header
|
||||
writeBuffer.rewind();
|
||||
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
||||
|
@ -144,7 +147,7 @@ public class CommandChannel implements Service {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
@ -153,7 +156,7 @@ public class CommandChannel implements Service {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the default size of a datagram on the network.
|
||||
* Sets the default size of a datagram on the network.
|
||||
*/
|
||||
public void setDatagramSize(int datagramSize) {
|
||||
this.datagramSize = datagramSize;
|
||||
|
@ -170,4 +173,12 @@ public class CommandChannel implements Service {
|
|||
this.bufferPool = bufferPool;
|
||||
}
|
||||
|
||||
public DatagramHeaderMarshaller getHeaderMarshaller() {
|
||||
return headerMarshaller;
|
||||
}
|
||||
|
||||
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
|
||||
this.headerMarshaller = headerMarshaller;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,9 @@ package org.apache.activemq.transport.udp;
|
|||
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -33,20 +36,32 @@ import java.util.TreeSet;
|
|||
* @version $Revision$
|
||||
*/
|
||||
public class CommandReadBuffer {
|
||||
private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
|
||||
|
||||
private OpenWireFormat wireFormat;
|
||||
private DatagramReplayStrategy replayStrategy;
|
||||
private SortedSet headers = new TreeSet();
|
||||
private int expectedCounter;
|
||||
private long expectedCounter;
|
||||
private ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
public CommandReadBuffer(OpenWireFormat wireFormat) {
|
||||
public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
|
||||
this.wireFormat = wireFormat;
|
||||
this.replayStrategy = replayStrategy;
|
||||
}
|
||||
|
||||
|
||||
public Command read(DatagramHeader header) throws IOException {
|
||||
if (expectedCounter != header.getCounter()) {
|
||||
// lets add it to the list for later on
|
||||
headers.add(header);
|
||||
long actualCounter = header.getCounter();
|
||||
if (expectedCounter != actualCounter) {
|
||||
if (actualCounter < expectedCounter) {
|
||||
log.warn("Ignoring out of step packet: " + header);
|
||||
}
|
||||
else {
|
||||
replayStrategy.onDroppedPackets(expectedCounter, actualCounter);
|
||||
|
||||
// lets add it to the list for later on
|
||||
headers.add(header);
|
||||
}
|
||||
|
||||
// lets see if the first item in the set is the next header
|
||||
header = (DatagramHeader) headers.first();
|
||||
|
@ -56,6 +71,7 @@ public class CommandReadBuffer {
|
|||
}
|
||||
|
||||
// we've got a valid header so increment counter
|
||||
replayStrategy.onReceivedPacket(expectedCounter);
|
||||
expectedCounter++;
|
||||
|
||||
Command answer = null;
|
||||
|
@ -75,7 +91,6 @@ public class CommandReadBuffer {
|
|||
}
|
||||
}
|
||||
return answer;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A default implementation of {@link BufferPool} which keeps a pool of direct
|
||||
* byte buffers.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class DefaultBufferPool implements ByteBufferPool {
|
||||
|
||||
private int defaultSize;
|
||||
private List buffers = new ArrayList();
|
||||
private Object lock = new Object();
|
||||
|
||||
public synchronized ByteBuffer borrowBuffer() {
|
||||
synchronized (lock) {
|
||||
int size = buffers.size();
|
||||
if (size > 0) {
|
||||
return (ByteBuffer) buffers.remove(size - 1);
|
||||
}
|
||||
}
|
||||
return ByteBuffer.allocateDirect(defaultSize);
|
||||
}
|
||||
|
||||
public synchronized void returnBuffer(ByteBuffer buffer) {
|
||||
synchronized (lock) {
|
||||
buffers.add(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
public void setDefaultSize(int defaultSize) {
|
||||
this.defaultSize = defaultSize;
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception {
|
||||
}
|
||||
|
||||
public synchronized void stop() throws Exception {
|
||||
synchronized (lock) {
|
||||
/*
|
||||
for (Iterator iter = buffers.iterator(); iter.hasNext();) {
|
||||
ByteBuffer buffer = (ByteBuffer) iter.next();
|
||||
}
|
||||
*/
|
||||
buffers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -21,6 +21,8 @@ import org.apache.activemq.command.Command;
|
|||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportThreadSupport;
|
||||
import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
|
||||
import org.apache.activemq.transport.udp.replay.ExceptionIfDroppedPacketStrategy;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -45,6 +47,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
private CommandChannel commandChannel;
|
||||
private OpenWireFormat wireFormat;
|
||||
private ByteBufferPool bufferPool;
|
||||
private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
|
||||
private int datagramSize = 4 * 1024;
|
||||
private long maxInactivityDuration = 0; //30000;
|
||||
private InetSocketAddress socketAddress;
|
||||
|
@ -85,7 +88,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
* reads packets from a Socket
|
||||
*/
|
||||
public void run() {
|
||||
log.trace("TCP consumer thread starting");
|
||||
log.trace("Consumer thread starting for: " + toString());
|
||||
while (!isClosed()) {
|
||||
try {
|
||||
Command command = commandChannel.read();
|
||||
|
@ -152,10 +155,21 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
this.commandChannel = commandChannel;
|
||||
}
|
||||
|
||||
public DatagramReplayStrategy getReplayStrategy() {
|
||||
return replayStrategy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the strategy used to replay missed datagrams
|
||||
*/
|
||||
public void setReplayStrategy(DatagramReplayStrategy replayStrategy) {
|
||||
this.replayStrategy = replayStrategy;
|
||||
}
|
||||
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
||||
/**
|
||||
* Creates an address from the given URI
|
||||
*/
|
||||
|
@ -182,7 +196,10 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
else if (channel == null) {
|
||||
throw new IllegalArgumentException("No channel configured");
|
||||
}
|
||||
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize);
|
||||
if (bufferPool == null) {
|
||||
bufferPool = new DefaultBufferPool();
|
||||
}
|
||||
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
|
||||
commandChannel.start();
|
||||
super.doStart();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp.replay;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A pluggable strategy for how to deal with dropped packets.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public interface DatagramReplayStrategy {
|
||||
|
||||
void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException;
|
||||
|
||||
void onReceivedPacket(long expectedCounter);
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp.replay;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Throws an exception if packets are dropped causing the transport to be closed.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy {
|
||||
|
||||
public void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException {
|
||||
long count = actualCounter - expectedCounter;
|
||||
throw new IOException("" + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter);
|
||||
}
|
||||
|
||||
public void onReceivedPacket(long expectedCounter) {
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue