From 5739c6cdcaa450880ce02733f8e3c53ab2affb42 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Mon, 13 Mar 2006 15:52:01 +0000 Subject: [PATCH] refactor of the UDP transport so that it can work with multicast using a DatagramSocket/MulticastSocket directly in addition to using a DatagramChannel (which only seems to work with UDP) git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385577 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-core/project.xml | 1 + .../MulticastDatagramHeaderMarshaller.java | 29 +- .../multicast/MulticastTransport.java | 72 ++++- .../transport/udp/CommandChannel.java | 231 +------------ .../transport/udp/CommandDatagramChannel.java | 303 ++++++++++++++++++ .../transport/udp/CommandDatagramSocket.java | 263 +++++++++++++++ .../transport/udp/DatagramEndpoint.java | 3 +- .../udp/DatagramHeaderMarshaller.java | 14 + .../activemq/transport/udp/UdpTransport.java | 76 +++-- .../multicast/MulticastTransportTest.java | 55 ++++ .../transport/udp/UdpTestSupport.java | 2 + 11 files changed, 798 insertions(+), 251 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 1b172291f1..cc78ebacba 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -363,6 +363,7 @@ **/UdpSendReceiveWithTwoConnectionsTest.* + **/MulticastTransportTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java index 166c46c3b2..98e3bf8803 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastDatagramHeaderMarshaller.java @@ -16,12 +16,39 @@ */ package org.apache.activemq.transport.multicast; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.transport.udp.DatagramEndpoint; import org.apache.activemq.transport.udp.DatagramHeaderMarshaller; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + /** - * + * * @version $Revision$ */ public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller { + private final String localUri; + private final byte[] localUriAsBytes; + + public MulticastDatagramHeaderMarshaller(String localUri) { + this.localUri = localUri; + this.localUriAsBytes = localUri.getBytes(); + } + + public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) { + int size = readBuffer.getInt(); + byte[] data = new byte[size]; + readBuffer.get(data); + return new DatagramEndpoint(new String(data), address); + } + + public void writeHeader(Command command, ByteBuffer writeBuffer) { + writeBuffer.putInt(localUriAsBytes.length); + writeBuffer.put(localUriAsBytes); + super.writeHeader(command, writeBuffer); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java index bab5d89246..e88b8ecf94 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java @@ -17,12 +17,26 @@ package org.apache.activemq.transport.multicast; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.udp.CommandChannel; +import org.apache.activemq.transport.udp.CommandDatagramChannel; +import org.apache.activemq.transport.udp.CommandDatagramSocket; +import org.apache.activemq.transport.udp.DatagramHeaderMarshaller; +import org.apache.activemq.transport.udp.DefaultBufferPool; import org.apache.activemq.transport.udp.UdpTransport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; import java.net.SocketAddress; +import java.net.SocketException; import java.net.URI; import java.net.UnknownHostException; +import java.nio.channels.DatagramChannel; /** * A multicast based transport. @@ -31,22 +45,21 @@ import java.net.UnknownHostException; */ public class MulticastTransport extends UdpTransport { - public MulticastTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException { - super(wireFormat, port); - } + private static final Log log = LogFactory.getLog(MulticastTransport.class); - public MulticastTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { - super(wireFormat, socketAddress); - } + private static final int DEFAULT_IDLE_TIME = 5000; + + private MulticastSocket socket; + private InetAddress mcastAddress; + private int mcastPort; + private int timeToLive = 1; + private boolean loopBackMode = false; + private long keepAliveInterval = DEFAULT_IDLE_TIME; public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { super(wireFormat, remoteLocation); } - public MulticastTransport(OpenWireFormat wireFormat) throws IOException { - super(wireFormat); - } - protected String getProtocolName() { return "Multicast"; } @@ -54,4 +67,43 @@ public class MulticastTransport extends UdpTransport { protected String getProtocolUriScheme() { return "multicast://"; } + + protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException { + } + + protected void doStop(ServiceStopper stopper) throws Exception { + super.doStop(stopper); + if (socket != null) { + try { + socket.leaveGroup(mcastAddress); + } + catch (IOException e) { + stopper.onException(this, e); + } + socket.close(); + } + } + + protected CommandChannel createCommandChannel() throws IOException { + socket = new MulticastSocket(mcastPort); + socket.setLoopbackMode(loopBackMode); + socket.setTimeToLive(timeToLive); + + log.debug("Joining multicast address: " + mcastAddress); + socket.joinGroup(mcastAddress); + socket.setSoTimeout((int) keepAliveInterval); + + return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller()); + } + + protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException { + this.mcastAddress = InetAddress.getByName(remoteLocation.getHost()); + this.mcastPort = remoteLocation.getPort(); + return new InetSocketAddress(mcastAddress, mcastPort); + } + + protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() { + return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort()); + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 19feef2ea1..fd45d6891f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -16,244 +16,33 @@ */ package org.apache.activemq.transport.udp; -import org.activeio.ByteArrayInputStream; -import org.activeio.ByteArrayOutputStream; import org.apache.activemq.Service; import org.apache.activemq.command.Command; -import org.apache.activemq.command.Endpoint; -import org.apache.activemq.command.LastPartialCommand; -import org.apache.activemq.command.PartialCommand; -import org.apache.activemq.openwire.BooleanStream; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; /** - * A strategy for reading datagrams and de-fragmenting them together. - * + * * @version $Revision$ */ -public class CommandChannel implements Service { +public interface CommandChannel extends Service { - private static final Log log = LogFactory.getLog(CommandChannel.class); + public abstract Command read() throws IOException; - private final String name; - private DatagramChannel channel; - private OpenWireFormat wireFormat; - private ByteBufferPool bufferPool; - private int datagramSize = 4 * 1024; - private SocketAddress targetAddress; - private DatagramHeaderMarshaller headerMarshaller; + public abstract void write(Command command) throws IOException; - // reading - private Object readLock = new Object(); - private ByteBuffer readBuffer; + public abstract void write(Command command, SocketAddress address) throws IOException; - // writing - private Object writeLock = new Object(); - private ByteBuffer writeBuffer; - private int defaultMarshalBufferSize = 64 * 1024; - - public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, - SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) { - this.name = name; - this.channel = channel; - this.wireFormat = wireFormat; - this.bufferPool = bufferPool; - this.datagramSize = datagramSize; - this.targetAddress = targetAddress; - this.headerMarshaller = headerMarshaller; - } - - public String toString() { - return "CommandChannel#" + name; - } - - public void start() throws Exception { - bufferPool.setDefaultSize(datagramSize); - bufferPool.start(); - readBuffer = bufferPool.borrowBuffer(); - writeBuffer = bufferPool.borrowBuffer(); - } - - public void stop() throws Exception { - bufferPool.stop(); - } - - public Command read() throws IOException { - Command answer = null; - synchronized (readLock) { - readBuffer.clear(); - SocketAddress address = channel.receive(readBuffer); - readBuffer.flip(); - - Endpoint from = headerMarshaller.createEndpoint(readBuffer, address); - - int remaining = readBuffer.remaining(); - byte[] data = new byte[remaining]; - readBuffer.get(data); - - // TODO could use a DataInput implementation that talks direct to - // the ByteBuffer to avoid object allocation and unnecessary - // buffering? - DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); - answer = (Command) wireFormat.unmarshal(dataIn); - answer.setFrom(from); - } - if (answer != null) { - if (log.isDebugEnabled()) { - log.debug("Channel: " + name + " about to process: " + answer); - } - } - return answer; - } - - public void write(Command command) throws IOException { - write(command, targetAddress); - } - - public void write(Command command, SocketAddress address) throws IOException { - synchronized (writeLock) { - - ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); - wireFormat.marshal(command, new DataOutputStream(largeBuffer)); - byte[] data = largeBuffer.toByteArray(); - int size = data.length; - - if (size >= datagramSize) { - // lets split the command up into chunks - int offset = 0; - boolean lastFragment = false; - for (int fragment = 0, length = data.length; !lastFragment; fragment++) { - // write the header - writeBuffer.clear(); - headerMarshaller.writeHeader(command, writeBuffer); - - int chunkSize = writeBuffer.remaining(); - - // we need to remove the amount of overhead to write the - // partial command - - // lets write the flags in there - BooleanStream bs = null; - if (wireFormat.isTightEncodingEnabled()) { - bs = new BooleanStream(); - bs.writeBoolean(true); // the partial data byte[] is - // never null - } - - // lets remove the header of the partial command - // which is the byte for the type and an int for the size of - // the byte[] - chunkSize -= 1 // the data type - + 4 // the command ID - + 4; // the size of the partial data - - // the boolean flags - if (bs != null) { - chunkSize -= bs.marshalledSize(); - } - else { - chunkSize -= 1; - } - - if (!wireFormat.isSizePrefixDisabled()) { - // lets write the size of the command buffer - writeBuffer.putInt(chunkSize); - chunkSize -= 4; - } - - lastFragment = offset + chunkSize >= length; - if (chunkSize + offset > length) { - chunkSize = length - offset; - } - - writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); - - if (bs != null) { - bs.marshal(writeBuffer); - } - - writeBuffer.putInt(command.getCommandId()); - if (bs == null) { - writeBuffer.put((byte) 1); - } - - // size of byte array - writeBuffer.putInt(chunkSize); - - // now the data - writeBuffer.put(data, offset, chunkSize); - - offset += chunkSize; - sendWriteBuffer(address); - } - - // now lets write the last partial command - command = new LastPartialCommand(command); - largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); - wireFormat.marshal(command, new DataOutputStream(largeBuffer)); - data = largeBuffer.toByteArray(); - } - - writeBuffer.clear(); - headerMarshaller.writeHeader(command, writeBuffer); - - writeBuffer.put(data); - - sendWriteBuffer(address); - } - } - - // Properties - // ------------------------------------------------------------------------- - - public int getDatagramSize() { - return datagramSize; - } + public abstract int getDatagramSize(); /** * Sets the default size of a datagram on the network. */ - public void setDatagramSize(int datagramSize) { - this.datagramSize = datagramSize; - } + public abstract void setDatagramSize(int datagramSize); - public ByteBufferPool getBufferPool() { - return bufferPool; - } + public abstract DatagramHeaderMarshaller getHeaderMarshaller(); - /** - * Sets the implementation of the byte buffer pool to use - */ - public void setBufferPool(ByteBufferPool bufferPool) { - this.bufferPool = bufferPool; - } + public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller); - public DatagramHeaderMarshaller getHeaderMarshaller() { - return headerMarshaller; - } - - public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) { - this.headerMarshaller = headerMarshaller; - } - - // Implementation methods - // ------------------------------------------------------------------------- - protected void sendWriteBuffer(SocketAddress address) throws IOException { - writeBuffer.flip(); - - if (log.isDebugEnabled()) { - log.debug("Channel: " + name + " sending datagram to: " + address); - } - channel.send(writeBuffer, address); - } - -} +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java new file mode 100644 index 0000000000..4dc02e11ca --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java @@ -0,0 +1,303 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.activeio.ByteArrayInputStream; +import org.activeio.ByteArrayOutputStream; +import org.apache.activemq.Service; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.command.LastPartialCommand; +import org.apache.activemq.command.PartialCommand; +import org.apache.activemq.openwire.BooleanStream; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +/** + * A strategy for reading datagrams and de-fragmenting them together. + * + * @version $Revision$ + */ +public class CommandDatagramChannel implements CommandChannel { + + private static final Log log = LogFactory.getLog(CommandDatagramChannel.class); + + private final String name; + private DatagramChannel channel; + private OpenWireFormat wireFormat; + private ByteBufferPool bufferPool; + private int datagramSize = 4 * 1024; + private SocketAddress targetAddress; + private DatagramHeaderMarshaller headerMarshaller; + + // reading + private Object readLock = new Object(); + private ByteBuffer readBuffer; + + // writing + private Object writeLock = new Object(); + private ByteBuffer writeBuffer; + private int defaultMarshalBufferSize = 64 * 1024; + + public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, + SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) { + this.name = name; + this.channel = channel; + this.wireFormat = wireFormat; + this.bufferPool = bufferPool; + this.datagramSize = datagramSize; + this.targetAddress = targetAddress; + this.headerMarshaller = headerMarshaller; + } + + public String toString() { + return "CommandChannel#" + name; + } + + public void start() throws Exception { + bufferPool.setDefaultSize(datagramSize); + bufferPool.start(); + readBuffer = bufferPool.borrowBuffer(); + writeBuffer = bufferPool.borrowBuffer(); + } + + public void stop() throws Exception { + bufferPool.stop(); + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#read() + */ + public Command read() throws IOException { + Command answer = null; + synchronized (readLock) { + while (true) { + readBuffer.clear(); + SocketAddress address = channel.receive(readBuffer); + + /* + if (address == null) { + System.out.println("No address on packet: " + readBuffer); + // continue; + } + */ + + readBuffer.flip(); + + if (readBuffer.limit() == 0) { + //System.out.println("Empty packet!"); + continue; + } + + //log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining()); + + Endpoint from = headerMarshaller.createEndpoint(readBuffer, address); + + int remaining = readBuffer.remaining(); + byte[] data = new byte[remaining]; + readBuffer.get(data); + + // TODO could use a DataInput implementation that talks direct + // to + // the ByteBuffer to avoid object allocation and unnecessary + // buffering? + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); + answer = (Command) wireFormat.unmarshal(dataIn); + if (answer != null) { + answer.setFrom(from); + } + break; + } + } + if (answer != null) { + if (log.isDebugEnabled()) { + log.debug("Channel: " + name + " about to process: " + answer); + } + } + return answer; + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command) + */ + public void write(Command command) throws IOException { + write(command, targetAddress); + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress) + */ + public void write(Command command, SocketAddress address) throws IOException { + synchronized (writeLock) { + + ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); + wireFormat.marshal(command, new DataOutputStream(largeBuffer)); + byte[] data = largeBuffer.toByteArray(); + int size = data.length; + + writeBuffer.clear(); + headerMarshaller.writeHeader(command, writeBuffer); + + if (size >= writeBuffer.remaining()) { + // lets split the command up into chunks + int offset = 0; + boolean lastFragment = false; + for (int fragment = 0, length = data.length; !lastFragment; fragment++) { + // write the header + if (fragment > 0) { + writeBuffer.clear(); + headerMarshaller.writeHeader(command, writeBuffer); + } + + int chunkSize = writeBuffer.remaining(); + + // we need to remove the amount of overhead to write the + // partial command + + // lets write the flags in there + BooleanStream bs = null; + if (wireFormat.isTightEncodingEnabled()) { + bs = new BooleanStream(); + bs.writeBoolean(true); // the partial data byte[] is + // never null + } + + // lets remove the header of the partial command + // which is the byte for the type and an int for the size of + // the byte[] + chunkSize -= 1 // the data type + + 4 // the command ID + + 4; // the size of the partial data + + // the boolean flags + if (bs != null) { + chunkSize -= bs.marshalledSize(); + } + else { + chunkSize -= 1; + } + + if (!wireFormat.isSizePrefixDisabled()) { + // lets write the size of the command buffer + writeBuffer.putInt(chunkSize); + chunkSize -= 4; + } + + lastFragment = offset + chunkSize >= length; + if (chunkSize + offset > length) { + chunkSize = length - offset; + } + + writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); + + if (bs != null) { + bs.marshal(writeBuffer); + } + + writeBuffer.putInt(command.getCommandId()); + if (bs == null) { + writeBuffer.put((byte) 1); + } + + // size of byte array + writeBuffer.putInt(chunkSize); + + // now the data + writeBuffer.put(data, offset, chunkSize); + + offset += chunkSize; + sendWriteBuffer(address); + } + + // now lets write the last partial command + command = new LastPartialCommand(command); + largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); + wireFormat.marshal(command, new DataOutputStream(largeBuffer)); + data = largeBuffer.toByteArray(); + + writeBuffer.clear(); + headerMarshaller.writeHeader(command, writeBuffer); + } + + writeBuffer.put(data); + + sendWriteBuffer(address); + } + } + + // Properties + // ------------------------------------------------------------------------- + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize() + */ + public int getDatagramSize() { + return datagramSize; + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int) + */ + public void setDatagramSize(int datagramSize) { + this.datagramSize = datagramSize; + } + + public ByteBufferPool getBufferPool() { + return bufferPool; + } + + /** + * Sets the implementation of the byte buffer pool to use + */ + public void setBufferPool(ByteBufferPool bufferPool) { + this.bufferPool = bufferPool; + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller() + */ + public DatagramHeaderMarshaller getHeaderMarshaller() { + return headerMarshaller; + } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller) + */ + public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) { + this.headerMarshaller = headerMarshaller; + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected void sendWriteBuffer(SocketAddress address) throws IOException { + writeBuffer.flip(); + + if (log.isDebugEnabled()) { + log.debug("Channel: " + name + " sending datagram to: " + address); + } + channel.send(writeBuffer, address); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java new file mode 100644 index 0000000000..3735a637c6 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java @@ -0,0 +1,263 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.activeio.ByteArrayInputStream; +import org.activeio.ByteArrayOutputStream; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.Endpoint; +import org.apache.activemq.command.LastPartialCommand; +import org.apache.activemq.command.PartialCommand; +import org.apache.activemq.openwire.BooleanStream; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * A strategy for reading datagrams and de-fragmenting them together. + * + * @version $Revision$ + */ +public class CommandDatagramSocket implements CommandChannel { + + private static final Log log = LogFactory.getLog(CommandDatagramSocket.class); + + private final String name; + private DatagramSocket channel; + private InetAddress targetAddress; + private int targetPort; + private OpenWireFormat wireFormat; + private int datagramSize = 4 * 1024; + private DatagramHeaderMarshaller headerMarshaller; + + // reading + private Object readLock = new Object(); + + // writing + private Object writeLock = new Object(); + + + public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort, + DatagramHeaderMarshaller headerMarshaller) { + this.name = name; + this.channel = channel; + this.wireFormat = wireFormat; + this.datagramSize = datagramSize; + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.headerMarshaller = headerMarshaller; + } + + public String toString() { + return "CommandChannel#" + name; + } + + public void start() throws Exception { + } + + public void stop() throws Exception { + } + + public Command read() throws IOException { + Command answer = null; + Endpoint from = null; + synchronized (readLock) { + while (true) { + DatagramPacket datagram = createDatagramPacket(); + channel.receive(datagram); + + // TODO could use a DataInput implementation that talks direct + // to the byte[] to avoid object allocation + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData())); + + from = headerMarshaller.createEndpoint(datagram, dataIn); + answer = (Command) wireFormat.unmarshal(dataIn); + break; + } + } + if (answer != null) { + answer.setFrom(from); + + if (log.isDebugEnabled()) { + log.debug("Channel: " + name + " about to process: " + answer); + } + } + return answer; + } + + public void write(Command command) throws IOException { + write(command, targetAddress, targetPort); + } + + public void write(Command command, SocketAddress address) throws IOException { + if (address instanceof InetSocketAddress) { + InetSocketAddress ia = (InetSocketAddress) address; + write(command, ia.getAddress(), ia.getPort()); + } + else { + write(command); + } + } + + public void write(Command command, InetAddress address, int port) throws IOException { + synchronized (writeLock) { + + ByteArrayOutputStream writeBuffer = createByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(writeBuffer); + headerMarshaller.writeHeader(command, dataOut); + + int offset = writeBuffer.size(); + + wireFormat.marshal(command, dataOut); + + if (remaining(writeBuffer) >= 0) { + sendWriteBuffer(address, port, writeBuffer); + } + else { + // lets split the command up into chunks + byte[] data = writeBuffer.toByteArray(); + boolean lastFragment = false; + for (int fragment = 0, length = data.length; !lastFragment; fragment++) { + writeBuffer.reset(); + headerMarshaller.writeHeader(command, dataOut); + + int chunkSize = remaining(writeBuffer); + + // we need to remove the amount of overhead to write the + // partial command + + // lets write the flags in there + BooleanStream bs = null; + if (wireFormat.isTightEncodingEnabled()) { + bs = new BooleanStream(); + bs.writeBoolean(true); // the partial data byte[] is + // never null + } + + // lets remove the header of the partial command + // which is the byte for the type and an int for the size of + // the byte[] + chunkSize -= 1 // the data type + + 4 // the command ID + + 4; // the size of the partial data + + // the boolean flags + if (bs != null) { + chunkSize -= bs.marshalledSize(); + } + else { + chunkSize -= 1; + } + + if (!wireFormat.isSizePrefixDisabled()) { + // lets write the size of the command buffer + dataOut.writeInt(chunkSize); + chunkSize -= 4; + } + + lastFragment = offset + chunkSize >= length; + if (chunkSize + offset > length) { + chunkSize = length - offset; + } + + dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE); + + if (bs != null) { + bs.marshal(dataOut); + } + + dataOut.writeInt(command.getCommandId()); + if (bs == null) { + dataOut.write((byte) 1); + } + + // size of byte array + dataOut.writeInt(chunkSize); + + // now the data + dataOut.write(data, offset, chunkSize); + + offset += chunkSize; + sendWriteBuffer(address, port, writeBuffer); + } + + // now lets write the last partial command + command = new LastPartialCommand(command); + + writeBuffer.reset(); + headerMarshaller.writeHeader(command, dataOut); + wireFormat.marshal(command, dataOut); + + sendWriteBuffer(address, port, writeBuffer); + } + } + } + + // Properties + // ------------------------------------------------------------------------- + + public int getDatagramSize() { + return datagramSize; + } + + /** + * Sets the default size of a datagram on the network. + */ + public void setDatagramSize(int datagramSize) { + this.datagramSize = datagramSize; + } + + public DatagramHeaderMarshaller getHeaderMarshaller() { + return headerMarshaller; + } + + public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) { + this.headerMarshaller = headerMarshaller; + } + + // Implementation methods + // ------------------------------------------------------------------------- + protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException { + if (log.isDebugEnabled()) { + log.debug("Channel: " + name + " sending datagram to: " + address); + } + byte[] data = writeBuffer.toByteArray(); + DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port); + channel.send(packet); + } + + protected DatagramPacket createDatagramPacket() { + return new DatagramPacket(new byte[datagramSize], datagramSize); + } + + protected int remaining(ByteArrayOutputStream buffer) { + return datagramSize - buffer.size(); + } + + protected ByteArrayOutputStream createByteArrayOutputStream() { + return new ByteArrayOutputStream(datagramSize); + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java index 7c9674ed9b..0f9db77ea2 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramEndpoint.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.udp; import org.apache.activemq.command.BaseEndpoint; +import java.net.InetAddress; import java.net.SocketAddress; /** @@ -36,5 +37,5 @@ public class DatagramEndpoint extends BaseEndpoint { public SocketAddress getAddress() { return address; } - + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java index f3344f1af1..505f0f0d2b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeaderMarshaller.java @@ -17,9 +17,14 @@ package org.apache.activemq.transport.udp; +import org.activeio.ByteArrayOutputStream; import org.apache.activemq.command.Command; import org.apache.activemq.command.Endpoint; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.net.DatagramPacket; +import java.net.InetAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -36,6 +41,11 @@ public class DatagramHeaderMarshaller { return new DatagramEndpoint(address.toString(), address); } + public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) { + SocketAddress address = datagram.getSocketAddress(); + return new DatagramEndpoint(address.toString(), address); + } + public void writeHeader(Command command, ByteBuffer writeBuffer) { /* writeBuffer.putLong(command.getCounter()); @@ -45,4 +55,8 @@ public class DatagramHeaderMarshaller { writeBuffer.put(flags); */ } + + public void writeHeader(Command command, DataOutputStream dataOut) { + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index 82fbc64c2c..d19b5e2a98 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -33,6 +33,7 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.SocketException; import java.net.URI; import java.net.UnknownHostException; import java.nio.channels.AsynchronousCloseException; @@ -129,6 +130,17 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S doConsume(command); } catch (AsynchronousCloseException e) { + // DatagramChannel closed + try { + stop(); + } + catch (Exception e2) { + log.warn("Caught while closing: " + e2 + ". Now Closed", e2); + } + } + catch (SocketException e) { + // DatagramSocket closed + log.debug("Socket closed: " + e, e); try { stop(); } @@ -137,6 +149,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } catch (Exception e) { + System.out.println("Caught exception of type: " + e.getClass()); e.printStackTrace(); try { stop(); @@ -187,12 +200,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S return maxInactivityDuration; } - public DatagramChannel getChannel() { - return channel; + public int getDatagramSize() { + return datagramSize; } - public void setChannel(DatagramChannel channel) { - this.channel = channel; + public void setDatagramSize(int datagramSize) { + this.datagramSize = datagramSize; } /** @@ -222,7 +235,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S /** * Sets the implementation of the command channel to use. */ - public void setCommandChannel(CommandChannel commandChannel) { + public void setCommandChannel(CommandDatagramChannel commandChannel) { this.commandChannel = commandChannel; } @@ -290,19 +303,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } protected void doStart() throws Exception { - SocketAddress localAddress = new InetSocketAddress(port); - channel = DatagramChannel.open(); - channel.configureBlocking(true); + commandChannel = createCommandChannel(); + commandChannel.start(); - // TODO - // connect to default target address to avoid security checks each time - // channel = channel.connect(targetAddress); + super.doStart(); + } + + protected CommandChannel createCommandChannel() throws IOException { + SocketAddress localAddress = createLocalAddress(); + channel = DatagramChannel.open(); + + channel = connect(channel, targetAddress); DatagramSocket socket = channel.socket(); - if (log.isDebugEnabled()) { - log.debug("Binding to address: " + localAddress); - } - socket.bind(localAddress); + bind(socket, localAddress); if (port == 0) { port = socket.getLocalPort(); } @@ -310,10 +324,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S if (bufferPool == null) { bufferPool = new DefaultBufferPool(); } - commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); - commandChannel.start(); + return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); + } - super.doStart(); + protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException { + channel.configureBlocking(true); + + if (log.isDebugEnabled()) { + log.debug("Binding to address: " + localAddress); + } + socket.bind(localAddress); + } + + protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException { + // TODO + // connect to default target address to avoid security checks each time + // channel = channel.connect(targetAddress); + + return channel; + } + + protected SocketAddress createLocalAddress() { + return new InetSocketAddress(port); } protected void doStop(ServiceStopper stopper) throws Exception { @@ -333,4 +365,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S protected String getProtocolUriScheme() { return "udp://"; } + + protected SocketAddress getTargetAddress() { + return targetAddress; + } + + public void setCommandChannel(CommandChannel commandChannel) { + this.commandChannel = commandChannel; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java new file mode 100644 index 0000000000..a9b8595ced --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/multicast/MulticastTransportTest.java @@ -0,0 +1,55 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.multicast; + +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.CommandJoiner; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.udp.UdpTransport; +import org.apache.activemq.transport.udp.UdpTransportTest; + +import java.net.URI; + +/** + * + * @version $Revision$ + */ +public class MulticastTransportTest extends UdpTransportTest { + + private String multicastURI = "multicast://224.1.2.3:6255"; + + + protected Transport createProducer() throws Exception { + System.out.println("Producer using URI: " + multicastURI); + + // we are not using the TransportFactory as this assumes that + // transports talk to a server using a WireFormat Negotiation step + // rather than talking directly to each other + + OpenWireFormat wireFormat = createWireFormat(); + MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI)); + return new CommandJoiner(transport, wireFormat); + } + + protected Transport createConsumer() throws Exception { + OpenWireFormat wireFormat = createWireFormat(); + MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI)); + return new CommandJoiner(transport, wireFormat); + } + + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java index eb4b54b51a..1248392dcc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java @@ -155,9 +155,11 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen producer = createProducer(); producer.setTransportListener(new TransportListener() { public void onCommand(Command command) { + System.out.println("Producer received: " + command); } public void onException(IOException error) { + System.out.println("Producer exception: " + error); } public void transportInterupted() {