diff --git a/activemq-core/project.xml b/activemq-core/project.xml
index 1b172291f1..cc78ebacba 100755
--- a/activemq-core/project.xml
+++ b/activemq-core/project.xml
@@ -363,6 +363,7 @@
**/UdpSendReceiveWithTwoConnectionsTest.*
+ **/MulticastTransportTest.*
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
index 166c46c3b2..98e3bf8803 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java
@@ -16,12 +16,39 @@
*/
package org.apache.activemq.transport.multicast;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.transport.udp.DatagramEndpoint;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
/**
- *
+ *
* @version $Revision$
*/
public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller {
+ private final String localUri;
+ private final byte[] localUriAsBytes;
+
+ public MulticastDatagramHeaderMarshaller(String localUri) {
+ this.localUri = localUri;
+ this.localUriAsBytes = localUri.getBytes();
+ }
+
+ public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
+ int size = readBuffer.getInt();
+ byte[] data = new byte[size];
+ readBuffer.get(data);
+ return new DatagramEndpoint(new String(data), address);
+ }
+
+ public void writeHeader(Command command, ByteBuffer writeBuffer) {
+ writeBuffer.putInt(localUriAsBytes.length);
+ writeBuffer.put(localUriAsBytes);
+ super.writeHeader(command, writeBuffer);
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
index bab5d89246..e88b8ecf94 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
@@ -17,12 +17,26 @@
package org.apache.activemq.transport.multicast;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.udp.CommandChannel;
+import org.apache.activemq.transport.udp.CommandDatagramChannel;
+import org.apache.activemq.transport.udp.CommandDatagramSocket;
+import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
+import org.apache.activemq.transport.udp.DefaultBufferPool;
import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
import java.net.SocketAddress;
+import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.channels.DatagramChannel;
/**
* A multicast based transport.
@@ -31,22 +45,21 @@ import java.net.UnknownHostException;
*/
public class MulticastTransport extends UdpTransport {
- public MulticastTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException {
- super(wireFormat, port);
- }
+ private static final Log log = LogFactory.getLog(MulticastTransport.class);
- public MulticastTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
- super(wireFormat, socketAddress);
- }
+ private static final int DEFAULT_IDLE_TIME = 5000;
+
+ private MulticastSocket socket;
+ private InetAddress mcastAddress;
+ private int mcastPort;
+ private int timeToLive = 1;
+ private boolean loopBackMode = false;
+ private long keepAliveInterval = DEFAULT_IDLE_TIME;
public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
super(wireFormat, remoteLocation);
}
- public MulticastTransport(OpenWireFormat wireFormat) throws IOException {
- super(wireFormat);
- }
-
protected String getProtocolName() {
return "Multicast";
}
@@ -54,4 +67,43 @@ public class MulticastTransport extends UdpTransport {
protected String getProtocolUriScheme() {
return "multicast://";
}
+
+ protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ super.doStop(stopper);
+ if (socket != null) {
+ try {
+ socket.leaveGroup(mcastAddress);
+ }
+ catch (IOException e) {
+ stopper.onException(this, e);
+ }
+ socket.close();
+ }
+ }
+
+ protected CommandChannel createCommandChannel() throws IOException {
+ socket = new MulticastSocket(mcastPort);
+ socket.setLoopbackMode(loopBackMode);
+ socket.setTimeToLive(timeToLive);
+
+ log.debug("Joining multicast address: " + mcastAddress);
+ socket.joinGroup(mcastAddress);
+ socket.setSoTimeout((int) keepAliveInterval);
+
+ return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
+ }
+
+ protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
+ this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
+ this.mcastPort = remoteLocation.getPort();
+ return new InetSocketAddress(mcastAddress, mcastPort);
+ }
+
+ protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
+ return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
index 19feef2ea1..fd45d6891f 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
@@ -16,244 +16,33 @@
*/
package org.apache.activemq.transport.udp;
-import org.activeio.ByteArrayInputStream;
-import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Endpoint;
-import org.apache.activemq.command.LastPartialCommand;
-import org.apache.activemq.command.PartialCommand;
-import org.apache.activemq.openwire.BooleanStream;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
/**
- * A strategy for reading datagrams and de-fragmenting them together.
- *
+ *
* @version $Revision$
*/
-public class CommandChannel implements Service {
+public interface CommandChannel extends Service {
- private static final Log log = LogFactory.getLog(CommandChannel.class);
+ public abstract Command read() throws IOException;
- private final String name;
- private DatagramChannel channel;
- private OpenWireFormat wireFormat;
- private ByteBufferPool bufferPool;
- private int datagramSize = 4 * 1024;
- private SocketAddress targetAddress;
- private DatagramHeaderMarshaller headerMarshaller;
+ public abstract void write(Command command) throws IOException;
- // reading
- private Object readLock = new Object();
- private ByteBuffer readBuffer;
+ public abstract void write(Command command, SocketAddress address) throws IOException;
- // writing
- private Object writeLock = new Object();
- private ByteBuffer writeBuffer;
- private int defaultMarshalBufferSize = 64 * 1024;
-
- public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
- SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
- this.name = name;
- this.channel = channel;
- this.wireFormat = wireFormat;
- this.bufferPool = bufferPool;
- this.datagramSize = datagramSize;
- this.targetAddress = targetAddress;
- this.headerMarshaller = headerMarshaller;
- }
-
- public String toString() {
- return "CommandChannel#" + name;
- }
-
- public void start() throws Exception {
- bufferPool.setDefaultSize(datagramSize);
- bufferPool.start();
- readBuffer = bufferPool.borrowBuffer();
- writeBuffer = bufferPool.borrowBuffer();
- }
-
- public void stop() throws Exception {
- bufferPool.stop();
- }
-
- public Command read() throws IOException {
- Command answer = null;
- synchronized (readLock) {
- readBuffer.clear();
- SocketAddress address = channel.receive(readBuffer);
- readBuffer.flip();
-
- Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
-
- int remaining = readBuffer.remaining();
- byte[] data = new byte[remaining];
- readBuffer.get(data);
-
- // TODO could use a DataInput implementation that talks direct to
- // the ByteBuffer to avoid object allocation and unnecessary
- // buffering?
- DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
- answer = (Command) wireFormat.unmarshal(dataIn);
- answer.setFrom(from);
- }
- if (answer != null) {
- if (log.isDebugEnabled()) {
- log.debug("Channel: " + name + " about to process: " + answer);
- }
- }
- return answer;
- }
-
- public void write(Command command) throws IOException {
- write(command, targetAddress);
- }
-
- public void write(Command command, SocketAddress address) throws IOException {
- synchronized (writeLock) {
-
- ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
- wireFormat.marshal(command, new DataOutputStream(largeBuffer));
- byte[] data = largeBuffer.toByteArray();
- int size = data.length;
-
- if (size >= datagramSize) {
- // lets split the command up into chunks
- int offset = 0;
- boolean lastFragment = false;
- for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
- // write the header
- writeBuffer.clear();
- headerMarshaller.writeHeader(command, writeBuffer);
-
- int chunkSize = writeBuffer.remaining();
-
- // we need to remove the amount of overhead to write the
- // partial command
-
- // lets write the flags in there
- BooleanStream bs = null;
- if (wireFormat.isTightEncodingEnabled()) {
- bs = new BooleanStream();
- bs.writeBoolean(true); // the partial data byte[] is
- // never null
- }
-
- // lets remove the header of the partial command
- // which is the byte for the type and an int for the size of
- // the byte[]
- chunkSize -= 1 // the data type
- + 4 // the command ID
- + 4; // the size of the partial data
-
- // the boolean flags
- if (bs != null) {
- chunkSize -= bs.marshalledSize();
- }
- else {
- chunkSize -= 1;
- }
-
- if (!wireFormat.isSizePrefixDisabled()) {
- // lets write the size of the command buffer
- writeBuffer.putInt(chunkSize);
- chunkSize -= 4;
- }
-
- lastFragment = offset + chunkSize >= length;
- if (chunkSize + offset > length) {
- chunkSize = length - offset;
- }
-
- writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
-
- if (bs != null) {
- bs.marshal(writeBuffer);
- }
-
- writeBuffer.putInt(command.getCommandId());
- if (bs == null) {
- writeBuffer.put((byte) 1);
- }
-
- // size of byte array
- writeBuffer.putInt(chunkSize);
-
- // now the data
- writeBuffer.put(data, offset, chunkSize);
-
- offset += chunkSize;
- sendWriteBuffer(address);
- }
-
- // now lets write the last partial command
- command = new LastPartialCommand(command);
- largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
- wireFormat.marshal(command, new DataOutputStream(largeBuffer));
- data = largeBuffer.toByteArray();
- }
-
- writeBuffer.clear();
- headerMarshaller.writeHeader(command, writeBuffer);
-
- writeBuffer.put(data);
-
- sendWriteBuffer(address);
- }
- }
-
- // Properties
- // -------------------------------------------------------------------------
-
- public int getDatagramSize() {
- return datagramSize;
- }
+ public abstract int getDatagramSize();
/**
* Sets the default size of a datagram on the network.
*/
- public void setDatagramSize(int datagramSize) {
- this.datagramSize = datagramSize;
- }
+ public abstract void setDatagramSize(int datagramSize);
- public ByteBufferPool getBufferPool() {
- return bufferPool;
- }
+ public abstract DatagramHeaderMarshaller getHeaderMarshaller();
- /**
- * Sets the implementation of the byte buffer pool to use
- */
- public void setBufferPool(ByteBufferPool bufferPool) {
- this.bufferPool = bufferPool;
- }
+ public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
- public DatagramHeaderMarshaller getHeaderMarshaller() {
- return headerMarshaller;
- }
-
- public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
- this.headerMarshaller = headerMarshaller;
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected void sendWriteBuffer(SocketAddress address) throws IOException {
- writeBuffer.flip();
-
- if (log.isDebugEnabled()) {
- log.debug("Channel: " + name + " sending datagram to: " + address);
- }
- channel.send(writeBuffer, address);
- }
-
-}
+}
\ No newline at end of file
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
new file mode 100644
index 0000000000..4dc02e11ca
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
@@ -0,0 +1,303 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.activeio.ByteArrayInputStream;
+import org.activeio.ByteArrayOutputStream;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.LastPartialCommand;
+import org.apache.activemq.command.PartialCommand;
+import org.apache.activemq.openwire.BooleanStream;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * A strategy for reading datagrams and de-fragmenting them together.
+ *
+ * @version $Revision$
+ */
+public class CommandDatagramChannel implements CommandChannel {
+
+ private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
+
+ private final String name;
+ private DatagramChannel channel;
+ private OpenWireFormat wireFormat;
+ private ByteBufferPool bufferPool;
+ private int datagramSize = 4 * 1024;
+ private SocketAddress targetAddress;
+ private DatagramHeaderMarshaller headerMarshaller;
+
+ // reading
+ private Object readLock = new Object();
+ private ByteBuffer readBuffer;
+
+ // writing
+ private Object writeLock = new Object();
+ private ByteBuffer writeBuffer;
+ private int defaultMarshalBufferSize = 64 * 1024;
+
+ public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
+ SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
+ this.name = name;
+ this.channel = channel;
+ this.wireFormat = wireFormat;
+ this.bufferPool = bufferPool;
+ this.datagramSize = datagramSize;
+ this.targetAddress = targetAddress;
+ this.headerMarshaller = headerMarshaller;
+ }
+
+ public String toString() {
+ return "CommandChannel#" + name;
+ }
+
+ public void start() throws Exception {
+ bufferPool.setDefaultSize(datagramSize);
+ bufferPool.start();
+ readBuffer = bufferPool.borrowBuffer();
+ writeBuffer = bufferPool.borrowBuffer();
+ }
+
+ public void stop() throws Exception {
+ bufferPool.stop();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#read()
+ */
+ public Command read() throws IOException {
+ Command answer = null;
+ synchronized (readLock) {
+ while (true) {
+ readBuffer.clear();
+ SocketAddress address = channel.receive(readBuffer);
+
+ /*
+ if (address == null) {
+ System.out.println("No address on packet: " + readBuffer);
+ // continue;
+ }
+ */
+
+ readBuffer.flip();
+
+ if (readBuffer.limit() == 0) {
+ //System.out.println("Empty packet!");
+ continue;
+ }
+
+ //log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining());
+
+ Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
+
+ int remaining = readBuffer.remaining();
+ byte[] data = new byte[remaining];
+ readBuffer.get(data);
+
+ // TODO could use a DataInput implementation that talks direct
+ // to
+ // the ByteBuffer to avoid object allocation and unnecessary
+ // buffering?
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
+ answer = (Command) wireFormat.unmarshal(dataIn);
+ if (answer != null) {
+ answer.setFrom(from);
+ }
+ break;
+ }
+ }
+ if (answer != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Channel: " + name + " about to process: " + answer);
+ }
+ }
+ return answer;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command)
+ */
+ public void write(Command command) throws IOException {
+ write(command, targetAddress);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress)
+ */
+ public void write(Command command, SocketAddress address) throws IOException {
+ synchronized (writeLock) {
+
+ ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
+ wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+ byte[] data = largeBuffer.toByteArray();
+ int size = data.length;
+
+ writeBuffer.clear();
+ headerMarshaller.writeHeader(command, writeBuffer);
+
+ if (size >= writeBuffer.remaining()) {
+ // lets split the command up into chunks
+ int offset = 0;
+ boolean lastFragment = false;
+ for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
+ // write the header
+ if (fragment > 0) {
+ writeBuffer.clear();
+ headerMarshaller.writeHeader(command, writeBuffer);
+ }
+
+ int chunkSize = writeBuffer.remaining();
+
+ // we need to remove the amount of overhead to write the
+ // partial command
+
+ // lets write the flags in there
+ BooleanStream bs = null;
+ if (wireFormat.isTightEncodingEnabled()) {
+ bs = new BooleanStream();
+ bs.writeBoolean(true); // the partial data byte[] is
+ // never null
+ }
+
+ // lets remove the header of the partial command
+ // which is the byte for the type and an int for the size of
+ // the byte[]
+ chunkSize -= 1 // the data type
+ + 4 // the command ID
+ + 4; // the size of the partial data
+
+ // the boolean flags
+ if (bs != null) {
+ chunkSize -= bs.marshalledSize();
+ }
+ else {
+ chunkSize -= 1;
+ }
+
+ if (!wireFormat.isSizePrefixDisabled()) {
+ // lets write the size of the command buffer
+ writeBuffer.putInt(chunkSize);
+ chunkSize -= 4;
+ }
+
+ lastFragment = offset + chunkSize >= length;
+ if (chunkSize + offset > length) {
+ chunkSize = length - offset;
+ }
+
+ writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
+
+ if (bs != null) {
+ bs.marshal(writeBuffer);
+ }
+
+ writeBuffer.putInt(command.getCommandId());
+ if (bs == null) {
+ writeBuffer.put((byte) 1);
+ }
+
+ // size of byte array
+ writeBuffer.putInt(chunkSize);
+
+ // now the data
+ writeBuffer.put(data, offset, chunkSize);
+
+ offset += chunkSize;
+ sendWriteBuffer(address);
+ }
+
+ // now lets write the last partial command
+ command = new LastPartialCommand(command);
+ largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
+ wireFormat.marshal(command, new DataOutputStream(largeBuffer));
+ data = largeBuffer.toByteArray();
+
+ writeBuffer.clear();
+ headerMarshaller.writeHeader(command, writeBuffer);
+ }
+
+ writeBuffer.put(data);
+
+ sendWriteBuffer(address);
+ }
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize()
+ */
+ public int getDatagramSize() {
+ return datagramSize;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int)
+ */
+ public void setDatagramSize(int datagramSize) {
+ this.datagramSize = datagramSize;
+ }
+
+ public ByteBufferPool getBufferPool() {
+ return bufferPool;
+ }
+
+ /**
+ * Sets the implementation of the byte buffer pool to use
+ */
+ public void setBufferPool(ByteBufferPool bufferPool) {
+ this.bufferPool = bufferPool;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller()
+ */
+ public DatagramHeaderMarshaller getHeaderMarshaller() {
+ return headerMarshaller;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller)
+ */
+ public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+ this.headerMarshaller = headerMarshaller;
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void sendWriteBuffer(SocketAddress address) throws IOException {
+ writeBuffer.flip();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Channel: " + name + " sending datagram to: " + address);
+ }
+ channel.send(writeBuffer, address);
+ }
+
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
new file mode 100644
index 0000000000..3735a637c6
--- /dev/null
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
@@ -0,0 +1,263 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.activeio.ByteArrayInputStream;
+import org.activeio.ByteArrayOutputStream;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.command.LastPartialCommand;
+import org.apache.activemq.command.PartialCommand;
+import org.apache.activemq.openwire.BooleanStream;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * A strategy for reading datagrams and de-fragmenting them together.
+ *
+ * @version $Revision$
+ */
+public class CommandDatagramSocket implements CommandChannel {
+
+ private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
+
+ private final String name;
+ private DatagramSocket channel;
+ private InetAddress targetAddress;
+ private int targetPort;
+ private OpenWireFormat wireFormat;
+ private int datagramSize = 4 * 1024;
+ private DatagramHeaderMarshaller headerMarshaller;
+
+ // reading
+ private Object readLock = new Object();
+
+ // writing
+ private Object writeLock = new Object();
+
+
+ public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort,
+ DatagramHeaderMarshaller headerMarshaller) {
+ this.name = name;
+ this.channel = channel;
+ this.wireFormat = wireFormat;
+ this.datagramSize = datagramSize;
+ this.targetAddress = targetAddress;
+ this.targetPort = targetPort;
+ this.headerMarshaller = headerMarshaller;
+ }
+
+ public String toString() {
+ return "CommandChannel#" + name;
+ }
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ public Command read() throws IOException {
+ Command answer = null;
+ Endpoint from = null;
+ synchronized (readLock) {
+ while (true) {
+ DatagramPacket datagram = createDatagramPacket();
+ channel.receive(datagram);
+
+ // TODO could use a DataInput implementation that talks direct
+ // to the byte[] to avoid object allocation
+ DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
+
+ from = headerMarshaller.createEndpoint(datagram, dataIn);
+ answer = (Command) wireFormat.unmarshal(dataIn);
+ break;
+ }
+ }
+ if (answer != null) {
+ answer.setFrom(from);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Channel: " + name + " about to process: " + answer);
+ }
+ }
+ return answer;
+ }
+
+ public void write(Command command) throws IOException {
+ write(command, targetAddress, targetPort);
+ }
+
+ public void write(Command command, SocketAddress address) throws IOException {
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress ia = (InetSocketAddress) address;
+ write(command, ia.getAddress(), ia.getPort());
+ }
+ else {
+ write(command);
+ }
+ }
+
+ public void write(Command command, InetAddress address, int port) throws IOException {
+ synchronized (writeLock) {
+
+ ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(writeBuffer);
+ headerMarshaller.writeHeader(command, dataOut);
+
+ int offset = writeBuffer.size();
+
+ wireFormat.marshal(command, dataOut);
+
+ if (remaining(writeBuffer) >= 0) {
+ sendWriteBuffer(address, port, writeBuffer);
+ }
+ else {
+ // lets split the command up into chunks
+ byte[] data = writeBuffer.toByteArray();
+ boolean lastFragment = false;
+ for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
+ writeBuffer.reset();
+ headerMarshaller.writeHeader(command, dataOut);
+
+ int chunkSize = remaining(writeBuffer);
+
+ // we need to remove the amount of overhead to write the
+ // partial command
+
+ // lets write the flags in there
+ BooleanStream bs = null;
+ if (wireFormat.isTightEncodingEnabled()) {
+ bs = new BooleanStream();
+ bs.writeBoolean(true); // the partial data byte[] is
+ // never null
+ }
+
+ // lets remove the header of the partial command
+ // which is the byte for the type and an int for the size of
+ // the byte[]
+ chunkSize -= 1 // the data type
+ + 4 // the command ID
+ + 4; // the size of the partial data
+
+ // the boolean flags
+ if (bs != null) {
+ chunkSize -= bs.marshalledSize();
+ }
+ else {
+ chunkSize -= 1;
+ }
+
+ if (!wireFormat.isSizePrefixDisabled()) {
+ // lets write the size of the command buffer
+ dataOut.writeInt(chunkSize);
+ chunkSize -= 4;
+ }
+
+ lastFragment = offset + chunkSize >= length;
+ if (chunkSize + offset > length) {
+ chunkSize = length - offset;
+ }
+
+ dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
+
+ if (bs != null) {
+ bs.marshal(dataOut);
+ }
+
+ dataOut.writeInt(command.getCommandId());
+ if (bs == null) {
+ dataOut.write((byte) 1);
+ }
+
+ // size of byte array
+ dataOut.writeInt(chunkSize);
+
+ // now the data
+ dataOut.write(data, offset, chunkSize);
+
+ offset += chunkSize;
+ sendWriteBuffer(address, port, writeBuffer);
+ }
+
+ // now lets write the last partial command
+ command = new LastPartialCommand(command);
+
+ writeBuffer.reset();
+ headerMarshaller.writeHeader(command, dataOut);
+ wireFormat.marshal(command, dataOut);
+
+ sendWriteBuffer(address, port, writeBuffer);
+ }
+ }
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+
+ public int getDatagramSize() {
+ return datagramSize;
+ }
+
+ /**
+ * Sets the default size of a datagram on the network.
+ */
+ public void setDatagramSize(int datagramSize) {
+ this.datagramSize = datagramSize;
+ }
+
+ public DatagramHeaderMarshaller getHeaderMarshaller() {
+ return headerMarshaller;
+ }
+
+ public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+ this.headerMarshaller = headerMarshaller;
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("Channel: " + name + " sending datagram to: " + address);
+ }
+ byte[] data = writeBuffer.toByteArray();
+ DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);
+ channel.send(packet);
+ }
+
+ protected DatagramPacket createDatagramPacket() {
+ return new DatagramPacket(new byte[datagramSize], datagramSize);
+ }
+
+ protected int remaining(ByteArrayOutputStream buffer) {
+ return datagramSize - buffer.size();
+ }
+
+ protected ByteArrayOutputStream createByteArrayOutputStream() {
+ return new ByteArrayOutputStream(datagramSize);
+ }
+}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
index 7c9674ed9b..0f9db77ea2 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BaseEndpoint;
+import java.net.InetAddress;
import java.net.SocketAddress;
/**
@@ -36,5 +37,5 @@ public class DatagramEndpoint extends BaseEndpoint {
public SocketAddress getAddress() {
return address;
}
-
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
index f3344f1af1..505f0f0d2b 100644
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java
@@ -17,9 +17,14 @@
package org.apache.activemq.transport.udp;
+import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -36,6 +41,11 @@ public class DatagramHeaderMarshaller {
return new DatagramEndpoint(address.toString(), address);
}
+ public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
+ SocketAddress address = datagram.getSocketAddress();
+ return new DatagramEndpoint(address.toString(), address);
+ }
+
public void writeHeader(Command command, ByteBuffer writeBuffer) {
/*
writeBuffer.putLong(command.getCounter());
@@ -45,4 +55,8 @@ public class DatagramHeaderMarshaller {
writeBuffer.put(flags);
*/
}
+
+ public void writeHeader(Command command, DataOutputStream dataOut) {
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
index 82fbc64c2c..d19b5e2a98 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
@@ -33,6 +33,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
@@ -129,6 +130,17 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
doConsume(command);
}
catch (AsynchronousCloseException e) {
+ // DatagramChannel closed
+ try {
+ stop();
+ }
+ catch (Exception e2) {
+ log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
+ }
+ }
+ catch (SocketException e) {
+ // DatagramSocket closed
+ log.debug("Socket closed: " + e, e);
try {
stop();
}
@@ -137,6 +149,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
}
}
catch (Exception e) {
+ System.out.println("Caught exception of type: " + e.getClass());
e.printStackTrace();
try {
stop();
@@ -187,12 +200,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return maxInactivityDuration;
}
- public DatagramChannel getChannel() {
- return channel;
+ public int getDatagramSize() {
+ return datagramSize;
}
- public void setChannel(DatagramChannel channel) {
- this.channel = channel;
+ public void setDatagramSize(int datagramSize) {
+ this.datagramSize = datagramSize;
}
/**
@@ -222,7 +235,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* Sets the implementation of the command channel to use.
*/
- public void setCommandChannel(CommandChannel commandChannel) {
+ public void setCommandChannel(CommandDatagramChannel commandChannel) {
this.commandChannel = commandChannel;
}
@@ -290,19 +303,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
}
protected void doStart() throws Exception {
- SocketAddress localAddress = new InetSocketAddress(port);
- channel = DatagramChannel.open();
- channel.configureBlocking(true);
+ commandChannel = createCommandChannel();
+ commandChannel.start();
- // TODO
- // connect to default target address to avoid security checks each time
- // channel = channel.connect(targetAddress);
+ super.doStart();
+ }
+
+ protected CommandChannel createCommandChannel() throws IOException {
+ SocketAddress localAddress = createLocalAddress();
+ channel = DatagramChannel.open();
+
+ channel = connect(channel, targetAddress);
DatagramSocket socket = channel.socket();
- if (log.isDebugEnabled()) {
- log.debug("Binding to address: " + localAddress);
- }
- socket.bind(localAddress);
+ bind(socket, localAddress);
if (port == 0) {
port = socket.getLocalPort();
}
@@ -310,10 +324,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) {
bufferPool = new DefaultBufferPool();
}
- commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
- commandChannel.start();
+ return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
+ }
- super.doStart();
+ protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
+ channel.configureBlocking(true);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Binding to address: " + localAddress);
+ }
+ socket.bind(localAddress);
+ }
+
+ protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
+ // TODO
+ // connect to default target address to avoid security checks each time
+ // channel = channel.connect(targetAddress);
+
+ return channel;
+ }
+
+ protected SocketAddress createLocalAddress() {
+ return new InetSocketAddress(port);
}
protected void doStop(ServiceStopper stopper) throws Exception {
@@ -333,4 +365,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
protected String getProtocolUriScheme() {
return "udp://";
}
+
+ protected SocketAddress getTargetAddress() {
+ return targetAddress;
+ }
+
+ public void setCommandChannel(CommandChannel commandChannel) {
+ this.commandChannel = commandChannel;
+ }
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
new file mode 100644
index 0000000000..a9b8595ced
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java
@@ -0,0 +1,55 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.multicast;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.CommandJoiner;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.UdpTransport;
+import org.apache.activemq.transport.udp.UdpTransportTest;
+
+import java.net.URI;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class MulticastTransportTest extends UdpTransportTest {
+
+ private String multicastURI = "multicast://224.1.2.3:6255";
+
+
+ protected Transport createProducer() throws Exception {
+ System.out.println("Producer using URI: " + multicastURI);
+
+ // we are not using the TransportFactory as this assumes that
+ // transports talk to a server using a WireFormat Negotiation step
+ // rather than talking directly to each other
+
+ OpenWireFormat wireFormat = createWireFormat();
+ MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
+ return new CommandJoiner(transport, wireFormat);
+ }
+
+ protected Transport createConsumer() throws Exception {
+ OpenWireFormat wireFormat = createWireFormat();
+ MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
+ return new CommandJoiner(transport, wireFormat);
+ }
+
+
+}
diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
index eb4b54b51a..1248392dcc 100644
--- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
+++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
@@ -155,9 +155,11 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
producer = createProducer();
producer.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
+ System.out.println("Producer received: " + command);
}
public void onException(IOException error) {
+ System.out.println("Producer exception: " + error);
}
public void transportInterupted() {