refactor of the UDP transport so that it can work with multicast using a DatagramSocket/MulticastSocket directly in addition to using a DatagramChannel (which only seems to work with UDP)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@385577 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-13 15:52:01 +00:00
parent 0def1d4d40
commit 5739c6cdca
11 changed files with 798 additions and 251 deletions

View File

@ -363,6 +363,7 @@
<!-- TODO FIX ME --> <!-- TODO FIX ME -->
<exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude> <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
<exclude>**/MulticastTransportTest.*</exclude>
</excludes> </excludes>
</unitTest> </unitTest>
<resources> <resources>

View File

@ -16,12 +16,39 @@
*/ */
package org.apache.activemq.transport.multicast; package org.apache.activemq.transport.multicast;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.transport.udp.DatagramEndpoint;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller; import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
/** /**
* *
* @version $Revision$ * @version $Revision$
*/ */
public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller { public class MulticastDatagramHeaderMarshaller extends DatagramHeaderMarshaller {
private final String localUri;
private final byte[] localUriAsBytes;
public MulticastDatagramHeaderMarshaller(String localUri) {
this.localUri = localUri;
this.localUriAsBytes = localUri.getBytes();
}
public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
int size = readBuffer.getInt();
byte[] data = new byte[size];
readBuffer.get(data);
return new DatagramEndpoint(new String(data), address);
}
public void writeHeader(Command command, ByteBuffer writeBuffer) {
writeBuffer.putInt(localUriAsBytes.length);
writeBuffer.put(localUriAsBytes);
super.writeHeader(command, writeBuffer);
}
} }

View File

@ -17,12 +17,26 @@
package org.apache.activemq.transport.multicast; package org.apache.activemq.transport.multicast;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandChannel;
import org.apache.activemq.transport.udp.CommandDatagramChannel;
import org.apache.activemq.transport.udp.CommandDatagramSocket;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import org.apache.activemq.transport.udp.DefaultBufferPool;
import org.apache.activemq.transport.udp.UdpTransport; import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException; import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.DatagramChannel;
/** /**
* A multicast based transport. * A multicast based transport.
@ -31,22 +45,21 @@ import java.net.UnknownHostException;
*/ */
public class MulticastTransport extends UdpTransport { public class MulticastTransport extends UdpTransport {
public MulticastTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException, IOException { private static final Log log = LogFactory.getLog(MulticastTransport.class);
super(wireFormat, port);
}
public MulticastTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException { private static final int DEFAULT_IDLE_TIME = 5000;
super(wireFormat, socketAddress);
} private MulticastSocket socket;
private InetAddress mcastAddress;
private int mcastPort;
private int timeToLive = 1;
private boolean loopBackMode = false;
private long keepAliveInterval = DEFAULT_IDLE_TIME;
public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
super(wireFormat, remoteLocation); super(wireFormat, remoteLocation);
} }
public MulticastTransport(OpenWireFormat wireFormat) throws IOException {
super(wireFormat);
}
protected String getProtocolName() { protected String getProtocolName() {
return "Multicast"; return "Multicast";
} }
@ -54,4 +67,43 @@ public class MulticastTransport extends UdpTransport {
protected String getProtocolUriScheme() { protected String getProtocolUriScheme() {
return "multicast://"; return "multicast://";
} }
protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
}
protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper);
if (socket != null) {
try {
socket.leaveGroup(mcastAddress);
}
catch (IOException e) {
stopper.onException(this, e);
}
socket.close();
}
}
protected CommandChannel createCommandChannel() throws IOException {
socket = new MulticastSocket(mcastPort);
socket.setLoopbackMode(loopBackMode);
socket.setTimeToLive(timeToLive);
log.debug("Joining multicast address: " + mcastAddress);
socket.joinGroup(mcastAddress);
socket.setSoTimeout((int) keepAliveInterval);
return new CommandDatagramSocket(toString(), socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
}
protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
this.mcastPort = remoteLocation.getPort();
return new InetSocketAddress(mcastAddress, mcastPort);
}
protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
}
} }

View File

@ -16,244 +16,33 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/** /**
* A strategy for reading datagrams and de-fragmenting them together. *
*
* @version $Revision$ * @version $Revision$
*/ */
public class CommandChannel implements Service { public interface CommandChannel extends Service {
private static final Log log = LogFactory.getLog(CommandChannel.class); public abstract Command read() throws IOException;
private final String name; public abstract void write(Command command) throws IOException;
private DatagramChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller;
// reading public abstract void write(Command command, SocketAddress address) throws IOException;
private Object readLock = new Object();
private ByteBuffer readBuffer;
// writing public abstract int getDatagramSize();
private Object writeLock = new Object();
private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024;
public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller;
}
public String toString() {
return "CommandChannel#" + name;
}
public void start() throws Exception {
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
writeBuffer = bufferPool.borrowBuffer();
}
public void stop() throws Exception {
bufferPool.stop();
}
public Command read() throws IOException {
Command answer = null;
synchronized (readLock) {
readBuffer.clear();
SocketAddress address = channel.receive(readBuffer);
readBuffer.flip();
Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining();
byte[] data = new byte[remaining];
readBuffer.get(data);
// TODO could use a DataInput implementation that talks direct to
// the ByteBuffer to avoid object allocation and unnecessary
// buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn);
answer.setFrom(from);
}
if (answer != null) {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
}
return answer;
}
public void write(Command command) throws IOException {
write(command, targetAddress);
}
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
int size = data.length;
if (size >= datagramSize) {
// lets split the command up into chunks
int offset = 0;
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
// write the header
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
int chunkSize = writeBuffer.remaining();
// we need to remove the amount of overhead to write the
// partial command
// lets write the flags in there
BooleanStream bs = null;
if (wireFormat.isTightEncodingEnabled()) {
bs = new BooleanStream();
bs.writeBoolean(true); // the partial data byte[] is
// never null
}
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
chunkSize -= 1 // the data type
+ 4 // the command ID
+ 4; // the size of the partial data
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
}
else {
chunkSize -= 1;
}
if (!wireFormat.isSizePrefixDisabled()) {
// lets write the size of the command buffer
writeBuffer.putInt(chunkSize);
chunkSize -= 4;
}
lastFragment = offset + chunkSize >= length;
if (chunkSize + offset > length) {
chunkSize = length - offset;
}
writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
if (bs != null) {
bs.marshal(writeBuffer);
}
writeBuffer.putInt(command.getCommandId());
if (bs == null) {
writeBuffer.put((byte) 1);
}
// size of byte array
writeBuffer.putInt(chunkSize);
// now the data
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
sendWriteBuffer(address);
}
// now lets write the last partial command
command = new LastPartialCommand(command);
largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
data = largeBuffer.toByteArray();
}
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
writeBuffer.put(data);
sendWriteBuffer(address);
}
}
// Properties
// -------------------------------------------------------------------------
public int getDatagramSize() {
return datagramSize;
}
/** /**
* Sets the default size of a datagram on the network. * Sets the default size of a datagram on the network.
*/ */
public void setDatagramSize(int datagramSize) { public abstract void setDatagramSize(int datagramSize);
this.datagramSize = datagramSize;
}
public ByteBufferPool getBufferPool() { public abstract DatagramHeaderMarshaller getHeaderMarshaller();
return bufferPool;
}
/** public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
* Sets the implementation of the byte buffer pool to use
*/
public void setBufferPool(ByteBufferPool bufferPool) {
this.bufferPool = bufferPool;
}
public DatagramHeaderMarshaller getHeaderMarshaller() { }
return headerMarshaller;
}
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram to: " + address);
}
channel.send(writeBuffer, address);
}
}

View File

@ -0,0 +1,303 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
/**
* A strategy for reading datagrams and de-fragmenting them together.
*
* @version $Revision$
*/
public class CommandDatagramChannel implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
private final String name;
private DatagramChannel channel;
private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool;
private int datagramSize = 4 * 1024;
private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller;
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
// writing
private Object writeLock = new Object();
private ByteBuffer writeBuffer;
private int defaultMarshalBufferSize = 64 * 1024;
public CommandDatagramChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
this.bufferPool = bufferPool;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.headerMarshaller = headerMarshaller;
}
public String toString() {
return "CommandChannel#" + name;
}
public void start() throws Exception {
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
writeBuffer = bufferPool.borrowBuffer();
}
public void stop() throws Exception {
bufferPool.stop();
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#read()
*/
public Command read() throws IOException {
Command answer = null;
synchronized (readLock) {
while (true) {
readBuffer.clear();
SocketAddress address = channel.receive(readBuffer);
/*
if (address == null) {
System.out.println("No address on packet: " + readBuffer);
// continue;
}
*/
readBuffer.flip();
if (readBuffer.limit() == 0) {
//System.out.println("Empty packet!");
continue;
}
//log.debug("buffer: " + readBuffer + " has remaining: " + readBuffer.remaining());
Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining();
byte[] data = new byte[remaining];
readBuffer.get(data);
// TODO could use a DataInput implementation that talks direct
// to
// the ByteBuffer to avoid object allocation and unnecessary
// buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn);
if (answer != null) {
answer.setFrom(from);
}
break;
}
}
if (answer != null) {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
}
return answer;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command)
*/
public void write(Command command) throws IOException {
write(command, targetAddress);
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#write(org.apache.activemq.command.Command, java.net.SocketAddress)
*/
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
int size = data.length;
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
if (size >= writeBuffer.remaining()) {
// lets split the command up into chunks
int offset = 0;
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
// write the header
if (fragment > 0) {
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
}
int chunkSize = writeBuffer.remaining();
// we need to remove the amount of overhead to write the
// partial command
// lets write the flags in there
BooleanStream bs = null;
if (wireFormat.isTightEncodingEnabled()) {
bs = new BooleanStream();
bs.writeBoolean(true); // the partial data byte[] is
// never null
}
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
chunkSize -= 1 // the data type
+ 4 // the command ID
+ 4; // the size of the partial data
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
}
else {
chunkSize -= 1;
}
if (!wireFormat.isSizePrefixDisabled()) {
// lets write the size of the command buffer
writeBuffer.putInt(chunkSize);
chunkSize -= 4;
}
lastFragment = offset + chunkSize >= length;
if (chunkSize + offset > length) {
chunkSize = length - offset;
}
writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
if (bs != null) {
bs.marshal(writeBuffer);
}
writeBuffer.putInt(command.getCommandId());
if (bs == null) {
writeBuffer.put((byte) 1);
}
// size of byte array
writeBuffer.putInt(chunkSize);
// now the data
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
sendWriteBuffer(address);
}
// now lets write the last partial command
command = new LastPartialCommand(command);
largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
data = largeBuffer.toByteArray();
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
}
writeBuffer.put(data);
sendWriteBuffer(address);
}
}
// Properties
// -------------------------------------------------------------------------
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#getDatagramSize()
*/
public int getDatagramSize() {
return datagramSize;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#setDatagramSize(int)
*/
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
public ByteBufferPool getBufferPool() {
return bufferPool;
}
/**
* Sets the implementation of the byte buffer pool to use
*/
public void setBufferPool(ByteBufferPool bufferPool) {
this.bufferPool = bufferPool;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#getHeaderMarshaller()
*/
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.udp.CommandChannel#setHeaderMarshaller(org.apache.activemq.transport.udp.DatagramHeaderMarshaller)
*/
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip();
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram to: " + address);
}
channel.send(writeBuffer, address);
}
}

View File

@ -0,0 +1,263 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.activeio.ByteArrayInputStream;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* A strategy for reading datagrams and de-fragmenting them together.
*
* @version $Revision$
*/
public class CommandDatagramSocket implements CommandChannel {
private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
private final String name;
private DatagramSocket channel;
private InetAddress targetAddress;
private int targetPort;
private OpenWireFormat wireFormat;
private int datagramSize = 4 * 1024;
private DatagramHeaderMarshaller headerMarshaller;
// reading
private Object readLock = new Object();
// writing
private Object writeLock = new Object();
public CommandDatagramSocket(String name, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress, int targetPort,
DatagramHeaderMarshaller headerMarshaller) {
this.name = name;
this.channel = channel;
this.wireFormat = wireFormat;
this.datagramSize = datagramSize;
this.targetAddress = targetAddress;
this.targetPort = targetPort;
this.headerMarshaller = headerMarshaller;
}
public String toString() {
return "CommandChannel#" + name;
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public Command read() throws IOException {
Command answer = null;
Endpoint from = null;
synchronized (readLock) {
while (true) {
DatagramPacket datagram = createDatagramPacket();
channel.receive(datagram);
// TODO could use a DataInput implementation that talks direct
// to the byte[] to avoid object allocation
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
from = headerMarshaller.createEndpoint(datagram, dataIn);
answer = (Command) wireFormat.unmarshal(dataIn);
break;
}
}
if (answer != null) {
answer.setFrom(from);
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
}
return answer;
}
public void write(Command command) throws IOException {
write(command, targetAddress, targetPort);
}
public void write(Command command, SocketAddress address) throws IOException {
if (address instanceof InetSocketAddress) {
InetSocketAddress ia = (InetSocketAddress) address;
write(command, ia.getAddress(), ia.getPort());
}
else {
write(command);
}
}
public void write(Command command, InetAddress address, int port) throws IOException {
synchronized (writeLock) {
ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(writeBuffer);
headerMarshaller.writeHeader(command, dataOut);
int offset = writeBuffer.size();
wireFormat.marshal(command, dataOut);
if (remaining(writeBuffer) >= 0) {
sendWriteBuffer(address, port, writeBuffer);
}
else {
// lets split the command up into chunks
byte[] data = writeBuffer.toByteArray();
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
writeBuffer.reset();
headerMarshaller.writeHeader(command, dataOut);
int chunkSize = remaining(writeBuffer);
// we need to remove the amount of overhead to write the
// partial command
// lets write the flags in there
BooleanStream bs = null;
if (wireFormat.isTightEncodingEnabled()) {
bs = new BooleanStream();
bs.writeBoolean(true); // the partial data byte[] is
// never null
}
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
chunkSize -= 1 // the data type
+ 4 // the command ID
+ 4; // the size of the partial data
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
}
else {
chunkSize -= 1;
}
if (!wireFormat.isSizePrefixDisabled()) {
// lets write the size of the command buffer
dataOut.writeInt(chunkSize);
chunkSize -= 4;
}
lastFragment = offset + chunkSize >= length;
if (chunkSize + offset > length) {
chunkSize = length - offset;
}
dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
if (bs != null) {
bs.marshal(dataOut);
}
dataOut.writeInt(command.getCommandId());
if (bs == null) {
dataOut.write((byte) 1);
}
// size of byte array
dataOut.writeInt(chunkSize);
// now the data
dataOut.write(data, offset, chunkSize);
offset += chunkSize;
sendWriteBuffer(address, port, writeBuffer);
}
// now lets write the last partial command
command = new LastPartialCommand(command);
writeBuffer.reset();
headerMarshaller.writeHeader(command, dataOut);
wireFormat.marshal(command, dataOut);
sendWriteBuffer(address, port, writeBuffer);
}
}
}
// Properties
// -------------------------------------------------------------------------
public int getDatagramSize() {
return datagramSize;
}
/**
* Sets the default size of a datagram on the network.
*/
public void setDatagramSize(int datagramSize) {
this.datagramSize = datagramSize;
}
public DatagramHeaderMarshaller getHeaderMarshaller() {
return headerMarshaller;
}
public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
this.headerMarshaller = headerMarshaller;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " sending datagram to: " + address);
}
byte[] data = writeBuffer.toByteArray();
DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);
channel.send(packet);
}
protected DatagramPacket createDatagramPacket() {
return new DatagramPacket(new byte[datagramSize], datagramSize);
}
protected int remaining(ByteArrayOutputStream buffer) {
return datagramSize - buffer.size();
}
protected ByteArrayOutputStream createByteArrayOutputStream() {
return new ByteArrayOutputStream(datagramSize);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BaseEndpoint; import org.apache.activemq.command.BaseEndpoint;
import java.net.InetAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
/** /**
@ -36,5 +37,5 @@ public class DatagramEndpoint extends BaseEndpoint {
public SocketAddress getAddress() { public SocketAddress getAddress() {
return address; return address;
} }
} }

View File

@ -17,9 +17,14 @@
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.ByteArrayOutputStream;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -36,6 +41,11 @@ public class DatagramHeaderMarshaller {
return new DatagramEndpoint(address.toString(), address); return new DatagramEndpoint(address.toString(), address);
} }
public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
SocketAddress address = datagram.getSocketAddress();
return new DatagramEndpoint(address.toString(), address);
}
public void writeHeader(Command command, ByteBuffer writeBuffer) { public void writeHeader(Command command, ByteBuffer writeBuffer) {
/* /*
writeBuffer.putLong(command.getCounter()); writeBuffer.putLong(command.getCounter());
@ -45,4 +55,8 @@ public class DatagramHeaderMarshaller {
writeBuffer.put(flags); writeBuffer.put(flags);
*/ */
} }
public void writeHeader(Command command, DataOutputStream dataOut) {
}
} }

View File

@ -33,6 +33,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousCloseException;
@ -129,6 +130,17 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
doConsume(command); doConsume(command);
} }
catch (AsynchronousCloseException e) { catch (AsynchronousCloseException e) {
// DatagramChannel closed
try {
stop();
}
catch (Exception e2) {
log.warn("Caught while closing: " + e2 + ". Now Closed", e2);
}
}
catch (SocketException e) {
// DatagramSocket closed
log.debug("Socket closed: " + e, e);
try { try {
stop(); stop();
} }
@ -137,6 +149,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
} }
catch (Exception e) { catch (Exception e) {
System.out.println("Caught exception of type: " + e.getClass());
e.printStackTrace(); e.printStackTrace();
try { try {
stop(); stop();
@ -187,12 +200,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return maxInactivityDuration; return maxInactivityDuration;
} }
public DatagramChannel getChannel() { public int getDatagramSize() {
return channel; return datagramSize;
} }
public void setChannel(DatagramChannel channel) { public void setDatagramSize(int datagramSize) {
this.channel = channel; this.datagramSize = datagramSize;
} }
/** /**
@ -222,7 +235,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/** /**
* Sets the implementation of the command channel to use. * Sets the implementation of the command channel to use.
*/ */
public void setCommandChannel(CommandChannel commandChannel) { public void setCommandChannel(CommandDatagramChannel commandChannel) {
this.commandChannel = commandChannel; this.commandChannel = commandChannel;
} }
@ -290,19 +303,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
protected void doStart() throws Exception { protected void doStart() throws Exception {
SocketAddress localAddress = new InetSocketAddress(port); commandChannel = createCommandChannel();
channel = DatagramChannel.open(); commandChannel.start();
channel.configureBlocking(true);
// TODO super.doStart();
// connect to default target address to avoid security checks each time }
// channel = channel.connect(targetAddress);
protected CommandChannel createCommandChannel() throws IOException {
SocketAddress localAddress = createLocalAddress();
channel = DatagramChannel.open();
channel = connect(channel, targetAddress);
DatagramSocket socket = channel.socket(); DatagramSocket socket = channel.socket();
if (log.isDebugEnabled()) { bind(socket, localAddress);
log.debug("Binding to address: " + localAddress);
}
socket.bind(localAddress);
if (port == 0) { if (port == 0) {
port = socket.getLocalPort(); port = socket.getLocalPort();
} }
@ -310,10 +324,28 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) { if (bufferPool == null) {
bufferPool = new DefaultBufferPool(); bufferPool = new DefaultBufferPool();
} }
commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); return new CommandDatagramChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
commandChannel.start(); }
super.doStart(); protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
channel.configureBlocking(true);
if (log.isDebugEnabled()) {
log.debug("Binding to address: " + localAddress);
}
socket.bind(localAddress);
}
protected DatagramChannel connect(DatagramChannel channel, SocketAddress targetAddress2) throws IOException {
// TODO
// connect to default target address to avoid security checks each time
// channel = channel.connect(targetAddress);
return channel;
}
protected SocketAddress createLocalAddress() {
return new InetSocketAddress(port);
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
@ -333,4 +365,12 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
protected String getProtocolUriScheme() { protected String getProtocolUriScheme() {
return "udp://"; return "udp://";
} }
protected SocketAddress getTargetAddress() {
return targetAddress;
}
public void setCommandChannel(CommandChannel commandChannel) {
this.commandChannel = commandChannel;
}
} }

View File

@ -0,0 +1,55 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.multicast;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.transport.udp.UdpTransportTest;
import java.net.URI;
/**
*
* @version $Revision$
*/
public class MulticastTransportTest extends UdpTransportTest {
private String multicastURI = "multicast://224.1.2.3:6255";
protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + multicastURI);
// we are not using the TransportFactory as this assumes that
// transports talk to a server using a WireFormat Negotiation step
// rather than talking directly to each other
OpenWireFormat wireFormat = createWireFormat();
MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
return new CommandJoiner(transport, wireFormat);
}
protected Transport createConsumer() throws Exception {
OpenWireFormat wireFormat = createWireFormat();
MulticastTransport transport = new MulticastTransport(wireFormat, new URI(multicastURI));
return new CommandJoiner(transport, wireFormat);
}
}

View File

@ -155,9 +155,11 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
producer = createProducer(); producer = createProducer();
producer.setTransportListener(new TransportListener() { producer.setTransportListener(new TransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
System.out.println("Producer received: " + command);
} }
public void onException(IOException error) { public void onException(IOException error) {
System.out.println("Producer exception: " + error);
} }
public void transportInterupted() { public void transportInterupted() {