mirror of https://github.com/apache/activemq.git
minor refactorings to make it easier to derive from
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384755 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c5f96e1db5
commit
df26287181
|
@ -29,7 +29,6 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.DatagramChannel;
|
import java.nio.channels.DatagramChannel;
|
||||||
|
@ -50,7 +49,7 @@ public class CommandChannel implements Service {
|
||||||
private int datagramSize = 4 * 1024;
|
private int datagramSize = 4 * 1024;
|
||||||
private DatagramReplayStrategy replayStrategy;
|
private DatagramReplayStrategy replayStrategy;
|
||||||
private SocketAddress targetAddress;
|
private SocketAddress targetAddress;
|
||||||
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
|
private DatagramHeaderMarshaller headerMarshaller;
|
||||||
private final boolean checkSequenceNumbers;
|
private final boolean checkSequenceNumbers;
|
||||||
|
|
||||||
// reading
|
// reading
|
||||||
|
@ -67,7 +66,7 @@ public class CommandChannel implements Service {
|
||||||
private DatagramHeader header = new DatagramHeader();
|
private DatagramHeader header = new DatagramHeader();
|
||||||
|
|
||||||
public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
|
public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
|
||||||
DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers) {
|
DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers, DatagramHeaderMarshaller headerMarshaller) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.wireFormat = wireFormat;
|
this.wireFormat = wireFormat;
|
||||||
|
@ -76,6 +75,7 @@ public class CommandChannel implements Service {
|
||||||
this.replayStrategy = replayStrategy;
|
this.replayStrategy = replayStrategy;
|
||||||
this.targetAddress = targetAddress;
|
this.targetAddress = targetAddress;
|
||||||
this.checkSequenceNumbers = checkSequenceNumbers;
|
this.checkSequenceNumbers = checkSequenceNumbers;
|
||||||
|
this.headerMarshaller = headerMarshaller;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
|
|
@ -25,8 +25,12 @@ import java.nio.ByteBuffer;
|
||||||
*/
|
*/
|
||||||
public class DatagramHeaderMarshaller {
|
public class DatagramHeaderMarshaller {
|
||||||
|
|
||||||
|
public DatagramHeader createDatagramHeader() {
|
||||||
|
return new DatagramHeader();
|
||||||
|
}
|
||||||
|
|
||||||
public DatagramHeader readHeader(ByteBuffer readBuffer) {
|
public DatagramHeader readHeader(ByteBuffer readBuffer) {
|
||||||
DatagramHeader answer = new DatagramHeader();
|
DatagramHeader answer = createDatagramHeader();
|
||||||
answer.setCounter(readBuffer.getLong());
|
answer.setCounter(readBuffer.getLong());
|
||||||
answer.setDataSize(readBuffer.getInt());
|
answer.setDataSize(readBuffer.getInt());
|
||||||
byte flags = readBuffer.get();
|
byte flags = readBuffer.get();
|
||||||
|
@ -46,5 +50,4 @@ public class DatagramHeaderMarshaller {
|
||||||
public int getHeaderSize(DatagramHeader header) {
|
public int getHeaderSize(DatagramHeader header) {
|
||||||
return 8 + 4 + 1;
|
return 8 + 4 + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,7 +82,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
|
public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException {
|
||||||
this(wireFormat);
|
this(wireFormat);
|
||||||
this.targetAddress = socketAddress;
|
this.targetAddress = socketAddress;
|
||||||
this.description = "UdpServerConnection@";
|
this.description = getProtocolName() + "ServerConnection@";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,7 +92,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
this(wireFormat);
|
this(wireFormat);
|
||||||
this.port = port;
|
this.port = port;
|
||||||
this.targetAddress = null;
|
this.targetAddress = null;
|
||||||
this.description = "UdpServer@";
|
this.description = getProtocolName() + "Server@";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -113,7 +113,6 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
commandChannel.write(command, address);
|
commandChannel.write(command, address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void receivedHeader(DatagramHeader header) {
|
public void receivedHeader(DatagramHeader header) {
|
||||||
wireFormatHeader = header;
|
wireFormatHeader = header;
|
||||||
}
|
}
|
||||||
|
@ -126,7 +125,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
return description + port;
|
return description + port;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return "udp://" + targetAddress + "@" + port;
|
return getProtocolUriScheme() + targetAddress + "@" + port;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,6 +168,22 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We have received the WireFormatInfo from the server on the actual channel
|
||||||
|
* we should use for all future communication with the server, so lets set
|
||||||
|
* the target to be the actual channel that the server has chosen for us to
|
||||||
|
* talk on.
|
||||||
|
*/
|
||||||
|
public void useLastInboundDatagramAsNewTarget() {
|
||||||
|
if (originalTargetAddress == null) {
|
||||||
|
originalTargetAddress = targetAddress;
|
||||||
|
}
|
||||||
|
SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
|
||||||
|
if (lastAddress != null) {
|
||||||
|
targetAddress = lastAddress;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Properties
|
// Properties
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
public boolean isTrace() {
|
public boolean isTrace() {
|
||||||
|
@ -255,7 +270,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
public OpenWireFormat getWireFormat() {
|
public OpenWireFormat getWireFormat() {
|
||||||
return wireFormat;
|
return wireFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isCheckSequenceNumbers() {
|
public boolean isCheckSequenceNumbers() {
|
||||||
return checkSequenceNumbers;
|
return checkSequenceNumbers;
|
||||||
}
|
}
|
||||||
|
@ -313,7 +328,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
if (bufferPool == null) {
|
if (bufferPool == null) {
|
||||||
bufferPool = new DefaultBufferPool();
|
bufferPool = new DefaultBufferPool();
|
||||||
}
|
}
|
||||||
commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers());
|
commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers(), createDatagramHeaderMarshaller());
|
||||||
commandChannel.start();
|
commandChannel.start();
|
||||||
|
|
||||||
// lets pass the header & address into the channel so it avoids a
|
// lets pass the header & address into the channel so it avoids a
|
||||||
|
@ -331,21 +346,15 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
|
||||||
* We have received the WireFormatInfo from the server on the actual channel
|
return new DatagramHeaderMarshaller();
|
||||||
* we should use for all future communication with the server, so lets set
|
|
||||||
* the target to be the actual channel that the server has chosen for us to
|
|
||||||
* talk on.
|
|
||||||
*/
|
|
||||||
public void useLastInboundDatagramAsNewTarget() {
|
|
||||||
if (originalTargetAddress == null) {
|
|
||||||
originalTargetAddress = targetAddress;
|
|
||||||
}
|
|
||||||
SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
|
|
||||||
if (lastAddress != null) {
|
|
||||||
targetAddress = lastAddress;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected String getProtocolName() {
|
||||||
|
return "Udp";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getProtocolUriScheme() {
|
||||||
|
return "udp://";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue