a little bit of spring cleaning to remove some of the cruft

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384837 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-10 16:12:47 +00:00
parent 555e7e296f
commit f51ac13f85
4 changed files with 20 additions and 59 deletions

View File

@ -16,15 +16,12 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.activeio.ByteSequence;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint; import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand; import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand; import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -57,7 +54,6 @@ public class CommandChannel implements Service {
// reading // reading
private Object readLock = new Object(); private Object readLock = new Object();
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
private SocketAddress lastReadDatagramAddress;
// writing // writing
private Object writeLock = new Object(); private Object writeLock = new Object();
@ -80,10 +76,6 @@ public class CommandChannel implements Service {
} }
public void start() throws Exception { public void start() throws Exception {
// wireFormat.setPrefixPacketSize(false);
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
bufferPool.setDefaultSize(datagramSize); bufferPool.setDefaultSize(datagramSize);
bufferPool.start(); bufferPool.start();
readBuffer = bufferPool.borrowBuffer(); readBuffer = bufferPool.borrowBuffer();
@ -96,22 +88,19 @@ public class CommandChannel implements Service {
public Command read() throws IOException { public Command read() throws IOException {
Command answer = null; Command answer = null;
lastReadDatagramAddress = null;
synchronized (readLock) { synchronized (readLock) {
readBuffer.clear(); readBuffer.clear();
lastReadDatagramAddress = channel.receive(readBuffer); SocketAddress address = channel.receive(readBuffer);
readBuffer.flip(); readBuffer.flip();
Endpoint from = headerMarshaller.createEndpoint(readBuffer, lastReadDatagramAddress); Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining(); int remaining = readBuffer.remaining();
byte[] data = new byte[remaining]; byte[] data = new byte[remaining];
readBuffer.get(data); readBuffer.get(data);
// TODO could use a DataInput implementation that talks direct to // TODO could use a DataInput implementation that talks direct to
// the // the ByteBuffer to avoid object allocation and unnecessary buffering?
// ByteBuffer
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command) wireFormat.unmarshal(dataIn); answer = (Command) wireFormat.unmarshal(dataIn);
answer.setFrom(from); answer.setFrom(from);
@ -124,15 +113,6 @@ public class CommandChannel implements Service {
return answer; return answer;
} }
/**
* Called if a packet is received on a different channel from a remote
* client
*
* @throws IOException
*/
public void setWireFormatInfoEndpoint(DatagramEndpoint endpoint) throws IOException {
}
public void write(Command command) throws IOException { public void write(Command command) throws IOException {
write(command, targetAddress); write(command, targetAddress);
} }
@ -236,11 +216,6 @@ public class CommandChannel implements Service {
this.headerMarshaller = headerMarshaller; this.headerMarshaller = headerMarshaller;
} }
public SocketAddress getLastReadDatagramAddress() {
synchronized (readLock) {
return lastReadDatagramAddress;
}
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@ -18,11 +18,12 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.transport.replay.ReplayStrategy;
import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.replay.ReplayStrategy;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -60,7 +61,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private int port; private int port;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private String description = null; private String description = null;
private DatagramEndpoint wireFormatHeader;
protected UdpTransport(OpenWireFormat wireFormat) throws IOException { protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
@ -106,10 +106,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
commandChannel.write(command, address); commandChannel.write(command, address);
} }
public void receivedHeader(DatagramEndpoint endpoint) {
wireFormatHeader = endpoint;
}
/** /**
* @return pretty print of 'this' * @return pretty print of 'this'
*/ */
@ -132,10 +128,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
Command command = commandChannel.read(); Command command = commandChannel.read();
doConsume(command); doConsume(command);
} }
/*
* catch (SocketTimeoutException e) { } catch
* (InterruptedIOException e) { }
*/
catch (AsynchronousCloseException e) { catch (AsynchronousCloseException e) {
try { try {
stop(); stop();
@ -168,13 +160,16 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
* the target to be the actual channel that the server has chosen for us to * the target to be the actual channel that the server has chosen for us to
* talk on. * talk on.
*/ */
public void useLastInboundDatagramAsNewTarget() { public void setTargetEndpoint(Endpoint newTarget) {
if (originalTargetAddress == null) { if (newTarget instanceof DatagramEndpoint) {
originalTargetAddress = targetAddress; DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
} SocketAddress address = endpoint.getAddress();
SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress(); if (address != null) {
if (lastAddress != null) { if (originalTargetAddress == null) {
targetAddress = lastAddress; originalTargetAddress = targetAddress;
}
targetAddress = address;
}
} }
} }
@ -318,12 +313,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller()); commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
commandChannel.start(); commandChannel.start();
// lets pass the header & address into the channel so it avoids a
// re-request
if (wireFormatHeader != null) {
commandChannel.setWireFormatInfoEndpoint(wireFormatHeader);
}
super.doStart(); super.doStart();
} }

View File

@ -120,9 +120,10 @@ public class UdpTransportFactory extends TransportFactory {
protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) { protected void onWireFormatNegotiated(WireFormatInfo info) {
// lets switch to the targetAddress that the last packet was // lets switch to the target endpoint
// received as so that all future requests go to the newly created UDP channel // based on the last packet that was received
udpTransport.useLastInboundDatagramAsNewTarget(); // so that all future requests go to the newly created UDP channel
udpTransport.setTargetEndpoint(info.getFrom());
} }
}; };
return transport; return transport;

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport.udp;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.CommandJoiner; import org.apache.activemq.transport.CommandJoiner;
@ -145,9 +144,6 @@ public class UdpTransportServer extends TransportServerSupport {
final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(connectionWireFormat, address); final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
// TODO - is this still required?
transport.receivedHeader(endpoint);
Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat); Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat);
return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { return new WireFormatNegotiator(configuredTransport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {