mark the Udp and Reliable as deprecated.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440073 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-29 19:29:23 +00:00
parent 99c2b2e2d2
commit d977b0c9bb
3 changed files with 52 additions and 22 deletions

View File

@ -35,13 +35,14 @@ import org.slf4j.LoggerFactory;
* This interceptor deals with out of order commands together with being able to * This interceptor deals with out of order commands together with being able to
* handle dropped commands and the re-requesting dropped commands. * handle dropped commands and the re-requesting dropped commands.
* *
* * @deprecated
*/ */
@Deprecated
public class ReliableTransport extends ResponseCorrelator { public class ReliableTransport extends ResponseCorrelator {
private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class); private static final Logger LOG = LoggerFactory.getLogger(ReliableTransport.class);
private ReplayStrategy replayStrategy; private ReplayStrategy replayStrategy;
private SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator()); private final SortedSet<Command> commands = new TreeSet<Command>(new CommandIdComparator());
private int expectedCounter = 1; private int expectedCounter = 1;
private int replayBufferCommandCount = 50; private int replayBufferCommandCount = 50;
private int requestTimeout = 2000; private int requestTimeout = 2000;
@ -74,6 +75,7 @@ public class ReliableTransport extends ResponseCorrelator {
} }
} }
@Override
public Object request(Object o) throws IOException { public Object request(Object o) throws IOException {
final Command command = (Command)o; final Command command = (Command)o;
FutureResponse response = asyncRequest(command, null); FutureResponse response = asyncRequest(command, null);
@ -86,6 +88,7 @@ public class ReliableTransport extends ResponseCorrelator {
} }
} }
@Override
public Object request(Object o, int timeout) throws IOException { public Object request(Object o, int timeout) throws IOException {
final Command command = (Command)o; final Command command = (Command)o;
FutureResponse response = asyncRequest(command, null); FutureResponse response = asyncRequest(command, null);
@ -104,6 +107,7 @@ public class ReliableTransport extends ResponseCorrelator {
return response.getResult(0); return response.getResult(0);
} }
@Override
public void onCommand(Object o) { public void onCommand(Object o) {
Command command = (Command)o; Command command = (Command)o;
// lets pass wireformat through // lets pass wireformat through
@ -243,10 +247,12 @@ public class ReliableTransport extends ResponseCorrelator {
this.replayer = replayer; this.replayer = replayer;
} }
@Override
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }
@Override
public void start() throws Exception { public void start() throws Exception {
if (udpTransport != null) { if (udpTransport != null) {
udpTransport.setReplayBuffer(getReplayBuffer()); udpTransport.setReplayBuffer(getReplayBuffer());

View File

@ -16,10 +16,25 @@
*/ */
package org.apache.activemq.transport.udp; package org.apache.activemq.transport.udp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.TransportLoggerSupport; import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.*; import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.reliable.*; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
@ -28,21 +43,17 @@ import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/** /**
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
* *
* @deprecated
*/ */
@Deprecated
public class UdpTransportFactory extends TransportFactory { public class UdpTransportFactory extends TransportFactory {
private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class); private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class);
@Override
public TransportServer doBind(final URI location) throws IOException { public TransportServer doBind(final URI location) throws IOException {
try { try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
@ -64,10 +75,12 @@ public class UdpTransportFactory extends TransportFactory {
} }
} }
@Override
public Transport configure(Transport transport, WireFormat format, Map options) throws Exception { public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
return configure(transport, format, options, false); return configure(transport, format, options, false);
} }
@Override
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options); IntrospectionSupport.setProperties(transport, options);
final UdpTransport udpTransport = (UdpTransport)transport; final UdpTransport udpTransport = (UdpTransport)transport;
@ -92,6 +105,7 @@ public class UdpTransportFactory extends TransportFactory {
return transport; return transport;
} }
@Override
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
OpenWireFormat wireFormat = asOpenWireFormat(wf); OpenWireFormat wireFormat = asOpenWireFormat(wf);
return new UdpTransport(wireFormat, location); return new UdpTransport(wireFormat, location);

View File

@ -41,17 +41,17 @@ import org.slf4j.LoggerFactory;
/** /**
* A UDP based implementation of {@link TransportServer} * A UDP based implementation of {@link TransportServer}
* *
* * @deprecated
*/ */
@Deprecated
public class UdpTransportServer extends TransportServerSupport { public class UdpTransportServer extends TransportServerSupport {
private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class); private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class);
private UdpTransport serverTransport; private final UdpTransport serverTransport;
private ReplayStrategy replayStrategy; private final ReplayStrategy replayStrategy;
private Transport configuredTransport; private final Transport configuredTransport;
private boolean usingWireFormatNegotiation; private boolean usingWireFormatNegotiation;
private Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>(); private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>();
public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) { public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) {
super(connectURI); super(connectURI);
@ -60,6 +60,7 @@ public class UdpTransportServer extends TransportServerSupport {
this.replayStrategy = replayStrategy; this.replayStrategy = replayStrategy;
} }
@Override
public String toString() { public String toString() {
return "UdpTransportServer@" + serverTransport; return "UdpTransportServer@" + serverTransport;
} }
@ -71,31 +72,38 @@ public class UdpTransportServer extends TransportServerSupport {
return serverTransport; return serverTransport;
} }
@Override
public void setBrokerInfo(BrokerInfo brokerInfo) { public void setBrokerInfo(BrokerInfo brokerInfo) {
} }
@Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
LOG.info("Starting " + this); LOG.info("Starting " + this);
configuredTransport.setTransportListener(new TransportListener() { configuredTransport.setTransportListener(new TransportListener() {
@Override
public void onCommand(Object o) { public void onCommand(Object o) {
final Command command = (Command)o; final Command command = (Command)o;
processInboundConnection(command); processInboundConnection(command);
} }
@Override
public void onException(IOException error) { public void onException(IOException error) {
LOG.error("Caught: " + error, error); LOG.error("Caught: " + error, error);
} }
@Override
public void transportInterupted() { public void transportInterupted() {
} }
@Override
public void transportResumed() { public void transportResumed() {
} }
}); });
configuredTransport.start(); configuredTransport.start();
} }
@Override
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
configuredTransport.stop(); configuredTransport.stop();
} }
@ -151,6 +159,7 @@ public class UdpTransportServer extends TransportServerSupport {
// Joiner must be on outside as the inbound messages must be processed // Joiner must be on outside as the inbound messages must be processed
// by the reliable transport first // by the reliable transport first
return new CommandJoiner(reliableTransport, connectionWireFormat) { return new CommandJoiner(reliableTransport, connectionWireFormat) {
@Override
public void start() throws Exception { public void start() throws Exception {
super.start(); super.start();
reliableTransport.onCommand(command); reliableTransport.onCommand(command);
@ -171,6 +180,7 @@ public class UdpTransportServer extends TransportServerSupport {
*/ */
} }
@Override
public InetSocketAddress getSocketAddress() { public InetSocketAddress getSocketAddress() {
return serverTransport.getLocalSocketAddress(); return serverTransport.getLocalSocketAddress();
} }