mirror of https://github.com/apache/activemq.git
added test case demonstrating UDP transport working
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f8e1c0ebe5
commit
187f884b58
|
@ -24,13 +24,14 @@ import org.apache.activemq.transport.udp.replay.DatagramReplayStrategy;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
|
||||
/**
|
||||
* A strategy for reading datagrams and de-fragmenting them together.
|
||||
|
@ -41,113 +42,146 @@ public class CommandChannel implements Service {
|
|||
|
||||
private static final Log log = LogFactory.getLog(CommandChannel.class);
|
||||
|
||||
private ByteChannel channel;
|
||||
private DatagramChannel channel;
|
||||
private OpenWireFormat wireFormat;
|
||||
private ByteBufferPool bufferPool;
|
||||
private int datagramSize = 4 * 1024;
|
||||
private DatagramReplayStrategy replayStrategy;
|
||||
private SocketAddress targetAddress;
|
||||
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
|
||||
|
||||
// reading
|
||||
private Object readLock = new Object();
|
||||
private ByteBuffer readBuffer;
|
||||
private DataInputStream dataIn;
|
||||
private CommandReadBuffer readStack;
|
||||
|
||||
// writing
|
||||
private Object writeLock = new Object();
|
||||
private ByteBuffer writeBuffer;
|
||||
private BooleanStream bs = new BooleanStream();
|
||||
private DataOutputStream dataOut;
|
||||
private int largeMessageBufferSize = 128 * 1024;
|
||||
private DatagramHeader header = new DatagramHeader();
|
||||
|
||||
public CommandChannel(ByteChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy) {
|
||||
public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) {
|
||||
this.channel = channel;
|
||||
this.wireFormat = wireFormat;
|
||||
this.bufferPool = bufferPool;
|
||||
this.datagramSize = datagramSize;
|
||||
this.replayStrategy = replayStrategy;
|
||||
this.targetAddress = targetAddress;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
//wireFormat.setPrefixPacketSize(false);
|
||||
wireFormat.setCacheEnabled(false);
|
||||
wireFormat.setTightEncodingEnabled(true);
|
||||
|
||||
readStack = new CommandReadBuffer(wireFormat, replayStrategy);
|
||||
bufferPool.setDefaultSize(datagramSize);
|
||||
bufferPool.start();
|
||||
readBuffer = bufferPool.borrowBuffer();
|
||||
writeBuffer = bufferPool.borrowBuffer();
|
||||
dataIn = new DataInputStream(Channels.newInputStream(channel));
|
||||
dataOut = new DataOutputStream(Channels.newOutputStream(channel));
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
bufferPool.stop();
|
||||
}
|
||||
|
||||
public synchronized Command read() throws IOException {
|
||||
readBuffer.clear();
|
||||
int read = channel.read(readBuffer);
|
||||
DatagramHeader header = headerMarshaller.readHeader(readBuffer);
|
||||
public Command read() throws IOException {
|
||||
synchronized (readLock) {
|
||||
readBuffer.clear();
|
||||
SocketAddress address = channel.receive(readBuffer);
|
||||
readBuffer.flip();
|
||||
|
||||
int remaining = readBuffer.remaining();
|
||||
int size = header.getDataSize();
|
||||
if (size > remaining) {
|
||||
throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
|
||||
}
|
||||
else if (size < remaining) {
|
||||
log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
|
||||
}
|
||||
if (header.isPartial()) {
|
||||
byte[] data = new byte[size];
|
||||
readBuffer.get(data);
|
||||
header.setPartialData(data);
|
||||
}
|
||||
else {
|
||||
Command command = (Command) wireFormat.unmarshal(dataIn);
|
||||
header.setCommand(command);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Read a datagram from: " + address);
|
||||
}
|
||||
DatagramHeader header = headerMarshaller.readHeader(readBuffer);
|
||||
|
||||
return readStack.read(header);
|
||||
int remaining = readBuffer.remaining();
|
||||
int size = header.getDataSize();
|
||||
if (size > remaining) {
|
||||
throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining");
|
||||
}
|
||||
else if (size < remaining) {
|
||||
log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
|
||||
}
|
||||
if (header.isPartial()) {
|
||||
byte[] data = new byte[size];
|
||||
readBuffer.get(data);
|
||||
header.setPartialData(data);
|
||||
}
|
||||
else {
|
||||
byte[] data = new byte[size];
|
||||
readBuffer.get(data);
|
||||
|
||||
// TODO use a DataInput implementation that talks direct to the
|
||||
// ByteBuffer
|
||||
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
|
||||
Command command = (Command) wireFormat.doUnmarshal(dataIn);
|
||||
header.setCommand(command);
|
||||
}
|
||||
|
||||
return readStack.read(header);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void write(Command command) throws IOException {
|
||||
header.incrementCounter();
|
||||
int size = wireFormat.tightMarshalNestedObject1(command, bs);
|
||||
if (size < datagramSize) {
|
||||
header.setPartial(false);
|
||||
header.setDataSize(size);
|
||||
writeBuffer.rewind();
|
||||
headerMarshaller.writeHeader(header, writeBuffer);
|
||||
wireFormat.marshal(command, dataOut);
|
||||
dataOut.flush();
|
||||
channel.write(writeBuffer);
|
||||
}
|
||||
else {
|
||||
header.setPartial(true);
|
||||
header.setComplete(false);
|
||||
|
||||
// lets split the command up into chunks
|
||||
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
|
||||
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
|
||||
|
||||
byte[] data = largeBuffer.toByteArray();
|
||||
int offset = 0;
|
||||
boolean lastFragment = false;
|
||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
|
||||
// write the header
|
||||
writeBuffer.rewind();
|
||||
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
||||
lastFragment = offset + chunkSize >= length;
|
||||
header.setDataSize(chunkSize);
|
||||
header.setComplete(lastFragment);
|
||||
public void write(Command command) throws IOException {
|
||||
synchronized (writeLock) {
|
||||
header.incrementCounter();
|
||||
int size = wireFormat.tightMarshal1(command, bs);
|
||||
if (size < datagramSize) {
|
||||
header.setPartial(false);
|
||||
header.setComplete(true);
|
||||
header.setDataSize(size);
|
||||
writeBuffer.clear();
|
||||
headerMarshaller.writeHeader(header, writeBuffer);
|
||||
|
||||
// now the data
|
||||
writeBuffer.put(data, offset, chunkSize);
|
||||
offset += chunkSize;
|
||||
channel.write(writeBuffer);
|
||||
// TODO use a DataOutput implementation that talks direct to the
|
||||
// ByteBuffer
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
DataOutputStream dataOut = new DataOutputStream(buffer);
|
||||
wireFormat.tightMarshal2(command, dataOut, bs);
|
||||
dataOut.close();
|
||||
byte[] data = buffer.toByteArray();
|
||||
writeBuffer.put(data);
|
||||
|
||||
sendWriteBuffer();
|
||||
}
|
||||
else {
|
||||
header.setPartial(true);
|
||||
header.setComplete(false);
|
||||
|
||||
// lets split the command up into chunks
|
||||
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
|
||||
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
|
||||
|
||||
byte[] data = largeBuffer.toByteArray();
|
||||
int offset = 0;
|
||||
boolean lastFragment = false;
|
||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
|
||||
// write the header
|
||||
writeBuffer.rewind();
|
||||
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
||||
lastFragment = offset + chunkSize >= length;
|
||||
header.setDataSize(chunkSize);
|
||||
header.setComplete(lastFragment);
|
||||
headerMarshaller.writeHeader(header, writeBuffer);
|
||||
|
||||
// now the data
|
||||
writeBuffer.put(data, offset, chunkSize);
|
||||
offset += chunkSize;
|
||||
sendWriteBuffer();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendWriteBuffer() throws IOException {
|
||||
writeBuffer.flip();
|
||||
channel.send(writeBuffer, targetAddress);
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -41,7 +41,7 @@ public class CommandReadBuffer {
|
|||
private OpenWireFormat wireFormat;
|
||||
private DatagramReplayStrategy replayStrategy;
|
||||
private SortedSet headers = new TreeSet();
|
||||
private long expectedCounter;
|
||||
private long expectedCounter = 1;
|
||||
private ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
|
||||
|
|
|
@ -122,8 +122,8 @@ public class DatagramHeader implements Comparable {
|
|||
}
|
||||
|
||||
public void setFlags(byte flags) {
|
||||
partial = (flags & 0x1) == 0;
|
||||
complete = (flags & 0x2) == 0;
|
||||
partial = (flags & 0x1) != 0;
|
||||
complete = (flags & 0x2) != 0;
|
||||
}
|
||||
|
||||
public Command getCommand() {
|
||||
|
|
|
@ -31,13 +31,16 @@ public class DatagramHeaderMarshaller {
|
|||
answer.setDataSize(readBuffer.getInt());
|
||||
byte flags = readBuffer.get();
|
||||
answer.setFlags(flags);
|
||||
//System.out.println("Read header with counter: " + answer.getCounter() + "size: " + answer.getDataSize() + " with flags: " + flags);
|
||||
return answer;
|
||||
}
|
||||
|
||||
public void writeHeader(DatagramHeader header, ByteBuffer writeBuffer) {
|
||||
writeBuffer.putLong(header.getCounter());
|
||||
writeBuffer.putInt(header.getDataSize());
|
||||
writeBuffer.put(header.getFlags());
|
||||
byte flags = header.getFlags();
|
||||
//System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
|
||||
writeBuffer.put(flags);
|
||||
}
|
||||
|
||||
public int getHeaderSize(DatagramHeader header) {
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.transport.udp;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -27,12 +26,19 @@ import java.util.List;
|
|||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class DefaultBufferPool implements ByteBufferPool {
|
||||
public class DefaultBufferPool extends SimpleBufferPool implements ByteBufferPool {
|
||||
|
||||
private int defaultSize;
|
||||
private List buffers = new ArrayList();
|
||||
private Object lock = new Object();
|
||||
|
||||
public DefaultBufferPool() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
public DefaultBufferPool(boolean useDirect) {
|
||||
super(useDirect);
|
||||
}
|
||||
|
||||
public synchronized ByteBuffer borrowBuffer() {
|
||||
synchronized (lock) {
|
||||
int size = buffers.size();
|
||||
|
@ -40,29 +46,24 @@ public class DefaultBufferPool implements ByteBufferPool {
|
|||
return (ByteBuffer) buffers.remove(size - 1);
|
||||
}
|
||||
}
|
||||
return ByteBuffer.allocateDirect(defaultSize);
|
||||
return createBuffer();
|
||||
}
|
||||
|
||||
public synchronized void returnBuffer(ByteBuffer buffer) {
|
||||
public void returnBuffer(ByteBuffer buffer) {
|
||||
synchronized (lock) {
|
||||
buffers.add(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
public void setDefaultSize(int defaultSize) {
|
||||
this.defaultSize = defaultSize;
|
||||
public void start() throws Exception {
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception {
|
||||
}
|
||||
|
||||
public synchronized void stop() throws Exception {
|
||||
public void stop() throws Exception {
|
||||
synchronized (lock) {
|
||||
/*
|
||||
for (Iterator iter = buffers.iterator(); iter.hasNext();) {
|
||||
ByteBuffer buffer = (ByteBuffer) iter.next();
|
||||
}
|
||||
*/
|
||||
* for (Iterator iter = buffers.iterator(); iter.hasNext();) {
|
||||
* ByteBuffer buffer = (ByteBuffer) iter.next(); }
|
||||
*/
|
||||
buffers.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.udp;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* A simple implementation of {@link BufferPool} which does no pooling and just
|
||||
* creates new buffers each time
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class SimpleBufferPool implements ByteBufferPool {
|
||||
|
||||
private int defaultSize;
|
||||
private boolean useDirect;
|
||||
|
||||
public SimpleBufferPool() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
public SimpleBufferPool(boolean useDirect) {
|
||||
this.useDirect = useDirect;
|
||||
}
|
||||
|
||||
public synchronized ByteBuffer borrowBuffer() {
|
||||
return createBuffer();
|
||||
}
|
||||
|
||||
public void returnBuffer(ByteBuffer buffer) {
|
||||
}
|
||||
|
||||
public void setDefaultSize(int defaultSize) {
|
||||
this.defaultSize = defaultSize;
|
||||
}
|
||||
|
||||
public boolean isUseDirect() {
|
||||
return useDirect;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether direct buffers are used or not
|
||||
*/
|
||||
public void setUseDirect(boolean useDirect) {
|
||||
this.useDirect = useDirect;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
}
|
||||
|
||||
protected ByteBuffer createBuffer() {
|
||||
if (useDirect) {
|
||||
return ByteBuffer.allocateDirect(defaultSize);
|
||||
}
|
||||
else {
|
||||
return ByteBuffer.allocate(defaultSize);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -29,11 +29,14 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
|
||||
/**
|
||||
|
@ -49,30 +52,34 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
private ByteBufferPool bufferPool;
|
||||
private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
|
||||
private int datagramSize = 4 * 1024;
|
||||
private long maxInactivityDuration = 0; //30000;
|
||||
private InetSocketAddress socketAddress;
|
||||
private long maxInactivityDuration = 0; // 30000;
|
||||
private InetSocketAddress targetAddress;
|
||||
private DatagramChannel channel;
|
||||
private boolean trace = false;
|
||||
private boolean useLocalHost = true;
|
||||
private int port;
|
||||
|
||||
protected UdpTransport(OpenWireFormat wireFormat) {
|
||||
protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
|
||||
this.wireFormat = wireFormat;
|
||||
}
|
||||
|
||||
public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
|
||||
this(wireFormat);
|
||||
this.socketAddress = createAddress(remoteLocation);
|
||||
this.targetAddress = createAddress(remoteLocation);
|
||||
}
|
||||
|
||||
public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) {
|
||||
public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws IOException {
|
||||
this(wireFormat);
|
||||
this.socketAddress = socketAddress;
|
||||
this.targetAddress = socketAddress;
|
||||
}
|
||||
|
||||
/**
|
||||
* A one way asynchronous send
|
||||
*/
|
||||
public void oneway(Command command) throws IOException {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
|
||||
}
|
||||
checkStarted(command);
|
||||
commandChannel.write(command);
|
||||
}
|
||||
|
@ -81,7 +88,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
* @return pretty print of 'this'
|
||||
*/
|
||||
public String toString() {
|
||||
return "udp://" + socketAddress;
|
||||
return "udp://" + targetAddress + "?port=" + port;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -94,18 +101,32 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
Command command = commandChannel.read();
|
||||
doConsume(command);
|
||||
}
|
||||
catch (SocketTimeoutException e) {
|
||||
}
|
||||
catch (InterruptedIOException e) {
|
||||
}
|
||||
catch (IOException e) {
|
||||
/*
|
||||
* catch (SocketTimeoutException e) { } catch
|
||||
* (InterruptedIOException e) { }
|
||||
*/
|
||||
catch (AsynchronousCloseException e) {
|
||||
try {
|
||||
stop();
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
|
||||
}
|
||||
onException(e);
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
try {
|
||||
stop();
|
||||
}
|
||||
catch (Exception e2) {
|
||||
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
|
||||
}
|
||||
if (e instanceof IOException) {
|
||||
onException((IOException) e);
|
||||
}
|
||||
else {
|
||||
log.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,12 +145,21 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
return maxInactivityDuration;
|
||||
}
|
||||
|
||||
public DatagramChannel getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
public void setChannel(DatagramChannel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum inactivity duration
|
||||
*/
|
||||
public void setMaxInactivityDuration(long maxInactivityDuration) {
|
||||
this.maxInactivityDuration = maxInactivityDuration;
|
||||
}
|
||||
|
||||
public boolean isUseLocalHost() {
|
||||
return useLocalHost;
|
||||
}
|
||||
|
@ -143,7 +173,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
this.useLocalHost = useLocalHost;
|
||||
}
|
||||
|
||||
|
||||
public CommandChannel getCommandChannel() {
|
||||
return commandChannel;
|
||||
}
|
||||
|
@ -154,7 +183,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
public void setCommandChannel(CommandChannel commandChannel) {
|
||||
this.commandChannel = commandChannel;
|
||||
}
|
||||
|
||||
|
||||
public DatagramReplayStrategy getReplayStrategy() {
|
||||
return replayStrategy;
|
||||
}
|
||||
|
@ -166,6 +195,17 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
this.replayStrategy = replayStrategy;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the port to connect on
|
||||
*/
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
@ -189,18 +229,26 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
|||
}
|
||||
|
||||
protected void doStart() throws Exception {
|
||||
if (socketAddress != null) {
|
||||
channel = DatagramChannel.open();
|
||||
channel.connect(socketAddress);
|
||||
}
|
||||
else if (channel == null) {
|
||||
throw new IllegalArgumentException("No channel configured");
|
||||
SocketAddress localAddress = new InetSocketAddress(port);
|
||||
channel = DatagramChannel.open();
|
||||
channel.configureBlocking(true);
|
||||
|
||||
// TODO
|
||||
// connect to default target address to avoid security checks each time
|
||||
// channel = channel.connect(targetAddress);
|
||||
|
||||
DatagramSocket socket = channel.socket();
|
||||
socket.bind(localAddress);
|
||||
if (port == 0) {
|
||||
port = socket.getLocalPort();
|
||||
}
|
||||
|
||||
if (bufferPool == null) {
|
||||
bufferPool = new DefaultBufferPool();
|
||||
}
|
||||
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy);
|
||||
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress);
|
||||
commandChannel.start();
|
||||
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
|
|
|
@ -65,6 +65,8 @@ public class UdpTransportFactory extends TransportFactory {
|
|||
public Transport configure(Transport transport, WireFormat format, Map options) {
|
||||
IntrospectionSupport.setProperties(transport, options);
|
||||
UdpTransport tcpTransport = (UdpTransport) transport;
|
||||
|
||||
/*
|
||||
if (tcpTransport.isTrace()) {
|
||||
transport = new TransportLogger(transport);
|
||||
}
|
||||
|
@ -73,8 +75,8 @@ public class UdpTransportFactory extends TransportFactory {
|
|||
transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
|
||||
}
|
||||
|
||||
transport = new MutexTransport(transport);
|
||||
transport = new ResponseCorrelator(transport);
|
||||
*/
|
||||
return transport;
|
||||
}
|
||||
|
||||
|
@ -111,7 +113,9 @@ public class UdpTransportFactory extends TransportFactory {
|
|||
return new UdpTransport(wf, location, localLocation);
|
||||
}
|
||||
*/
|
||||
return new UdpTransport((OpenWireFormat) wf, location);
|
||||
OpenWireFormat wireFormat = (OpenWireFormat) wf;
|
||||
wireFormat.setPrefixPacketSize(false);
|
||||
return new UdpTransport(wireFormat, location);
|
||||
}
|
||||
|
||||
protected ServerSocketFactory createServerSocketFactory() {
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
class=org.apache.activemq.transport.udp.UdpTransportFactory
|
Loading…
Reference in New Issue