diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 401481e5e2..79510c07d3 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -360,10 +360,6 @@ **/MultipleTestsWithSpringFactoryBeanTest.* **/MultipleTestsWithXBeanFactoryBeanTest.* **/MultipleTestsWithSpringXBeanFactoryBeanTest.* - - - **/UdpTransportUsingServerTest.* - **/UdpSendReceiveWithTwoConnectionsTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java index ef07cf4852..e29063a869 100755 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java @@ -81,11 +81,13 @@ final public class OpenWireFormat implements WireFormat { public OpenWireFormat copy() { OpenWireFormat answer = new OpenWireFormat(); + answer.version = version; answer.stackTraceEnabled = stackTraceEnabled; answer.tcpNoDelayEnabled = tcpNoDelayEnabled; answer.cacheEnabled = cacheEnabled; answer.tightEncodingEnabled = tightEncodingEnabled; answer.sizePrefixDisabled = sizePrefixDisabled; + answer.preferedWireFormatInfo = preferedWireFormatInfo; return answer; } @@ -104,8 +106,8 @@ final public class OpenWireFormat implements WireFormat { static IdGenerator g = new IdGenerator(); String id = g.generateId(); public String toString() { - //return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}"; - return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}"; + return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}"; + //return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}"; } public int getVersion() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index a0a901ba48..a2ee6b28bf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -94,7 +94,14 @@ public class WireFormatNegotiator extends TransportFilter { onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")")); } + if (log.isDebugEnabled()) { + log.debug(this + " before negotiation: " + wireFormat); + } wireFormat.renegociatWireFormat(info); + + if (log.isDebugEnabled()) { + log.debug(this + " after negotiation: " + wireFormat); + } } catch (IOException e) { onException(e); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java index 8d124a3abb..47bb018634 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java @@ -43,6 +43,7 @@ public class CommandChannel implements Service { private static final Log log = LogFactory.getLog(CommandChannel.class); + private final String name; private DatagramChannel channel; private OpenWireFormat wireFormat; private ByteBufferPool bufferPool; @@ -50,11 +51,12 @@ public class CommandChannel implements Service { private DatagramReplayStrategy replayStrategy; private SocketAddress targetAddress; private DatagramHeaderMarshaller headerMarshaller = new DatagramHeaderMarshaller(); + private final boolean checkSequenceNumbers; // reading private Object readLock = new Object(); private ByteBuffer readBuffer; - private CommandReadBuffer readStack; + private DatagramReadBuffer readStack; private SocketAddress lastReadDatagramAddress; // writing @@ -64,14 +66,20 @@ public class CommandChannel implements Service { private int largeMessageBufferSize = 128 * 1024; private DatagramHeader header = new DatagramHeader(); - public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, - DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) { + public CommandChannel(String name, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize, + DatagramReplayStrategy replayStrategy, SocketAddress targetAddress, boolean checkSequenceNumbers) { + this.name = name; this.channel = channel; this.wireFormat = wireFormat; this.bufferPool = bufferPool; this.datagramSize = datagramSize; this.replayStrategy = replayStrategy; this.targetAddress = targetAddress; + this.checkSequenceNumbers = checkSequenceNumbers; + } + + public String toString() { + return "CommandChannel#" + name; } public void start() throws Exception { @@ -79,7 +87,9 @@ public class CommandChannel implements Service { wireFormat.setCacheEnabled(false); wireFormat.setTightEncodingEnabled(true); - readStack = new CommandReadBuffer(wireFormat, replayStrategy); + if (checkSequenceNumbers) { + readStack = new CommandReadBuffer(name, wireFormat, replayStrategy); + } bufferPool.setDefaultSize(datagramSize); bufferPool.start(); readBuffer = bufferPool.borrowBuffer(); @@ -98,26 +108,21 @@ public class CommandChannel implements Service { readBuffer.clear(); lastReadDatagramAddress = channel.receive(readBuffer); readBuffer.flip(); - - if (log.isDebugEnabled()) { - log.debug("Read a datagram from: " + lastReadDatagramAddress); - } + header = headerMarshaller.readHeader(readBuffer); header.setFromAddress(lastReadDatagramAddress); if (log.isDebugEnabled()) { - log.debug("Received datagram from: " + lastReadDatagramAddress + " header: " + header); + log.debug("Received datagram on: " + name + " from: " + lastReadDatagramAddress + " header: " + header); } int remaining = readBuffer.remaining(); int size = header.getDataSize(); /* - if (size > remaining) { - throw new IOException("Invalid command size: " + size + " when there are only: " + remaining + " byte(s) remaining"); - } - else if (size < remaining) { - log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining); - } - */ + * if (size > remaining) { throw new IOException("Invalid command + * size: " + size + " when there are only: " + remaining + " byte(s) + * remaining"); } else if (size < remaining) { log.warn("Extra bytes + * in buffer. Expecting: " + size + " but has: " + remaining); } + */ if (size != remaining) { log.warn("Expecting: " + size + " but has: " + remaining); } @@ -133,23 +138,39 @@ public class CommandChannel implements Service { // TODO use a DataInput implementation that talks direct to the // ByteBuffer DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); - Command command = (Command) wireFormat.doUnmarshal(dataIn); + Command command = (Command) wireFormat.unmarshal(dataIn); + // Command command = (Command) wireFormat.doUnmarshal(dataIn); header.setCommand(command); } - answer = readStack.read(header); + if (readStack != null) { + answer = readStack.read(header); + } + else { + answer = header.getCommand(); + } } if (answer != null) { + if (log.isDebugEnabled()) { + log.debug("Channel: " + name + " about to process: " + answer); + } processor.process(answer, header); } } /** - * Called if a packet is received on a different channel from a remote client - * @throws IOException + * Called if a packet is received on a different channel from a remote + * client + * + * @throws IOException */ public Command onDatagramReceived(DatagramHeader header) throws IOException { - return readStack.read(header); + if (readStack != null) { + return readStack.read(header); + } + else { + return header.getCommand(); + } } public void write(Command command) throws IOException { @@ -159,10 +180,12 @@ public class CommandChannel implements Service { public void write(Command command, SocketAddress address) throws IOException { synchronized (writeLock) { header.incrementCounter(); - bs = new BooleanStream(); - // TODO - //bs.clear(); - int size = wireFormat.tightMarshal1(command, bs); + + ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize); + wireFormat.marshal(command, new DataOutputStream(largeBuffer)); + byte[] data = largeBuffer.toByteArray(); + int size = data.length; + if (size < datagramSize) { header.setPartial(false); header.setComplete(true); @@ -170,13 +193,6 @@ public class CommandChannel implements Service { writeBuffer.clear(); headerMarshaller.writeHeader(header, writeBuffer); - // TODO use a DataOutput implementation that talks direct to the - // ByteBuffer - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(buffer); - wireFormat.tightMarshal2(command, dataOut, bs); - dataOut.close(); - byte[] data = buffer.toByteArray(); writeBuffer.put(data); sendWriteBuffer(address); @@ -186,10 +202,7 @@ public class CommandChannel implements Service { header.setComplete(false); // lets split the command up into chunks - ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(largeMessageBufferSize); - wireFormat.marshal(command, new DataOutputStream(largeBuffer)); - byte[] data = largeBuffer.toByteArray(); int offset = 0; boolean lastFragment = false; for (int fragment = 0, length = data.length; !lastFragment; fragment++) { @@ -248,15 +261,14 @@ public class CommandChannel implements Service { return lastReadDatagramAddress; } } - // Implementation methods // ------------------------------------------------------------------------- protected void sendWriteBuffer(SocketAddress address) throws IOException { writeBuffer.flip(); - + if (log.isDebugEnabled()) { - log.debug("Sending datagram to: " + address + " header: " + header); + log.debug("Channel: " + name + " sending datagram to: " + address + " header: " + header); } channel.send(writeBuffer, address); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java index ef41e74ca8..a532bfd5ad 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java @@ -35,7 +35,7 @@ import java.util.TreeSet; * * @version $Revision$ */ -public class CommandReadBuffer { +public class CommandReadBuffer implements DatagramReadBuffer { private static final Log log = LogFactory.getLog(CommandReadBuffer.class); private OpenWireFormat wireFormat; @@ -43,8 +43,10 @@ public class CommandReadBuffer { private SortedSet headers = new TreeSet(); private long expectedCounter = 1; private ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final String name; - public CommandReadBuffer(OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) { + public CommandReadBuffer(String name, OpenWireFormat wireFormat, DatagramReplayStrategy replayStrategy) { + this.name = name; this.wireFormat = wireFormat; this.replayStrategy = replayStrategy; } @@ -57,13 +59,16 @@ public class CommandReadBuffer { log.warn("Ignoring out of step packet: " + header); } else { - replayStrategy.onDroppedPackets(expectedCounter, actualCounter); + replayStrategy.onDroppedPackets(name, expectedCounter, actualCounter); // lets add it to the list for later on headers.add(header); } // lets see if the first item in the set is the next header + if (headers.isEmpty()) { + return null; + } header = (DatagramHeader) headers.first(); if (expectedCounter != header.getCounter()) { return null; @@ -71,7 +76,7 @@ public class CommandReadBuffer { } // we've got a valid header so increment counter - replayStrategy.onReceivedPacket(expectedCounter); + replayStrategy.onReceivedPacket(name, expectedCounter); expectedCounter++; Command answer = null; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java new file mode 100644 index 0000000000..da215b82a5 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramReadBuffer.java @@ -0,0 +1,33 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.udp; + +import org.apache.activemq.command.Command; + +import java.io.IOException; + +/** + * Represents an inbound buffer of datagrams for dealing with out of order + * or fragmented commands. + * + * @version $Revision$ + */ +public interface DatagramReadBuffer { + + Command read(DatagramHeader header) throws IOException; + +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index aed6cbc38f..08166b4356 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -56,6 +56,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S private DatagramChannel channel; private boolean trace = false; private boolean useLocalHost = true; + private boolean checkSequenceNumbers = true; private int port; private int minmumWireFormatVersion; private String description = null; @@ -112,7 +113,8 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S commandChannel.write(command, address); } - public void doConsume(Command command, DatagramHeader header) throws IOException { + + public void receivedHeader(DatagramHeader header) { wireFormatHeader = header; } @@ -253,6 +255,14 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S public OpenWireFormat getWireFormat() { return wireFormat; } + + public boolean isCheckSequenceNumbers() { + return checkSequenceNumbers; + } + + public void setCheckSequenceNumbers(boolean checkSequenceNumbers) { + this.checkSequenceNumbers = checkSequenceNumbers; + } // Implementation methods // ------------------------------------------------------------------------- @@ -303,7 +313,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S if (bufferPool == null) { bufferPool = new DefaultBufferPool(); } - commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress); + commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool, datagramSize, replayStrategy, targetAddress, isCheckSequenceNumbers()); commandChannel.start(); // lets pass the header & address into the channel so it avoids a @@ -337,4 +347,5 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java index ffb3e53525..73b536c9b4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java @@ -85,7 +85,6 @@ public class UdpTransportFactory extends TransportFactory { protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { OpenWireFormat wireFormat = asOpenWireFormat(wf); - wireFormat.setSizePrefixDisabled(true); return new UdpTransport(wireFormat, location); } @@ -113,7 +112,7 @@ public class UdpTransportFactory extends TransportFactory { transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) { protected void onWireFormatNegotiated(WireFormatInfo info) { // lets switch to the targetAddress that the last packet was - // received as + // received as so that all future requests go to the newly created UDP channel udpTransport.useLastInboundDatagramAsNewTarget(); } }; @@ -122,8 +121,6 @@ public class UdpTransportFactory extends TransportFactory { protected OpenWireFormat asOpenWireFormat(WireFormat wf) { OpenWireFormat answer = (OpenWireFormat) wf; - answer.setSizePrefixDisabled(true); - answer.setCacheEnabled(false); return answer; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 5e19bd5d46..5809390ef1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -24,7 +24,6 @@ import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerSupport; import org.apache.activemq.transport.WireFormatNegotiator; @@ -55,6 +54,10 @@ public class UdpTransportServer extends TransportServerSupport implements Comman super(connectURI); this.serverTransport = serverTransport; this.configuredTransport = configuredTransport; + + // lets disable the incremental checking of the sequence numbers + // as we are getting messages from many different clients + serverTransport.setCheckSequenceNumbers(false); } public String toString() { @@ -96,12 +99,16 @@ public class UdpTransportServer extends TransportServerSupport implements Comman public void process(Command command, DatagramHeader header) throws IOException { SocketAddress address = header.getFromAddress(); - System.out.println(toString() + " received command: " + command + " from address: " + address); + if (log.isDebugEnabled()) { + log.debug("Received command on: " + this + " from address: " + address + " command: " + command); + } Transport transport = null; synchronized (transports) { transport = (Transport) transports.get(address); if (transport == null) { - System.out.println("###Êcreating new server connector"); + if (log.isDebugEnabled()) { + log.debug("Creating a new UDP server connection"); + } transport = createTransport(command, header); transport = configureTransport(transport); transports.put(address, transport); @@ -114,23 +121,30 @@ public class UdpTransportServer extends TransportServerSupport implements Comman protected Transport configureTransport(Transport transport) { transport = new ResponseCorrelator(transport); - - // TODO - //transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); + if (serverTransport.getMaxInactivityDuration() > 0) { + transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration()); + } + getAcceptListener().onAccept(transport); return transport; } - protected Transport createTransport(Command command, DatagramHeader header) throws IOException { + protected Transport createTransport(final Command command, DatagramHeader header) throws IOException { final SocketAddress address = header.getFromAddress(); - // TODO lets copy the wireformat... - final UdpTransport transport = new UdpTransport(serverTransport.getWireFormat(), address); - - // lets send the packet into the transport so it can track packets - transport.doConsume(command, header); + final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); + final UdpTransport transport = new UdpTransport(connectionWireFormat, address); - return new WireFormatNegotiator(transport, serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { + transport.receivedHeader(header); + + return new WireFormatNegotiator(transport, transport.getWireFormat(), serverTransport.getMinmumWireFormatVersion()) { + + public void start() throws Exception { + super.start(); + + // process the inbound wireformat + onCommand(command); + } // lets use the specific addressing of wire format protected void sendWireFormat(WireFormatInfo info) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java index 8220e5790d..2a408c9efb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/DatagramReplayStrategy.java @@ -25,8 +25,9 @@ import java.io.IOException; */ public interface DatagramReplayStrategy { - void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException; + void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException; - void onReceivedPacket(long expectedCounter); + void onReceivedPacket(String name, long expectedCounter); } + diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java b/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java index 67b2a4603c..e6c38bcc24 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/udp/replay/ExceptionIfDroppedPacketStrategy.java @@ -25,12 +25,12 @@ import java.io.IOException; */ public class ExceptionIfDroppedPacketStrategy implements DatagramReplayStrategy { - public void onDroppedPackets(long expectedCounter, long actualCounter) throws IOException { + public void onDroppedPackets(String name, long expectedCounter, long actualCounter) throws IOException { long count = actualCounter - expectedCounter; - throw new IOException("" + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter); + throw new IOException(name + count + " packet(s) dropped. Expected: " + expectedCounter + " but was: " + actualCounter); } - public void onReceivedPacket(long expectedCounter) { + public void onReceivedPacket(String name, long expectedCounter) { } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java index 7e8c2faf11..c89a908957 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java @@ -157,7 +157,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen protected Command assertCommandReceived() throws InterruptedException { Command answer = null; synchronized (lock) { - lock.wait(5000); + lock.wait(1000); answer = receivedCommand; } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java index 22513b0642..9fc9184b48 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.transport.udp; -import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; @@ -35,7 +34,8 @@ public class UdpTransportUsingServerTest extends UdpTestSupport { protected Transport createProducer() throws Exception { System.out.println("Producer using URI: " + producerURI); - return TransportFactory.connect(new URI(producerURI)); + URI uri = new URI(producerURI); + return TransportFactory.connect(uri); } protected TransportServer createServer() throws Exception { @@ -45,12 +45,4 @@ public class UdpTransportUsingServerTest extends UdpTestSupport { protected Transport createConsumer() throws Exception { return null; } - - protected OpenWireFormat createWireFormat() { - OpenWireFormat answer = new OpenWireFormat(); - answer.setCacheEnabled(false); - answer.setSizePrefixDisabled(true); - return answer; - } - }