added a working UDP server with test cases

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384603 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-09 20:26:47 +00:00
parent 4f446eb025
commit 4446d55d30
13 changed files with 153 additions and 83 deletions

View File

@ -360,10 +360,6 @@
<exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithSpringFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
<exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude> <exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
<!-- TODO FIXME -->
<exclude>**/UdpTransportUsingServerTest.*</exclude>
<exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
</excludes> </excludes>
</unitTest> </unitTest>
<resources> <resources>

View File

@ -81,11 +81,13 @@ final public class OpenWireFormat implements WireFormat {
public OpenWireFormat copy() { public OpenWireFormat copy() {
OpenWireFormat answer = new OpenWireFormat(); OpenWireFormat answer = new OpenWireFormat();
answer.version = version;
answer.stackTraceEnabled = stackTraceEnabled; answer.stackTraceEnabled = stackTraceEnabled;
answer.tcpNoDelayEnabled = tcpNoDelayEnabled; answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
answer.cacheEnabled = cacheEnabled; answer.cacheEnabled = cacheEnabled;
answer.tightEncodingEnabled = tightEncodingEnabled; answer.tightEncodingEnabled = tightEncodingEnabled;
answer.sizePrefixDisabled = sizePrefixDisabled; answer.sizePrefixDisabled = sizePrefixDisabled;
answer.preferedWireFormatInfo = preferedWireFormatInfo;
return answer; return answer;
} }
@ -104,8 +106,8 @@ final public class OpenWireFormat implements WireFormat {
static IdGenerator g = new IdGenerator(); static IdGenerator g = new IdGenerator();
String id = g.generateId(); String id = g.generateId();
public String toString() { public String toString() {
//return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}"; return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}";
return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}"; //return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}";
} }
public int getVersion() { public int getVersion() {

View File

@ -94,8 +94,15 @@ public class WireFormatNegotiator extends TransportFilter {
onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")")); onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
} }
if (log.isDebugEnabled()) {
log.debug(this + " before negotiation: " + wireFormat);
}
wireFormat.renegociatWireFormat(info); wireFormat.renegociatWireFormat(info);
if (log.isDebugEnabled()) {
log.debug(this + " after negotiation: " + wireFormat);
}
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -43,6 +43,7 @@ public class CommandChannel implements Service {
private static final Log log = LogFactory.getLog(CommandChannel.class); private static final Log log = LogFactory.getLog(CommandChannel.class);
private final String name;
private DatagramChannel channel; private DatagramChannel channel;
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
private ByteBufferPool bufferPool; private ByteBufferPool bufferPool;
@ -50,11 +51,12 @@ public class CommandChannel implements Service {
private DatagramReplayStrategy replayStrategy; private DatagramReplayStrategy replayStrategy;
private SocketAddress targetAddress; private SocketAddress targetAddress;
private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller(); private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller();
private final boolean checkSequenceNumbers;
// reading // reading
private Object readLock = new Object(); private Object readLock = new Object();
private ByteBuffer readBuffer; private ByteBuffer readBuffer;
private CommandReadBuffer readStack; private DatagramReadBuffer readStack;
private SocketAddress lastReadDatagramAddress; private SocketAddress lastReadDatagramAddress;
// writing // writing
@ -64,14 +66,20 @@ public class CommandChannel implements Service {
private int largeMessageBufferSize = 128 * 1024; private int largeMessageBufferSize = 128 * 1024;
private DatagramHeader header = new DatagramHeader(); private DatagramHeader header = new DatagramHeader();
public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) { DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers) {
this.name = name;
this.channel = channel; this.channel = channel;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.datagramSize = datagramSize; this.datagramSize = datagramSize;
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
this.targetAddress = targetAddress; this.targetAddress = targetAddress;
this.checkSequenceNumbers = checkSequenceNumbers;
}
public String toString() {
return "CommandChannel#" + name;
} }
public void start() throws Exception { public void start() throws Exception {
@ -79,7 +87,9 @@ public class CommandChannel implements Service {
wireFormat.setCacheEnabled(false); wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true); wireFormat.setTightEncodingEnabled(true);
readStack = new CommandReadBuffer(wireFormat, replayStrategy); if (checkSequenceNumbers) {
readStack = new CommandReadBuffer(name, wireFormat, replayStrategy);
}
bufferPool.setDefaultSize(datagramSize); bufferPool.setDefaultSize(datagramSize);
bufferPool.start(); bufferPool.start();
readBuffer = bufferPool.borrowBuffer(); readBuffer = bufferPool.borrowBuffer();
@ -99,25 +109,20 @@ public class CommandChannel implements Service {
lastReadDatagramAddress = channel.receive(readBuffer); lastReadDatagramAddress = channel.receive(readBuffer);
readBuffer.flip(); readBuffer.flip();
if (log.isDebugEnabled()) {
log.debug("Read a datagram from: " + lastReadDatagramAddress);
}
header = headerMarshaller.readHeader(readBuffer); header = headerMarshaller.readHeader(readBuffer);
header.setFromAddress(lastReadDatagramAddress); header.setFromAddress(lastReadDatagramAddress);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received datagram from: " + lastReadDatagramAddress + " header: " + header); log.debug("Received datagram on: " + name + " from: " + lastReadDatagramAddress + " header: " + header);
} }
int remaining = readBuffer.remaining(); int remaining = readBuffer.remaining();
int size = header.getDataSize(); int size = header.getDataSize();
/* /*
if (size > remaining) { * if (size > remaining) { throw new IOException("Invalid command
throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining"); * size: " + size + " when there are only: " + remaining + " byte(s)
} * remaining"); } else if (size < remaining) { log.warn("Extra bytes
else if (size < remaining) { * in buffer. Expecting: " + size + " but has: " + remaining); }
log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining); */
}
*/
if (size != remaining) { if (size != remaining) {
log.warn("Expecting: " + size + " but has: " + remaining); log.warn("Expecting: " + size + " but has: " + remaining);
} }
@ -133,23 +138,39 @@ public class CommandChannel implements Service {
// TODO use a DataInput implementation that talks direct to the // TODO use a DataInput implementation that talks direct to the
// ByteBuffer // ByteBuffer
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
Command command = (Command) wireFormat.doUnmarshal(dataIn); Command command = (Command) wireFormat.unmarshal(dataIn);
// Command command = (Command) wireFormat.doUnmarshal(dataIn);
header.setCommand(command); header.setCommand(command);
} }
answer = readStack.read(header); if (readStack != null) {
answer = readStack.read(header);
}
else {
answer = header.getCommand();
}
} }
if (answer != null) { if (answer != null) {
if (log.isDebugEnabled()) {
log.debug("Channel: " + name + " about to process: " + answer);
}
processor.process(answer, header); processor.process(answer, header);
} }
} }
/** /**
* Called if a packet is received on a different channel from a remote client * Called if a packet is received on a different channel from a remote
* client
*
* @throws IOException * @throws IOException
*/ */
public Command onDatagramReceived(DatagramHeader header) throws IOException { public Command onDatagramReceived(DatagramHeader header) throws IOException {
return readStack.read(header); if (readStack != null) {
return readStack.read(header);
}
else {
return header.getCommand();
}
} }
public void write(Command command) throws IOException { public void write(Command command) throws IOException {
@ -159,10 +180,12 @@ public class CommandChannel implements Service {
public void write(Command command, SocketAddress address) throws IOException { public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) { synchronized (writeLock) {
header.incrementCounter(); header.incrementCounter();
bs = new BooleanStream();
// TODO ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
//bs.clear(); wireFormat.marshal(command, new DataOutputStream(largeBuffer));
int size = wireFormat.tightMarshal1(command, bs); byte[] data = largeBuffer.toByteArray();
int size = data.length;
if (size < datagramSize) { if (size < datagramSize) {
header.setPartial(false); header.setPartial(false);
header.setComplete(true); header.setComplete(true);
@ -170,13 +193,6 @@ public class CommandChannel implements Service {
writeBuffer.clear(); writeBuffer.clear();
headerMarshaller.writeHeader(header, writeBuffer); headerMarshaller.writeHeader(header, writeBuffer);
// TODO use a DataOutput implementation that talks direct to the
// ByteBuffer
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(buffer);
wireFormat.tightMarshal2(command, dataOut, bs);
dataOut.close();
byte[] data = buffer.toByteArray();
writeBuffer.put(data); writeBuffer.put(data);
sendWriteBuffer(address); sendWriteBuffer(address);
@ -186,10 +202,7 @@ public class CommandChannel implements Service {
header.setComplete(false); header.setComplete(false);
// lets split the command up into chunks // lets split the command up into chunks
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
int offset = 0; int offset = 0;
boolean lastFragment = false; boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) { for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
@ -249,14 +262,13 @@ public class CommandChannel implements Service {
} }
} }
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void sendWriteBuffer(SocketAddress address) throws IOException { protected void sendWriteBuffer(SocketAddress address) throws IOException {
writeBuffer.flip(); writeBuffer.flip();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Sending datagram to: " + address + " header: " + header); log.debug("Channel: " + name + " sending datagram to: " + address + " header: " + header);
} }
channel.send(writeBuffer, address); channel.send(writeBuffer, address);
} }

View File

@ -35,7 +35,7 @@ import java.util.TreeSet;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class CommandReadBuffer { public class CommandReadBuffer implements DatagramReadBuffer {
private static final Log log = LogFactory.getLog(CommandReadBuffer.class); private static final Log log = LogFactory.getLog(CommandReadBuffer.class);
private OpenWireFormat wireFormat; private OpenWireFormat wireFormat;
@ -43,8 +43,10 @@ public class CommandReadBuffer {
private SortedSet headers = new TreeSet(); private SortedSet headers = new TreeSet();
private long expectedCounter = 1; private long expectedCounter = 1;
private ByteArrayOutputStream out = new ByteArrayOutputStream(); private ByteArrayOutputStream out = new ByteArrayOutputStream();
private final String name;
public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) { public CommandReadBuffer(String name, OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) {
this.name = name;
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
} }
@ -57,13 +59,16 @@ public class CommandReadBuffer {
log.warn("Ignoring out of step packet: " + header); log.warn("Ignoring out of step packet: " + header);
} }
else { else {
replayStrategy.onDroppedPackets(expectedCounter, actualCounter); replayStrategy.onDroppedPackets(name, expectedCounter, actualCounter);
// lets add it to the list for later on // lets add it to the list for later on
headers.add(header); headers.add(header);
} }
// lets see if the first item in the set is the next header // lets see if the first item in the set is the next header
if (headers.isEmpty()) {
return null;
}
header = (DatagramHeader) headers.first(); header = (DatagramHeader) headers.first();
if (expectedCounter != header.getCounter()) { if (expectedCounter != header.getCounter()) {
return null; return null;
@ -71,7 +76,7 @@ public class CommandReadBuffer {
} }
// we've got a valid header so increment counter // we've got a valid header so increment counter
replayStrategy.onReceivedPacket(expectedCounter); replayStrategy.onReceivedPacket(name, expectedCounter);
expectedCounter++; expectedCounter++;
Command answer = null; Command answer = null;

View File

@ -0,0 +1,33 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import org.apache.activemq.command.Command;
import java.io.IOException;
/**
* Represents an inbound buffer of datagrams for dealing with out of order
* or fragmented commands.
*
* @version $Revision$
*/
public interface DatagramReadBuffer {
Command read(DatagramHeader header) throws IOException;
}

View File

@ -56,6 +56,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
private DatagramChannel channel; private DatagramChannel channel;
private boolean trace = false; private boolean trace = false;
private boolean useLocalHost = true; private boolean useLocalHost = true;
private boolean checkSequenceNumbers = true;
private int port; private int port;
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private String description = null; private String description = null;
@ -112,7 +113,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
commandChannel.write(command, address); commandChannel.write(command, address);
} }
public void doConsume(Command command, DatagramHeader header) throws IOException {
public void receivedHeader(DatagramHeader header) {
wireFormatHeader = header; wireFormatHeader = header;
} }
@ -254,6 +256,14 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return wireFormat; return wireFormat;
} }
public boolean isCheckSequenceNumbers() {
return checkSequenceNumbers;
}
public void setCheckSequenceNumbers(boolean checkSequenceNumbers) {
this.checkSequenceNumbers = checkSequenceNumbers;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected CommandProcessor getCommandProcessor() { protected CommandProcessor getCommandProcessor() {
@ -303,7 +313,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
if (bufferPool == null) { if (bufferPool == null) {
bufferPool = new DefaultBufferPool(); bufferPool = new DefaultBufferPool();
} }
commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress); commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers());
commandChannel.start(); commandChannel.start();
// lets pass the header & address into the channel so it avoids a // lets pass the header & address into the channel so it avoids a
@ -337,4 +347,5 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
} }
} }
} }

View File

@ -85,7 +85,6 @@ public class UdpTransportFactory extends TransportFactory {
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
OpenWireFormat wireFormat = asOpenWireFormat(wf); OpenWireFormat wireFormat = asOpenWireFormat(wf);
wireFormat.setSizePrefixDisabled(true);
return new UdpTransport(wireFormat, location); return new UdpTransport(wireFormat, location);
} }
@ -113,7 +112,7 @@ public class UdpTransportFactory extends TransportFactory {
transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {
protected void onWireFormatNegotiated(WireFormatInfo info) { protected void onWireFormatNegotiated(WireFormatInfo info) {
// lets switch to the targetAddress that the last packet was // lets switch to the targetAddress that the last packet was
// received as // received as so that all future requests go to the newly created UDP channel
udpTransport.useLastInboundDatagramAsNewTarget(); udpTransport.useLastInboundDatagramAsNewTarget();
} }
}; };
@ -122,8 +121,6 @@ public class UdpTransportFactory extends TransportFactory {
protected OpenWireFormat asOpenWireFormat(WireFormat wf) { protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
OpenWireFormat answer = (OpenWireFormat) wf; OpenWireFormat answer = (OpenWireFormat) wf;
answer.setSizePrefixDisabled(true);
answer.setCacheEnabled(false);
return answer; return answer;
} }
} }

View File

@ -24,7 +24,6 @@ import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerSupport; import org.apache.activemq.transport.TransportServerSupport;
import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.transport.WireFormatNegotiator;
@ -55,6 +54,10 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
super(connectURI); super(connectURI);
this.serverTransport = serverTransport; this.serverTransport = serverTransport;
this.configuredTransport = configuredTransport; this.configuredTransport = configuredTransport;
// lets disable the incremental checking of the sequence numbers
// as we are getting messages from many different clients
serverTransport.setCheckSequenceNumbers(false);
} }
public String toString() { public String toString() {
@ -96,12 +99,16 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
public void process(Command command, DatagramHeader header) throws IOException { public void process(Command command, DatagramHeader header) throws IOException {
SocketAddress address = header.getFromAddress(); SocketAddress address = header.getFromAddress();
System.out.println(toString() + " received command: " + command + " from address: " + address); if (log.isDebugEnabled()) {
log.debug("Received command on: " + this + " from address: " + address + " command: " + command);
}
Transport transport = null; Transport transport = null;
synchronized (transports) { synchronized (transports) {
transport = (Transport) transports.get(address); transport = (Transport) transports.get(address);
if (transport == null) { if (transport == null) {
System.out.println("###Êcreating new server connector"); if (log.isDebugEnabled()) {
log.debug("Creating a new UDP server connection");
}
transport = createTransport(command, header); transport = createTransport(command, header);
transport = configureTransport(transport); transport = configureTransport(transport);
transports.put(address, transport); transports.put(address, transport);
@ -115,22 +122,29 @@ public class UdpTransportServer extends TransportServerSupport implements Comman
protected Transport configureTransport(Transport transport) { protected Transport configureTransport(Transport transport) {
transport = new ResponseCorrelator(transport); transport = new ResponseCorrelator(transport);
// TODO if (serverTransport.getMaxInactivityDuration() > 0) {
//transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
}
getAcceptListener().onAccept(transport); getAcceptListener().onAccept(transport);
return transport; return transport;
} }
protected Transport createTransport(Command command, DatagramHeader header) throws IOException { protected Transport createTransport(final Command command, DatagramHeader header) throws IOException {
final SocketAddress address = header.getFromAddress(); final SocketAddress address = header.getFromAddress();
// TODO lets copy the wireformat... final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
final UdpTransport transport = new UdpTransport(serverTransport.getWireFormat(), address); final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
// lets send the packet into the transport so it can track packets transport.receivedHeader(header);
transport.doConsume(command, header);
return new WireFormatNegotiator(transport, serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { return new WireFormatNegotiator(transport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) {
public void start() throws Exception {
super.start();
// process the inbound wireformat
onCommand(command);
}
// lets use the specific addressing of wire format // lets use the specific addressing of wire format
protected void sendWireFormat(WireFormatInfo info) throws IOException { protected void sendWireFormat(WireFormatInfo info) throws IOException {

View File

@ -25,8 +25,9 @@ import java.io.IOException;
*/ */
public interface DatagramReplayStrategy { public interface DatagramReplayStrategy {
void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException; void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException;
void onReceivedPacket(long expectedCounter); void onReceivedPacket(String name, long expectedCounter);
} }

View File

@ -25,12 +25,12 @@ import java.io.IOException;
*/ */
public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy { public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy {
public void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException { public void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException {
long count = actualCounter - expectedCounter; long count = actualCounter - expectedCounter;
throw new IOException("" + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter); throw new IOException(name + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter);
} }
public void onReceivedPacket(long expectedCounter) { public void onReceivedPacket(String name, long expectedCounter) {
} }
} }

View File

@ -157,7 +157,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
protected Command assertCommandReceived() throws InterruptedException { protected Command assertCommandReceived() throws InterruptedException {
Command answer = null; Command answer = null;
synchronized (lock) { synchronized (lock) {
lock.wait(5000); lock.wait(1000);
answer = receivedCommand; answer = receivedCommand;
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
@ -35,7 +34,8 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected Transport createProducer() throws Exception { protected Transport createProducer() throws Exception {
System.out.println("Producer using URI: " + producerURI); System.out.println("Producer using URI: " + producerURI);
return TransportFactory.connect(new URI(producerURI)); URI uri = new URI(producerURI);
return TransportFactory.connect(uri);
} }
protected TransportServer createServer() throws Exception { protected TransportServer createServer() throws Exception {
@ -45,12 +45,4 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
protected Transport createConsumer() throws Exception { protected Transport createConsumer() throws Exception {
return null; return null;
} }
protected OpenWireFormat createWireFormat() {
OpenWireFormat answer = new OpenWireFormat();
answer.setCacheEnabled(false);
answer.setSizePrefixDisabled(true);
return answer;
}
} }