From f392884e31d9325fd90643b6d8e01668a58b70de Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Mon, 8 Mar 2010 12:48:45 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-2632 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@920306 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQConnection.java | 1 + .../activemq/advisory/AdvisoryBroker.java | 7 +- .../apache/activemq/broker/BrokerService.java | 27 +- .../apache/activemq/broker/Connection.java | 10 +- .../org/apache/activemq/broker/Connector.java | 20 +- .../activemq/broker/TransportConnection.java | 33 +- .../activemq/broker/TransportConnector.java | 89 ++- .../activemq/broker/region/RegionBroker.java | 11 + .../apache/activemq/command/BaseCommand.java | 6 +- .../apache/activemq/command/BrokerInfo.java | 28 +- .../org/apache/activemq/command/Command.java | 2 + .../activemq/command/ConnectionControl.java | 52 ++ .../activemq/command/ConnectionInfo.java | 17 + .../activemq/command/PartialCommand.java | 7 +- .../command/RemoveSubscriptionInfo.java | 2 - .../activemq/command/SubscriptionInfo.java | 5 +- .../activemq/command/WireFormatInfo.java | 6 +- .../DemandForwardingBridgeSupport.java | 6 +- .../network/NetworkBridgeConfiguration.java | 16 +- .../v6/ConnectionControlMarshaller.java | 15 + .../openwire/v6/ConnectionInfoMarshaller.java | 5 + .../apache/activemq/proxy/ProxyConnector.java | 8 +- .../transport/CompositeTransport.java | 4 +- .../apache/activemq/transport/Transport.java | 18 +- .../activemq/transport/TransportFilter.java | 32 +- .../activemq/transport/TransportSupport.java | 42 +- .../discovery/DiscoveryTransport.java | 7 +- .../transport/failover/BackupTransport.java | 14 +- .../transport/failover/FailoverTransport.java | 513 +++++++++++------- .../failover/FailoverTransportFactory.java | 6 +- .../transport/fanout/FanoutTransport.java | 25 +- .../transport/mock/MockTransport.java | 16 +- .../activemq/transport/stomp/StompFrame.java | 4 + .../activemq/transport/tcp/TcpTransport.java | 5 +- .../activemq/transport/vm/VMTransport.java | 17 +- .../openwire/v6/ConnectionControlTest.java | 3 + .../openwire/v6/ConnectionInfoTest.java | 1 + .../activemq/perf/KahaDBDurableTopicTest.java | 10 +- .../apache/activemq/perf/KahaDBQueueTest.java | 5 +- .../org/apache/activemq/perf/RunBroker.java | 64 +++ .../failover/FailoverClusterTest.java | 149 +++++ 41 files changed, 1014 insertions(+), 294 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 6b6ce52816..71a3d5daf4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -215,6 +215,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId())); this.info.setManageable(true); + this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(), -1); this.transport.setTransportListener(this); diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index eff550f8e5..6b2e5fb989 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -440,11 +440,10 @@ public class AdvisoryBroker extends BrokerFilter { String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET"; advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id); - String[] uris = getBrokerService().getTransportConnectorURIs(); String url = getBrokerService().getVmConnectorURI().toString(); - if (uris != null && uris.length > 0) { - url = uris[0]; - } + if (getBrokerService().getDefaultSocketURI() != null) { + url = getBrokerService().getDefaultSocketURI().toString(); + } advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url); //set the data structure diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 64f3da74bd..c2bfb30195 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -31,7 +31,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.management.MalformedObjectNameException; @@ -154,6 +153,7 @@ public class BrokerService implements Service { private boolean deleteAllMessagesOnStartup; private boolean advisorySupport = true; private URI vmConnectorURI; + private URI defaultSocketURI; private PolicyMap destinationPolicy; private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -1271,6 +1271,28 @@ public class BrokerService implements Service { public void setVmConnectorURI(URI vmConnectorURI) { this.vmConnectorURI = vmConnectorURI; } + + public URI getDefaultSocketURI() { + + if (started.get()) { + if (this.defaultSocketURI==null) { + for (TransportConnector tc:this.transportConnectors) { + URI result = null; + try { + result = tc.getConnectUri(); + } catch (Exception e) { + LOG.warn("Failed to get the ConnectURI for "+tc,e); + } + if (result != null) { + this.defaultSocketURI=result; + break; + } + } + } + return this.defaultSocketURI; + } + return null; + } /** * @return Returns the shutdownOnMasterFailure. @@ -2007,6 +2029,9 @@ public class BrokerService implements Service { connector.setLocalUri(uri); connector.setBrokerName(getBrokerName()); connector.setDurableDestinations(durableDestinations); + if (getDefaultSocketURI() != null) { + connector.setBrokerURL(getDefaultSocketURI().toString()); + } connector.start(); } for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java b/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java index 65237c9374..c402c09d52 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java @@ -17,10 +17,10 @@ package org.apache.activemq.broker; import java.io.IOException; - import org.apache.activemq.Service; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.Response; /** @@ -51,6 +51,7 @@ public interface Connection extends Service { * Services a client command and submits it to the broker. * * @param command + * @return Response */ Response service(Command command); @@ -110,5 +111,12 @@ public interface Connection extends Service { * @return */ boolean isNetworkConnection(); + + /** + * @return true if a fault tolerant connection + */ + boolean isFaultTolerantConnection(); + + void updateClient(ConnectionControl control); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java b/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java index 15a8d883a6..cd18debde7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java @@ -28,8 +28,7 @@ import org.apache.activemq.command.BrokerInfo; public interface Connector extends Service { /** - * - * @return + * @return brokerInfo */ BrokerInfo getBrokerInfo(); @@ -37,4 +36,21 @@ public interface Connector extends Service { * @return the statistics for this connector */ ConnectorStatistics getStatistics(); + + /** + * @return true if update client connections when brokers leave/join a cluster + */ + public boolean isUpdateClusterClients(); + + /** + * @return true if clients should be re-balanced across the cluster + */ + public boolean isRebalanceClusterClients(); + + /** + * Update all the connections with information + * about the connected brokers in the cluster + */ + public void updateClientClusterInfo(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 9a742bc167..f33077dfce 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -121,7 +121,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // Used to do async dispatch.. this should perhaps be pushed down into the // transport layer.. private boolean inServiceException; - private ConnectionStatistics statistics = new ConnectionStatistics(); + private final ConnectionStatistics statistics = new ConnectionStatistics(); private boolean manageable; private boolean slow; private boolean markedCandidate; @@ -133,15 +133,15 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private boolean pendingStop; private long timeStamp; private final AtomicBoolean stopping = new AtomicBoolean(false); - private CountDownLatch stopped = new CountDownLatch(1); + private final CountDownLatch stopped = new CountDownLatch(1); private final AtomicBoolean asyncException = new AtomicBoolean(false); private final Map producerExchanges = new HashMap(); private final Map consumerExchanges = new HashMap(); - private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); + private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); private ConnectionContext context; private boolean networkConnection; private boolean faultTolerantConnection; - private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private DemandForwardingBridge duplexBridge; private final TaskRunnerFactory taskRunnerFactory; private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); @@ -168,6 +168,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.taskRunnerFactory = taskRunnerFactory; this.transport = transport; this.transport.setTransportListener(new DefaultTransportListener() { + @Override public void onCommand(Object o) { serviceLock.readLock().lock(); try { @@ -184,6 +185,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } + @Override public void onException(IOException exception) { serviceLock.readLock().lock(); try { @@ -241,6 +243,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void serviceExceptionAsync(final IOException e) { if (asyncException.compareAndSet(false, true)) { new Thread("Async Exception Handler") { + @Override public void run() { serviceException(e); } @@ -654,6 +657,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } registerConnectionState(info.getConnectionId(), state); LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress()); + this.faultTolerantConnection=info.isFaultTolerant(); // Setup the context. String clientId = info.getClientId(); context = new ConnectionContext(); @@ -672,6 +676,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.manageable = info.isManageable(); state.setContext(context); state.setConnection(this); + try { broker.addConnection(context, info); } catch (Exception e) { @@ -679,9 +684,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { LOG.warn("Failed to add Connection", e); throw e; } - if (info.isManageable() && broker.isFaultTolerantConfiguration()) { + if (info.isManageable()) { // send ConnectionCommand - ConnectionControl command = new ConnectionControl(); + ConnectionControl command = this.connector.getConnectionControl(); command.setFaultTolerant(broker.isFaultTolerantConfiguration()); dispatchAsync(command); } @@ -867,7 +872,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } transport.start(); active = true; - dispatchAsync(connector.getBrokerInfo()); + BrokerInfo info = connector.getBrokerInfo().copy(); + info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); + dispatchAsync(info); + connector.onStarted(this); } } catch (Exception e) { @@ -1120,6 +1128,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public synchronized boolean isNetworkConnection() { return networkConnection; } + + public boolean isFaultTolerantConnection() { + return this.faultTolerantConnection; + } protected synchronized void setStarting(boolean starting) { this.starting = starting; @@ -1222,6 +1234,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } return null; } + + public void updateClient(ConnectionControl control) { + if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null + && this.wireFormatInfo.getVersion() >= 6) { + dispatchAsync(control); + } + } private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) { ProducerBrokerExchange result = producerExchanges.get(id); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java index a0278cf8d3..d572ff01a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -16,10 +16,17 @@ */ package org.apache.activemq.broker; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.management.ObjectName; import org.apache.activemq.broker.jmx.ManagedTransportConnector; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.region.ConnectorStatistics; import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.TaskRunnerFactory; @@ -34,6 +41,7 @@ import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import static org.apache.activemq.thread.DefaultThreadPools.*; import java.io.IOException; @@ -53,7 +61,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected TransportStatusDetector statusDector; - private BrokerService brokerService; private TransportServer server; private URI uri; @@ -61,14 +68,16 @@ public class TransportConnector implements Connector, BrokerServiceAware { private TaskRunnerFactory taskRunnerFactory; private MessageAuthorizationPolicy messageAuthorizationPolicy; private DiscoveryAgent discoveryAgent; - private ConnectorStatistics statistics = new ConnectorStatistics(); + private final ConnectorStatistics statistics = new ConnectorStatistics(); private URI discoveryUri; private URI connectUri; private String name; private boolean disableAsyncDispatch; private boolean enableStatusMonitor = false; private Broker broker; - + private boolean updateClusterClients=false; + private boolean rebalanceClusterClients; + public TransportConnector() { } @@ -109,6 +118,8 @@ public class TransportConnector implements Connector, BrokerServiceAware { rc.setTaskRunnerFactory(getTaskRunnerFactory()); rc.setUri(getUri()); rc.setBrokerService(brokerService); + rc.setUpdateClusterClients(isUpdateClusterClients()); + rc.setRebalanceClusterClients(isRebalanceClusterClients()); return rc; } @@ -193,16 +204,13 @@ public class TransportConnector implements Connector, BrokerServiceAware { } public void start() throws Exception { - - TransportServer server = getServer(); - + TransportServer server = getServer(); broker = brokerService.getBroker(); brokerInfo.setBrokerName(broker.getBrokerName()); brokerInfo.setBrokerId(broker.getBrokerId()); brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); brokerInfo.setBrokerURL(server.getConnectURI().toString()); - server.setAcceptListener(new TransportAcceptListener() { public void onAccept(final Transport transport) { try { @@ -233,7 +241,6 @@ public class TransportConnector implements Connector, BrokerServiceAware { LOG.debug("Reason: " + error, error); } }); - server.setBrokerInfo(brokerInfo); server.start(); @@ -366,6 +373,7 @@ public class TransportConnector implements Connector, BrokerServiceAware { this.name = name; } + @Override public String toString() { String rc = getName(); if (rc == null) { @@ -373,6 +381,43 @@ public class TransportConnector implements Connector, BrokerServiceAware { } return rc; } + + protected ConnectionControl getConnectionControl() { + boolean rebalance = isRebalanceClusterClients(); + String connectedBrokers = ""; + String self = ""; + if (brokerService.getDefaultSocketURI() != null) { + self += brokerService.getDefaultSocketURI().toString(); + self += ","; + } + if (rebalance == false) { + connectedBrokers += self; + } + if (this.broker.getPeerBrokerInfos() != null) { + for (BrokerInfo info : this.broker.getPeerBrokerInfos()) { + connectedBrokers += info.getBrokerURL(); + connectedBrokers += ","; + } + } + if (rebalance) { + connectedBrokers += self; + } + + ConnectionControl control = new ConnectionControl(); + control.setConnectedBrokers(connectedBrokers); + control.setRebalanceConnection(rebalance); + return control; + + } + + public void updateClientClusterInfo() { + if (isRebalanceClusterClients() || isUpdateClusterClients()) { + ConnectionControl control = getConnectionControl(); + for (Connection c: this.connections) { + c.updateClient(control); + } + } + } public boolean isDisableAsyncDispatch() { return disableAsyncDispatch; @@ -410,4 +455,32 @@ public class TransportConnector implements Connector, BrokerServiceAware { public BrokerService getBrokerService() { return brokerService; } + + /** + * @return the updateClusterClients + */ + public boolean isUpdateClusterClients() { + return this.updateClusterClients; + } + + /** + * @param updateClusterClients the updateClusterClients to set + */ + public void setUpdateClusterClients(boolean updateClusterClients) { + this.updateClusterClients = updateClusterClients; + } + + /** + * @return the rebalanceClusterClients + */ + public boolean isRebalanceClusterClients() { + return this.rebalanceClusterClients; + } + + /** + * @param rebalanceClusterClients the rebalanceClusterClients to set + */ + public void setRebalanceClusterClients(boolean rebalanceClusterClients) { + this.rebalanceClusterClients = rebalanceClusterClients; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 5869d4fd5a..285431cb5b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -34,6 +35,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConsumerBrokerExchange; import org.apache.activemq.broker.EmptyBroker; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; @@ -585,12 +587,14 @@ public class RegionBroker extends EmptyBroker { @Override public synchronized void addBroker(Connection connection, BrokerInfo info) { brokerInfos.add(info); + updateClients(); } @Override public synchronized void removeBroker(Connection connection, BrokerInfo info) { if (info != null) { brokerInfos.remove(info); + updateClients(); } } @@ -830,4 +834,11 @@ public class RegionBroker extends EmptyBroker { LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control); } } + + protected void updateClients() { + List connectors = this.brokerService.getTransportConnectors(); + for (TransportConnector connector : connectors) { + connector.updateClientClusterInfo(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java index df9d5aca0a..1778c0a17f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java @@ -17,7 +17,6 @@ package org.apache.activemq.command; import java.util.Map; - import org.apache.activemq.util.IntrospectionSupport; @@ -61,6 +60,7 @@ public abstract class BaseCommand implements Command { this.responseRequired = responseRequired; } + @Override public String toString() { return toString(null); } @@ -104,6 +104,10 @@ public abstract class BaseCommand implements Command { public boolean isShutdownInfo() { return false; } + + public boolean isConnectionControl() { + return false; + } /** * The endpoint within the transport where this message came from. diff --git a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java index 60c3df41a1..91db7191cc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.command; -import org.apache.activemq.plugin.StatisticsBrokerPlugin; +import java.io.IOException; +import java.util.Properties; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.util.MarshallingSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.Properties; /** * When a client connects to a broker, the broker send the client a BrokerInfo @@ -49,7 +48,30 @@ public class BrokerInfo extends BaseCommand { long connectionId; String brokerUploadUrl; String networkProperties; + + public BrokerInfo copy() { + BrokerInfo copy = new BrokerInfo(); + copy(copy); + return copy; + } + + private void copy(BrokerInfo copy) { + super.copy(copy); + copy.brokerId = this.brokerId; + copy.brokerURL = this.brokerURL; + copy.slaveBroker = this.slaveBroker; + copy.masterBroker = this.masterBroker; + copy.faultTolerantConfiguration = this.faultTolerantConfiguration; + copy.networkConnection = this.networkConnection; + copy.duplexConnection = this.duplexConnection; + copy.peerBrokerInfos = this.peerBrokerInfos; + copy.brokerName = this.brokerName; + copy.connectionId = this.connectionId; + copy.brokerUploadUrl = this.brokerUploadUrl; + copy.networkProperties = this.networkProperties; + } + @Override public boolean isBrokerInfo() { return true; } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Command.java b/activemq-core/src/main/java/org/apache/activemq/command/Command.java index 4ba75c8467..384ab436cc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Command.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Command.java @@ -52,6 +52,8 @@ public interface Command extends DataStructure { boolean isMessageDispatchNotification(); boolean isShutdownInfo(); + + boolean isConnectionControl(); Response visit(CommandVisitor visitor) throws Exception; diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java index 324330aa10..cdc83993b4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java @@ -31,6 +31,9 @@ public class ConnectionControl extends BaseCommand { protected boolean close; protected boolean exit; protected boolean faultTolerant; + protected String connectedBrokers=""; + protected String reconnectTo = ""; + protected boolean rebalanceConnection; public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; @@ -39,6 +42,10 @@ public class ConnectionControl extends BaseCommand { public Response visit(CommandVisitor visitor) throws Exception { return visitor.processConnectionControl(this); } + @Override + public boolean isConnectionControl() { + return true; + } /** * @openwire:property version=1 @@ -114,4 +121,49 @@ public class ConnectionControl extends BaseCommand { public void setSuspend(boolean suspend) { this.suspend = suspend; } + + /** + * @openwire:property version=6 cache=false + * @return connected brokers. + */ + public String getConnectedBrokers() { + return this.connectedBrokers; + } + + /** + * @param connectedBrokers the connectedBrokers to set + */ + public void setConnectedBrokers(String connectedBrokers) { + this.connectedBrokers = connectedBrokers; + } + + /** + * @openwire:property version=6 cache=false + * @return the reconnectTo + */ + public String getReconnectTo() { + return this.reconnectTo; + } + + /** + * @param reconnectTo the reconnectTo to set + */ + public void setReconnectTo(String reconnectTo) { + this.reconnectTo = reconnectTo; + } + + /** + * @return the rebalanceConnection + * @openwire:property version=6 cache=false + */ + public boolean isRebalanceConnection() { + return this.rebalanceConnection; + } + + /** + * @param rebalanceConnection the rebalanceConnection to set + */ + public void setRebalanceConnection(boolean rebalanceConnection) { + this.rebalanceConnection = rebalanceConnection; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java index 2064440b33..73313ef84a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java @@ -35,6 +35,7 @@ public class ConnectionInfo extends BaseCommand { protected boolean brokerMasterConnector; protected boolean manageable; protected boolean clientMaster = true; + protected boolean faultTolerant = false; protected transient Object transportContext; public ConnectionInfo() { @@ -65,6 +66,7 @@ public class ConnectionInfo extends BaseCommand { copy.manageable = manageable; copy.clientMaster = clientMaster; copy.transportContext = transportContext; + copy.faultTolerant= faultTolerant; } /** @@ -199,4 +201,19 @@ public class ConnectionInfo extends BaseCommand { this.clientMaster = clientMaster; } + /** + * @openwire:property version=6 cache=false + * @return the faultTolerant + */ + public boolean isFaultTolerant() { + return this.faultTolerant; + } + + /** + * @param faultTolerant the faultTolerant to set + */ + public void setFaultTolerant(boolean faultTolerant) { + this.faultTolerant = faultTolerant; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java index 60422a2f65..6f42da28f6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java @@ -117,6 +117,10 @@ public class PartialCommand implements Command { public boolean isShutdownInfo() { return false; } + + public boolean isConnectionControl() { + return false; + } public void setResponseRequired(boolean responseRequired) { } @@ -135,7 +139,6 @@ public class PartialCommand implements Command { size = data.length; } return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]"; - } - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java index d236f29c3b..9aa09d59f7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java @@ -47,14 +47,12 @@ public class RemoveSubscriptionInfo extends BaseCommand { /** * @openwire:property version=1 - * @deprecated */ public String getSubcriptionName() { return subscriptionName; } /** - * @deprecated */ public void setSubcriptionName(String subscriptionName) { this.subscriptionName = subscriptionName; diff --git a/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java index 467eea1bd3..6c5753d63d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java @@ -76,7 +76,6 @@ public class SubscriptionInfo implements DataStructure { /** * @openwire:property version=1 - * @deprecated */ public String getSubcriptionName() { return subscriptionName; @@ -84,7 +83,6 @@ public class SubscriptionInfo implements DataStructure { /** * @param subscriptionName * - * @deprecated */ public void setSubcriptionName(String subscriptionName) { this.subscriptionName = subscriptionName; @@ -102,16 +100,19 @@ public class SubscriptionInfo implements DataStructure { return false; } + @Override public String toString() { return IntrospectionSupport.toString(this); } + @Override public int hashCode() { int h1 = clientId != null ? clientId.hashCode() : -1; int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1; return h1 ^ h2; } + @Override public boolean equals(Object obj) { boolean result = false; if (obj instanceof SubscriptionInfo) { diff --git a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java index f44bc0f5a7..91c5563dd2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; - import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; @@ -290,6 +289,7 @@ public class WireFormatInfo implements Command, MarshallAware { return visitor.processWireFormat(this); } + @Override public String toString() { Map p = null; try { @@ -356,6 +356,10 @@ public class WireFormatInfo implements Command, MarshallAware { public boolean isShutdownInfo() { return false; } + + public boolean isConnectionControl() { + return false; + } public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) { } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 9e7a322887..7bf6ef6968 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -30,7 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; @@ -131,7 +130,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private BrokerInfo localBrokerInfo; private BrokerInfo remoteBrokerInfo; - private AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean started = new AtomicBoolean(); private TransportConnection duplexInitiatingConnection; private BrokerService brokerService = null; @@ -153,11 +152,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (started.compareAndSet(false, true)) { localBroker.setTransportListener(new DefaultTransportListener() { + @Override public void onCommand(Object o) { Command command = (Command) o; serviceLocalCommand(command); } + @Override public void onException(IOException error) { serviceLocalException(error); } @@ -318,6 +319,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (!isCreatedByDuplex()) { BrokerInfo brokerInfo = new BrokerInfo(); brokerInfo.setBrokerName(configuration.getBrokerName()); + brokerInfo.setBrokerURL(configuration.getBrokerURL()); brokerInfo.setNetworkConnection(true); brokerInfo.setDuplexConnection(configuration.isDuplex()); // set our properties diff --git a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 21b164c942..68562ab994 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -17,7 +17,6 @@ package org.apache.activemq.network; import java.util.List; - import org.apache.activemq.command.ActiveMQDestination; /** @@ -36,6 +35,7 @@ public class NetworkBridgeConfiguration { private int prefetchSize = 1000; private int networkTTL = 1; private String brokerName = "localhost"; + private String brokerURL = ""; private String userName; private String password; private String destinationFilter = ">"; @@ -274,4 +274,18 @@ public class NetworkBridgeConfiguration { public void setSuppressDuplicateQueueSubscriptions(boolean val) { suppressDuplicateQueueSubscriptions = val; } + + /** + * @return the brokerURL + */ + public String getBrokerURL() { + return this.brokerURL; + } + + /** + * @param brokerURL the brokerURL to set + */ + public void setBrokerURL(String brokerURL) { + this.brokerURL = brokerURL; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java index 55d07f4cb3..d53428cab3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java @@ -71,6 +71,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller { info.setFaultTolerant(bs.readBoolean()); info.setResume(bs.readBoolean()); info.setSuspend(bs.readBoolean()); + info.setConnectedBrokers(tightUnmarshalString(dataIn, bs)); + info.setReconnectTo(tightUnmarshalString(dataIn, bs)); + info.setRebalanceConnection(bs.readBoolean()); } @@ -88,6 +91,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller { bs.writeBoolean(info.isFaultTolerant()); bs.writeBoolean(info.isResume()); bs.writeBoolean(info.isSuspend()); + rc += tightMarshalString1(info.getConnectedBrokers(), bs); + rc += tightMarshalString1(info.getReconnectTo(), bs); + bs.writeBoolean(info.isRebalanceConnection()); return rc + 0; } @@ -108,6 +114,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller { bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); + tightMarshalString2(info.getConnectedBrokers(), dataOut, bs); + tightMarshalString2(info.getReconnectTo(), dataOut, bs); + bs.readBoolean(); } @@ -127,6 +136,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller { info.setFaultTolerant(dataIn.readBoolean()); info.setResume(dataIn.readBoolean()); info.setSuspend(dataIn.readBoolean()); + info.setConnectedBrokers(looseUnmarshalString(dataIn)); + info.setReconnectTo(looseUnmarshalString(dataIn)); + info.setRebalanceConnection(dataIn.readBoolean()); } @@ -144,6 +156,9 @@ public class ConnectionControlMarshaller extends BaseCommandMarshaller { dataOut.writeBoolean(info.isFaultTolerant()); dataOut.writeBoolean(info.isResume()); dataOut.writeBoolean(info.isSuspend()); + looseMarshalString(info.getConnectedBrokers(), dataOut); + looseMarshalString(info.getReconnectTo(), dataOut); + dataOut.writeBoolean(info.isRebalanceConnection()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java index db3722c96c..df2727c771 100644 --- a/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java @@ -85,6 +85,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { info.setBrokerMasterConnector(bs.readBoolean()); info.setManageable(bs.readBoolean()); info.setClientMaster(bs.readBoolean()); + info.setFaultTolerant(bs.readBoolean()); } @@ -105,6 +106,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { bs.writeBoolean(info.isBrokerMasterConnector()); bs.writeBoolean(info.isManageable()); bs.writeBoolean(info.isClientMaster()); + bs.writeBoolean(info.isFaultTolerant()); return rc + 0; } @@ -128,6 +130,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { bs.readBoolean(); bs.readBoolean(); bs.readBoolean(); + bs.readBoolean(); } @@ -161,6 +164,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { info.setBrokerMasterConnector(dataIn.readBoolean()); info.setManageable(dataIn.readBoolean()); info.setClientMaster(dataIn.readBoolean()); + info.setFaultTolerant(dataIn.readBoolean()); } @@ -181,6 +185,7 @@ public class ConnectionInfoMarshaller extends BaseCommandMarshaller { dataOut.writeBoolean(info.isBrokerMasterConnector()); dataOut.writeBoolean(info.isManageable()); dataOut.writeBoolean(info.isClientMaster()); + dataOut.writeBoolean(info.isFaultTolerant()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java index d0137e5cf7..3026e6b431 100644 --- a/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java @@ -21,7 +21,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.activemq.Service; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.Transport; @@ -46,7 +45,7 @@ public class ProxyConnector implements Service { private URI remote; private URI localUri; private String name; - private CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); public void start() throws Exception { @@ -131,13 +130,14 @@ public class ProxyConnector implements Service { private Transport createRemoteTransport() throws Exception { Transport transport = TransportFactory.compositeConnect(remote); - CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class); + CompositeTransport ct = transport.narrow(CompositeTransport.class); if (ct != null && localUri != null) { - ct.add(new URI[] {localUri}); + ct.add(false,new URI[] {localUri}); } // Add a transport filter so that can track the transport life cycle transport = new TransportFilter(transport) { + @Override public void stop() throws Exception { LOG.info("Stopping proxy."); super.stop(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java index 06d4146204..d3a2fd858e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java @@ -19,6 +19,6 @@ package org.apache.activemq.transport; import java.net.URI; public interface CompositeTransport extends Transport { - void add(URI[] uris); - void remove(URI[] uris); + void add(boolean rebalance,URI[] uris); + void remove(boolean rebalance,URI[] uris); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java index 2c226f9090..479e044961 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport; import java.io.IOException; import java.net.URI; - import org.apache.activemq.Service; /** @@ -147,12 +146,29 @@ public interface Transport extends Service { */ boolean isConnected(); + /** + * @return true if reconnect is supported + */ + boolean isReconnectSupported(); + + /** + * @return true if updating uris is supported + */ + boolean isUpdateURIsSupported(); /** * reconnect to another location * @param uri * @throws IOException on failure of if not supported */ void reconnect(URI uri) throws IOException; + + /** + * Provide a list of available alternative locations + * @param rebalance + * @param uris + * @throws IOException + */ + void updateURIs(boolean rebalance,URI[] uris) throws IOException; /** * Returns a counter which gets incremented as data is read from the transport. diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 1284b2df8d..820c22f55e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -45,7 +45,8 @@ public class TransportFilter implements TransportListener, Transport { /** * @see org.apache.activemq.Service#start() - * @throws IOException if the next channel has not been set. + * @throws IOException + * if the next channel has not been set. */ public void start() throws Exception { if (next == null) { @@ -75,6 +76,7 @@ public class TransportFilter implements TransportListener, Transport { return next; } + @Override public String toString() { return next.toString(); } @@ -126,19 +128,31 @@ public class TransportFilter implements TransportListener, Transport { return next.isFaultTolerant(); } - public boolean isDisposed() { - return next.isDisposed(); - } - - public boolean isConnected() { + public boolean isDisposed() { + return next.isDisposed(); + } + + public boolean isConnected() { return next.isConnected(); } - public void reconnect(URI uri) throws IOException { - next.reconnect(uri); - } + public void reconnect(URI uri) throws IOException { + next.reconnect(uri); + } public int getReceiveCounter() { return next.getReceiveCounter(); } + + public boolean isReconnectSupported() { + return next.isReconnectSupported(); + } + + public boolean isUpdateURIsSupported() { + return next.isUpdateURIsSupported(); + } + + public void updateURIs(boolean rebalance,URI[] uris) throws IOException { + next.updateURIs(rebalance,uris); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java index 21d65e2238..b209bc6fcc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport; import java.io.IOException; import java.net.URI; - import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -96,8 +95,9 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo try { transportListener.onException(e); } catch (RuntimeException e2) { - // Handle any unexpected runtime exceptions by debug logging them. - LOG.debug("Unexpected runtime exception: "+e2, e2); + // Handle any unexpected runtime exceptions by debug logging + // them. + LOG.debug("Unexpected runtime exception: " + e2, e2); } } } @@ -111,18 +111,28 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo public boolean isFaultTolerant() { return false; } - - - public void reconnect(URI uri) throws IOException { - throw new IOException("Not supported"); - } - - public boolean isDisposed() { - return isStopped(); - } - - public boolean isConnected() { - return isStarted(); - } + + public void reconnect(URI uri) throws IOException { + throw new IOException("Not supported"); + } + + public boolean isReconnectSupported() { + return false; + } + + public boolean isUpdateURIsSupported() { + return false; + } + public void updateURIs(boolean reblance,URI[] uris) throws IOException { + throw new IOException("Not supported"); + } + + public boolean isDisposed() { + return isStopped(); + } + + public boolean isConnected() { + return isStarted(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java index 8516266454..b1e8478ccb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java @@ -20,7 +20,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.TransportFilter; @@ -50,6 +49,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList this.next = next; } + @Override public void start() throws Exception { if (discoveryAgent == null) { throw new IllegalStateException("discoveryAgent not configured"); @@ -61,6 +61,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList next.start(); } + @Override public void stop() throws Exception { ServiceStopper ss = new ServiceStopper(); ss.stop(discoveryAgent); @@ -75,7 +76,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList URI uri = new URI(url); serviceURIs.put(event.getServiceName(), uri); LOG.info("Adding new broker connection URL: " + uri); - next.add(new URI[] {URISupport.applyParameters(uri, parameters)}); + next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)}); } catch (URISyntaxException e) { LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); } @@ -85,7 +86,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList public void onServiceRemove(DiscoveryEvent event) { URI uri = serviceURIs.get(event.getServiceName()); if (uri != null) { - next.remove(new URI[] {uri}); + next.remove(false,new URI[] {uri}); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java index e4b7656c5d..fea5655a6e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java @@ -20,12 +20,11 @@ package org.apache.activemq.transport.failover; import java.io.IOException; import java.net.URI; - import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; class BackupTransport extends DefaultTransportListener{ - private FailoverTransport failoverTransport; + private final FailoverTransport failoverTransport; private Transport transport; private URI uri; private boolean disposed; @@ -33,10 +32,11 @@ class BackupTransport extends DefaultTransportListener{ BackupTransport(FailoverTransport ft){ this.failoverTransport=ft; } - public void onException(IOException error) { + @Override + public void onException(IOException error) { this.disposed=true; if (failoverTransport!=null) { - this.failoverTransport.reconnect(); + this.failoverTransport.reconnect(false); } } @@ -62,11 +62,13 @@ class BackupTransport extends DefaultTransportListener{ this.disposed = disposed; } - public int hashCode() { + @Override + public int hashCode() { return uri != null ? uri.hashCode():-1; } - public boolean equals(Object obj) { + @Override + public boolean equals(Object obj) { if (obj instanceof BackupTransport) { BackupTransport other = (BackupTransport) obj; return uri== null && other.uri==null || diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 6d62c66bc3..e4af1b3c13 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -19,15 +19,18 @@ package org.apache.activemq.transport.failover; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetAddress; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; -import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.ConnectionId; @@ -50,6 +53,7 @@ import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + /** * A Transport that is made reliable by being able to fail over to another * transport when a transport failure is detected. @@ -64,6 +68,7 @@ public class FailoverTransport implements CompositeTransport { private boolean disposed; private boolean connected; private final CopyOnWriteArrayList uris = new CopyOnWriteArrayList(); + private final CopyOnWriteArrayList updated = new CopyOnWriteArrayList(); private final Object reconnectMutex = new Object(); private final Object backupMutex = new Object(); @@ -77,28 +82,28 @@ public class FailoverTransport implements CompositeTransport { private final AtomicReference connectedTransport = new AtomicReference(); private final TaskRunner reconnectTask; private boolean started; - + private boolean initialized; private long initialReconnectDelay = 10; private long maxReconnectDelay = 1000 * 30; private double backOffMultiplier = 2d; private long timeout = -1; private boolean useExponentialBackOff = true; private boolean randomize = true; - private boolean initialized; private int maxReconnectAttempts; private int startupMaxReconnectAttempts; private int connectFailures; private long reconnectDelay = this.initialReconnectDelay; private Exception connectionFailure; private boolean firstConnection = true; - //optionally always have a backup created - private boolean backup=false; - private List backups=new CopyOnWriteArrayList(); - private int backupPoolSize=1; + // optionally always have a backup created + private boolean backup = false; + private final List backups = new CopyOnWriteArrayList(); + private int backupPoolSize = 1; private boolean trackMessages = false; private boolean trackTransactionProducers = true; private int maxCacheSize = 128 * 1024; - private TransportListener disposedListener = new DefaultTransportListener() {}; + private final TransportListener disposedListener = new DefaultTransportListener() { + }; private boolean connectionInterruptProcessingComplete; private final TransportListener myTransportListener = createTransportListener(); @@ -109,27 +114,27 @@ public class FailoverTransport implements CompositeTransport { // Setup a task that is used to reconnect the a connection async. reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() { public boolean iterate() { - boolean result=false; - boolean buildBackup=true; - boolean doReconnect = !disposed; - synchronized(backupMutex) { - if (connectedTransport.get()==null && !disposed) { - result=doReconnect(); - buildBackup=false; - } - } - if(buildBackup) { - buildBackups(); - }else { - //build backups on the next iteration - result=true; - try { + boolean result = false; + boolean buildBackup = true; + boolean doReconnect = !disposed; + synchronized (backupMutex) { + if (connectedTransport.get() == null && !disposed) { + result = doReconnect(); + buildBackup = false; + } + } + if (buildBackup) { + buildBackups(); + } else { + // build backups on the next iteration + result = true; + try { reconnectTask.wakeup(); } catch (InterruptedException e) { LOG.debug("Reconnect task has been interrupted.", e); } - } - return result; + } + return result; } }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); @@ -138,32 +143,25 @@ public class FailoverTransport implements CompositeTransport { TransportListener createTransportListener() { return new TransportListener() { public void onCommand(Object o) { - Command command = (Command)o; + Command command = (Command) o; if (command == null) { return; } if (command.isResponse()) { Object object = null; - synchronized(requestMap) { - object = requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId())); + synchronized (requestMap) { + object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); } if (object != null && object.getClass() == Tracked.class) { - ((Tracked)object).onResponses(); + ((Tracked) object).onResponses(); } } - if (!initialized) { - if (command.isBrokerInfo()) { - BrokerInfo info = (BrokerInfo)command; - BrokerInfo[] peers = info.getPeerBrokerInfos(); - if (peers != null) { - for (int i = 0; i < peers.length; i++) { - String brokerString = peers[i].getBrokerURL(); - add(brokerString); - } - } - initialized = true; - } - + if (!initialized) { + initialized = true; + } + + if(command.isConnectionControl()) { + handleConnectionControl((ConnectionControl) command); } if (transportListener != null) { transportListener.onCommand(command); @@ -193,7 +191,6 @@ public class FailoverTransport implements CompositeTransport { }; } - public final void handleTransportFailure(IOException e) throws InterruptedException { if (LOG.isTraceEnabled()) { LOG.trace(this + " handleTransportFailure: " + e); @@ -202,39 +199,83 @@ public class FailoverTransport implements CompositeTransport { if (transport == null) { // sync with possible in progress reconnect synchronized (reconnectMutex) { - transport = connectedTransport.getAndSet(null); + transport = connectedTransport.getAndSet(null); } } if (transport != null) { - + transport.setTransportListener(disposedListener); ServiceSupport.dispose(transport); - + boolean reconnectOk = false; synchronized (reconnectMutex) { - if(started) { - LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e); + if (started) { + LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed to " + connectedTransportURI + + " , attempting to automatically reconnect due to: " + e); LOG.debug("Transport failed with the following exception:", e); reconnectOk = true; - } + } initialized = false; - failedConnectTransportURI=connectedTransportURI; + failedConnectTransportURI = connectedTransportURI; connectedTransportURI = null; - connected=false; + connected = false; stateTracker.transportInterrupted(); - // notify before any reconnect attempt so ack state can be whacked + // notify before any reconnect attempt so ack state can be + // whacked if (transportListener != null) { transportListener.transportInterupted(); } - + if (reconnectOk) { reconnectTask.wakeup(); } } } + } + public final void handleConnectionControl(ConnectionControl control) { + String reconnectStr = control.getReconnectTo(); + if (reconnectStr != null) { + reconnectStr = reconnectStr.trim(); + if (reconnectStr.length() > 0) { + try { + URI uri = new URI(reconnectStr); + if (isReconnectSupported()) { + reconnect(uri); + LOG.info("Reconnected to: " + uri); + } + } catch (Exception e) { + LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); + } + } + } + String connectedStr = control.getConnectedBrokers(); + if (connectedStr != null) { + connectedStr = connectedStr.trim(); + if (connectedStr.length() > 0 && isUpdateURIsSupported()) { + List list = new ArrayList(); + StringTokenizer tokenizer = new StringTokenizer(connectedStr, ","); + while (tokenizer.hasMoreTokens()) { + String str = tokenizer.nextToken(); + try { + URI uri = new URI(str); + list.add(uri); + } catch (Exception e) { + LOG.error("Failed to parse broker address: " + str, e); + } + } + if (list.isEmpty() == false) { + try { + updateURIs(control.isRebalanceConnection(), list.toArray(new URI[list.size()])); + } catch (IOException e) { + LOG.error("Failed to update transport URI's from: " + connectedStr, e); + } + } + + } + } } public void start() throws Exception { @@ -250,13 +291,13 @@ public class FailoverTransport implements CompositeTransport { if (connectedTransport.get() != null) { stateTracker.restore(connectedTransport.get()); } else { - reconnect(); + reconnect(false); } } } public void stop() throws Exception { - Transport transportToStop=null; + Transport transportToStop = null; synchronized (reconnectMutex) { LOG.debug("Stopped."); if (!started) { @@ -265,7 +306,7 @@ public class FailoverTransport implements CompositeTransport { started = false; disposed = true; connected = false; - for (BackupTransport t:backups) { + for (BackupTransport t : backups) { t.setDisposed(true); } backups.clear(); @@ -279,7 +320,7 @@ public class FailoverTransport implements CompositeTransport { sleepMutex.notifyAll(); } reconnectTask.shutdown(); - if( transportToStop!=null ) { + if (transportToStop != null) { transportToStop.stop(); } } @@ -331,7 +372,7 @@ public class FailoverTransport implements CompositeTransport { public void setMaxReconnectAttempts(int maxReconnectAttempts) { this.maxReconnectAttempts = maxReconnectAttempts; } - + public int getStartupMaxReconnectAttempts() { return this.startupMaxReconnectAttempts; } @@ -341,14 +382,14 @@ public class FailoverTransport implements CompositeTransport { } public long getTimeout() { - return timeout; - } + return timeout; + } - public void setTimeout(long timeout) { - this.timeout = timeout; - } + public void setTimeout(long timeout) { + this.timeout = timeout; + } - /** + /** * @return Returns the randomize. */ public boolean isRandomize() { @@ -356,29 +397,30 @@ public class FailoverTransport implements CompositeTransport { } /** - * @param randomize The randomize to set. + * @param randomize + * The randomize to set. */ public void setRandomize(boolean randomize) { this.randomize = randomize; } - + public boolean isBackup() { - return backup; - } + return backup; + } - public void setBackup(boolean backup) { - this.backup = backup; - } + public void setBackup(boolean backup) { + this.backup = backup; + } - public int getBackupPoolSize() { - return backupPoolSize; - } + public int getBackupPoolSize() { + return backupPoolSize; + } - public void setBackupPoolSize(int backupPoolSize) { - this.backupPoolSize = backupPoolSize; - } - - public boolean isTrackMessages() { + public void setBackupPoolSize(int backupPoolSize) { + this.backupPoolSize = backupPoolSize; + } + + public boolean isTrackMessages() { return trackMessages; } @@ -401,31 +443,32 @@ public class FailoverTransport implements CompositeTransport { public void setMaxCacheSize(int maxCacheSize) { this.maxCacheSize = maxCacheSize; } - + /** - * @return Returns true if the command is one sent when a connection - * is being closed. + * @return Returns true if the command is one sent when a connection is + * being closed. */ private boolean isShutdownCommand(Command command) { - return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo)); + return (command != null && (command.isShutdownInfo() || command instanceof RemoveInfo)); } - public void oneway(Object o) throws IOException { - - Command command = (Command)o; + + Command command = (Command) o; Exception error = null; try { synchronized (reconnectMutex) { - + if (isShutdownCommand(command) && connectedTransport.get() == null) { - if(command.isShutdownInfo()) { - // Skipping send of ShutdownInfo command when not connected. + if (command.isShutdownInfo()) { + // Skipping send of ShutdownInfo command when not + // connected. return; } - if(command instanceof RemoveInfo || command.isMessageAck()) { - // Simulate response to RemoveInfo command or ack (as it will be stale) + if (command instanceof RemoveInfo || command.isMessageAck()) { + // Simulate response to RemoveInfo command or ack (as it + // will be stale) stateTracker.track(command); Response response = new Response(); response.setCorrelationId(command.getCommandId()); @@ -441,15 +484,14 @@ public class FailoverTransport implements CompositeTransport { Transport transport = connectedTransport.get(); long start = System.currentTimeMillis(); boolean timedout = false; - while (transport == null && !disposed - && connectionFailure == null + while (transport == null && !disposed && connectionFailure == null && !Thread.currentThread().isInterrupted()) { LOG.trace("Waiting for transport to reconnect..: " + command); long end = System.currentTimeMillis(); if (timeout > 0 && (end - start > timeout)) { - timedout = true; - LOG.info("Failover timed out after " + (end - start) + "ms"); - break; + timedout = true; + LOG.info("Failover timed out after " + (end - start) + "ms"); + break; } try { reconnectMutex.wait(100); @@ -468,8 +510,8 @@ public class FailoverTransport implements CompositeTransport { } else if (connectionFailure != null) { error = connectionFailure; } else if (timedout == true) { - error = new IOException("Failover timeout of " + timeout + " ms reached."); - }else { + error = new IOException("Failover timeout of " + timeout + " ms reached."); + } else { error = new IOException("Unexpected failure."); } break; @@ -480,7 +522,7 @@ public class FailoverTransport implements CompositeTransport { // then hold it in the requestMap so that we can replay // it later. Tracked tracked = stateTracker.track(command); - synchronized(requestMap) { + synchronized (requestMap) { if (tracked != null && tracked.isWaitingForResponse()) { requestMap.put(Integer.valueOf(command.getCommandId()), tracked); } else if (tracked == null && command.isResponseRequired()) { @@ -531,7 +573,7 @@ public class FailoverTransport implements CompositeTransport { if (!disposed) { if (error != null) { if (error instanceof IOException) { - throw (IOException)error; + throw (IOException) error; } throw IOExceptionSupport.create(error); } @@ -550,38 +592,53 @@ public class FailoverTransport implements CompositeTransport { throw new AssertionError("Unsupported Method"); } - public void add(URI u[]) { + public void add(boolean rebalance, URI u[]) { + boolean newURI = false; for (int i = 0; i < u.length; i++) { - if (!uris.contains(u[i])) { - uris.add(u[i]); + if (contains(u[i])==false) { + uris.add(i, u[i]); + newURI = true; } } - reconnect(); + if (newURI) { + reconnect(rebalance); + } } - public void remove(URI u[]) { + public void remove(boolean rebalance, URI u[]) { for (int i = 0; i < u.length; i++) { uris.remove(u[i]); } - reconnect(); + reconnect(rebalance); } - public void add(String u) { + public void add(boolean rebalance, String u) { try { - URI uri = new URI(u); - if (!uris.contains(uri)) { - uris.add(uri); + URI newURI = new URI(u); + if (contains(newURI)==false) { + uris.add(newURI); + reconnect(rebalance); } - - reconnect(); + } catch (Exception e) { LOG.error("Failed to parse URI: " + u); } } - public void reconnect() { + public void reconnect(boolean rebalance) { synchronized (reconnectMutex) { if (started) { + if (rebalance) { + Transport transport = this.connectedTransport.getAndSet(null); + if (transport != null) { + try { + transport.stop(); + } catch (Exception e) { + LOG.debug("Caught an exception stopping existing transport", e); + } + } + + } LOG.debug("Waking up reconnect task"); try { reconnectTask.wakeup(); @@ -603,7 +660,7 @@ public class FailoverTransport implements CompositeTransport { if (randomize) { // Randomly, reorder the list by random swapping for (int i = 0; i < l.size(); i++) { - int p = (int) (Math.random()*100 % l.size()); + int p = (int) (Math.random() * 100 % l.size()); URI t = l.get(p); l.set(p, l.get(i)); l.set(i, t); @@ -621,7 +678,7 @@ public class FailoverTransport implements CompositeTransport { } public void setTransportListener(TransportListener commandListener) { - synchronized(listenerMutex) { + synchronized (listenerMutex) { this.transportListener = commandListener; listenerMutex.notifyAll(); } @@ -633,7 +690,7 @@ public class FailoverTransport implements CompositeTransport { return target.cast(this); } Transport transport = connectedTransport.get(); - if ( transport != null) { + if (transport != null) { return transport.narrow(target); } return null; @@ -642,13 +699,13 @@ public class FailoverTransport implements CompositeTransport { protected void restoreTransport(Transport t) throws Exception, IOException { t.start(); - //send information to the broker - informing it we are an ft client + // send information to the broker - informing it we are an ft client ConnectionControl cc = new ConnectionControl(); cc.setFaultTolerant(true); t.oneway(cc); stateTracker.restore(t); Map tmpMap = null; - synchronized(requestMap) { + synchronized (requestMap) { tmpMap = new LinkedHashMap(requestMap); } for (Iterator iter2 = tmpMap.values().iterator(); iter2.hasNext();) { @@ -668,13 +725,14 @@ public class FailoverTransport implements CompositeTransport { this.useExponentialBackOff = useExponentialBackOff; } + @Override public String toString() { return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); } public String getRemoteAddress() { Transport transport = connectedTransport.get(); - if ( transport != null) { + if (transport != null) { return transport.getRemoteAddress(); } return null; @@ -683,8 +741,8 @@ public class FailoverTransport implements CompositeTransport { public boolean isFaultTolerant() { return true; } - - final boolean doReconnect() { + + final boolean doReconnect() { Exception failure = null; synchronized (reconnectMutex) { @@ -702,32 +760,32 @@ public class FailoverTransport implements CompositeTransport { if (!useExponentialBackOff) { reconnectDelay = initialReconnectDelay; } - synchronized(backupMutex) { + synchronized (backupMutex) { if (backup && !backups.isEmpty()) { - BackupTransport bt = backups.remove(0); + BackupTransport bt = backups.remove(0); Transport t = bt.getTransport(); URI uri = bt.getUri(); t.setTransportListener(myTransportListener); try { - if (started) { - restoreTransport(t); + if (started) { + restoreTransport(t); } reconnectDelay = initialReconnectDelay; - failedConnectTransportURI=null; + failedConnectTransportURI = null; connectedTransportURI = uri; connectedTransport.set(t); reconnectMutex.notifyAll(); connectFailures = 0; LOG.info("Successfully reconnected to backup " + uri); return false; - }catch (Exception e) { - LOG.debug("Backup transport failed",e); - } + } catch (Exception e) { + LOG.debug("Backup transport failed", e); + } } } - + Iterator iter = connectList.iterator(); - while(iter.hasNext() && connectedTransport.get() == null && !disposed) { + while (iter.hasNext() && connectedTransport.get() == null && !disposed) { URI uri = iter.next(); Transport t = null; try { @@ -735,7 +793,7 @@ public class FailoverTransport implements CompositeTransport { t = TransportFactory.compositeConnect(uri); t.setTransportListener(myTransportListener); t.start(); - + if (started) { restoreTransport(t); } @@ -746,36 +804,38 @@ public class FailoverTransport implements CompositeTransport { connectedTransport.set(t); reconnectMutex.notifyAll(); connectFailures = 0; - // Make sure on initial startup, that the transportListener - // has been initialized for this instance. - synchronized(listenerMutex) { - if (transportListener==null) { + // Make sure on initial startup, that the + // transportListener + // has been initialized for this instance. + synchronized (listenerMutex) { + if (transportListener == null) { try { - //if it isn't set after 2secs - it - //probably never will be + // if it isn't set after 2secs - it + // probably never will be listenerMutex.wait(2000); - }catch(InterruptedException ex) {} + } catch (InterruptedException ex) { + } } } if (transportListener != null) { transportListener.transportResumed(); - }else { + } else { LOG.debug("transport resumed by transport listener not set"); } if (firstConnection) { - firstConnection=false; + firstConnection = false; LOG.info("Successfully connected to " + uri); - }else { + } else { LOG.info("Successfully reconnected to " + uri); } - connected=true; + connected = true; return false; } catch (Exception e) { failure = e; LOG.debug("Connect fail to: " + uri + ", reason: " + e); - if (t!=null) { + if (t != null) { try { - t.stop(); + t.stop(); } catch (Exception ee) { LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee); } @@ -790,31 +850,32 @@ public class FailoverTransport implements CompositeTransport { reconnectAttempts = this.startupMaxReconnectAttempts; } } - if (reconnectAttempts==0) { + if (reconnectAttempts == 0) { reconnectAttempts = this.maxReconnectAttempts; - } + } if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) { LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)"); connectionFailure = failure; - - // Make sure on initial startup, that the transportListener has been initialized + + // Make sure on initial startup, that the transportListener has + // been initialized // for this instance. - synchronized(listenerMutex) { - if (transportListener==null) { + synchronized (listenerMutex) { + if (transportListener == null) { try { listenerMutex.wait(2000); - }catch(InterruptedException ex) {} + } catch (InterruptedException ex) { + } } } - - if(transportListener != null) { + if (transportListener != null) { if (connectionFailure instanceof IOException) { - transportListener.onException((IOException)connectionFailure); + transportListener.onException((IOException) connectionFailure); } else { - transportListener.onException(IOExceptionSupport.create(connectionFailure)); + transportListener.onException(IOExceptionSupport.create(connectionFailure)); } - } + } reconnectMutex.notifyAll(); return false; } @@ -841,59 +902,92 @@ public class FailoverTransport implements CompositeTransport { return !disposed; } - - final boolean buildBackups() { - synchronized (backupMutex) { - if (!disposed && backup && backups.size() < backupPoolSize) { - List connectList = getConnectList(); - //removed disposed backups - ListdisposedList = new ArrayList(); - for (BackupTransport bt:backups) { - if (bt.isDisposed()) { - disposedList.add(bt); - } - } - backups.removeAll(disposedList); - disposedList.clear(); - for (Iteratoriter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) { - URI uri = iter.next(); - if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { - try { - BackupTransport bt = new BackupTransport(this); - bt.setUri(uri); - if (!backups.contains(bt)) { - Transport t = TransportFactory.compositeConnect(uri); - t.setTransportListener(bt); - t.start(); - bt.setTransport(t); - backups.add(bt); - } - } catch(Exception e) { - LOG.debug("Failed to build backup ",e); - } - } - } - } - } - return false; - } + final boolean buildBackups() { + synchronized (backupMutex) { + if (!disposed && backup && backups.size() < backupPoolSize) { + List connectList = getConnectList(); + // removed disposed backups + List disposedList = new ArrayList(); + for (BackupTransport bt : backups) { + if (bt.isDisposed()) { + disposedList.add(bt); + } + } + backups.removeAll(disposedList); + disposedList.clear(); + for (Iterator iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) { + URI uri = iter.next(); + if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { + try { + BackupTransport bt = new BackupTransport(this); + bt.setUri(uri); + if (!backups.contains(bt)) { + Transport t = TransportFactory.compositeConnect(uri); + t.setTransportListener(bt); + t.start(); + bt.setTransport(t); + backups.add(bt); + } + } catch (Exception e) { + LOG.debug("Failed to build backup ", e); + } + } + } + } + } + return false; + } public boolean isDisposed() { - return disposed; + return disposed; } - - + public boolean isConnected() { return connected; } - + public void reconnect(URI uri) throws IOException { - add(new URI[] {uri}); + add(true, new URI[] { uri }); + } + + public boolean isReconnectSupported() { + return true; + } + + public boolean isUpdateURIsSupported() { + return true; + } + + public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { + List copy = new ArrayList(this.updated); + List add = new ArrayList(); + if (updatedURIs != null && updatedURIs.length > 0) { + Set set = new HashSet(); + for (int i = 0; i < updatedURIs.length; i++) { + URI uri = updatedURIs[i]; + if (uri != null) { + set.add(uri); + } + } + for (URI uri : set) { + if (copy.remove(uri) == false) { + add.add(uri); + } + } + synchronized (reconnectMutex) { + this.updated.clear(); + this.updated.addAll(add); + for (URI uri : copy) { + this.uris.remove(uri); + } + add(rebalance, add.toArray(new URI[add.size()])); + } + } } public int getReceiveCounter() { Transport transport = connectedTransport.get(); - if( transport == null ) { + if (transport == null) { return 0; } return transport.getReceiveCounter(); @@ -904,4 +998,25 @@ public class FailoverTransport implements CompositeTransport { stateTracker.connectionInterruptProcessingComplete(this, connectionId); } } + + private boolean contains(URI newURI) { + + boolean result = false; + try { + for (URI uri:uris) { + if (newURI.getPort()==uri.getPort()) { + InetAddress newAddr = InetAddress.getByName(newURI.getHost()); + InetAddress addr = InetAddress.getByName(uri.getHost()); + if (addr.equals(newAddr)) { + result = true; + break; + } + } + } + }catch(IOException e) { + result = true; + LOG.error("Failed to verify URI " + newURI + " already known: " + e); + } + return result; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java index 27e5c376c7..d898035204 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; - import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; @@ -32,6 +31,7 @@ import org.apache.activemq.util.URISupport.CompositeData; public class FailoverTransportFactory extends TransportFactory { + @Override public Transport doConnect(URI location) throws IOException { try { Transport transport = createTransport(URISupport.parseComposite(location)); @@ -43,6 +43,7 @@ public class FailoverTransportFactory extends TransportFactory { } } + @Override public Transport doCompositeConnect(URI location) throws IOException { try { return createTransport(URISupport.parseComposite(location)); @@ -62,7 +63,7 @@ public class FailoverTransportFactory extends TransportFactory { if (!options.isEmpty()) { throw new IllegalArgumentException("Invalid connect parameters: " + options); } - transport.add(compositData.getComponents()); + transport.add(false,compositData.getComponents()); return transport; } @@ -72,6 +73,7 @@ public class FailoverTransportFactory extends TransportFactory { return transport; } + @Override public TransportServer doBind(URI location) throws IOException { throw new IOException("Invalid server URI: " + location); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 7196281d4b..a736daf921 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.activemq.command.Command; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -65,7 +64,7 @@ public class FanoutTransport implements CompositeTransport { private final TaskRunner reconnectTask; private boolean started; - private ArrayList transports = new ArrayList(); + private final ArrayList transports = new ArrayList(); private int connectedCount; private int minAckCount = 2; @@ -73,7 +72,7 @@ public class FanoutTransport implements CompositeTransport { private long initialReconnectDelay = 10; private long maxReconnectDelay = 1000 * 30; private long backOffMultiplier = 2; - private boolean useExponentialBackOff = true; + private final boolean useExponentialBackOff = true; private int maxReconnectAttempts; private Exception connectionFailure; private FanoutTransportHandler primary; @@ -89,6 +88,7 @@ public class FanoutTransport implements CompositeTransport { this.ackCount = new AtomicInteger(count); } + @Override public String toString() { return command.getCommandId() + "=" + ackCount.get(); } @@ -107,6 +107,7 @@ public class FanoutTransport implements CompositeTransport { this.uri = uri; } + @Override public void onCommand(Object o) { Command command = (Command)o; if (command.isResponse()) { @@ -125,6 +126,7 @@ public class FanoutTransport implements CompositeTransport { } } + @Override public void onException(IOException error) { try { synchronized (reconnectMutex) { @@ -499,7 +501,7 @@ public class FanoutTransport implements CompositeTransport { } } - public void add(URI uris[]) { + public void add(boolean reblance,URI uris[]) { synchronized (reconnectMutex) { for (int i = 0; i < uris.length; i++) { @@ -523,7 +525,7 @@ public class FanoutTransport implements CompositeTransport { } - public void remove(URI uris[]) { + public void remove(boolean rebalance,URI uris[]) { synchronized (reconnectMutex) { for (int i = 0; i < uris.length; i++) { @@ -546,9 +548,20 @@ public class FanoutTransport implements CompositeTransport { } public void reconnect(URI uri) throws IOException { - add(new URI[]{uri}); + add(true,new URI[]{uri}); } + + public boolean isReconnectSupported() { + return true; + } + + public boolean isUpdateURIsSupported() { + return true; + } + public void updateURIs(boolean reblance,URI[] uris) throws IOException { + add(reblance,uris); + } public String getRemoteAddress() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index 166589103a..36c54a1cba 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.mock; import java.io.IOException; import java.net.URI; - import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; @@ -70,6 +69,7 @@ public class MockTransport extends DefaultTransportListener implements Transport getNext().stop(); } + @Override public void onCommand(Object command) { getTransportListener().onCommand(command); } @@ -88,6 +88,7 @@ public class MockTransport extends DefaultTransportListener implements Transport return transportListener; } + @Override public String toString() { return getNext().toString(); } @@ -108,6 +109,7 @@ public class MockTransport extends DefaultTransportListener implements Transport return getNext().request(command, timeout); } + @Override public void onException(IOException error) { getTransportListener().onException(error); } @@ -155,4 +157,16 @@ public class MockTransport extends DefaultTransportListener implements Transport public int getReceiveCounter() { return getNext().getReceiveCounter(); } + + + public boolean isReconnectSupported() { + return getNext().isReconnectSupported(); + } + + public boolean isUpdateURIsSupported() { + return getNext().isUpdateURIsSupported(); + } + public void updateURIs(boolean reblance,URI[] uris) throws IOException { + getNext().updateURIs(reblance,uris); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java index 1e685dc582..29e9dea139 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java @@ -133,6 +133,10 @@ public class StompFrame implements Command { public boolean isShutdownInfo() { return false; } + + public boolean isConnectionControl() { + return false; + } public boolean isWireFormatInfo() { return false; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index c4c319318f..79056d511a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -35,9 +35,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import javax.net.SocketFactory; - import org.apache.activemq.Service; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportLoggerFactory; @@ -171,6 +169,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S /** * @return pretty print of 'this' */ + @Override public String toString() { return "tcp://" + socket.getInetAddress() + ":" + socket.getPort(); } @@ -398,6 +397,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } } + @Override protected void doStart() throws Exception { connect(); stoppedLatch.set(new CountDownLatch(1)); @@ -454,6 +454,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S initializeStreams(); } + @Override protected void doStop(ServiceStopper stopper) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Stopping transport " + this); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index d0952bf04b..1668914278 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -21,7 +21,6 @@ import java.net.URI; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -275,6 +274,7 @@ public class VMTransport implements Transport, Task { this.network = network; } + @Override public String toString() { return location + "#" + id; } @@ -342,8 +342,19 @@ public class VMTransport implements Transport, Task { } public void reconnect(URI uri) throws IOException { - throw new IOException("Not supported"); - } + throw new IOException("Not supported"); + } + + public boolean isReconnectSupported() { + return false; + } + + public boolean isUpdateURIsSupported() { + return false; + } + public void updateURIs(boolean reblance,URI[] uris) throws IOException { + throw new IOException("Not supported"); + } public int getReceiveCounter() { return receiveCounter; diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java index 97df28cfcf..abd4f72662 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java @@ -56,5 +56,8 @@ public class ConnectionControlTest extends BaseCommandTestSupport { info.setFaultTolerant(true); info.setResume(false); info.setSuspend(true); + info.setConnectedBrokers("ConnectedBrokers:1"); + info.setReconnectTo("ReconnectTo:2"); + info.setRebalanceConnection(false); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java index 41fc4e982e..e1930a151e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java @@ -65,5 +65,6 @@ public class ConnectionInfoTest extends BaseCommandTestSupport { info.setBrokerMasterConnector(true); info.setManageable(false); info.setClientMaster(true); + info.setFaultTolerant(false); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java index cfdef60e51..82c477a40a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.perf; import java.io.File; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.kahadb.KahaDBStore; @@ -30,6 +31,13 @@ public class KahaDBDurableTopicTest extends SimpleDurableTopicTest { this.initialConsumerDelay = 10 * 1000; super.setUp(); } + + @Override + protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception { + ActiveMQConnectionFactory result = new ActiveMQConnectionFactory(uri); + //result.setDispatchAsync(false); + return result; + } @Override protected void configureBroker(BrokerService answer, String uri) throws Exception { @@ -52,7 +60,7 @@ public class KahaDBDurableTopicTest extends SimpleDurableTopicTest { // small batch means more frequent and smaller writes kaha.setIndexWriteBatchSize(100); - kaha.setIndexCacheSize(10000); + kaha.setIndexCacheSize(1000); // do the index write in a separate thread //kaha.setEnableIndexWriteAsync(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java index 1ca48158da..e43340a5d9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java @@ -27,7 +27,7 @@ public class KahaDBQueueTest extends SimpleQueueTest { @Override protected void setUp() throws Exception { - this.initialConsumerDelay = 10 * 1000; + // this.initialConsumerDelay = 10 * 1000; super.setUp(); } @Override @@ -43,7 +43,7 @@ public class KahaDBQueueTest extends SimpleQueueTest { // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified // what happens if the index is updated but a journal update is lost. // Index is going to be in consistent, but can it be repaired? - //kaha.setEnableJournalDiskSyncs(false); + kaha.setEnableJournalDiskSyncs(false); // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often. //kaha.setJournalMaxFileLength(1024*1024*100); @@ -51,6 +51,7 @@ public class KahaDBQueueTest extends SimpleQueueTest { kaha.setIndexWriteBatchSize(100); // do the index write in a separate thread kaha.setEnableIndexWriteAsync(true); + kaha.setIndexCacheSize(10000); answer.setPersistenceAdapter(kaha); answer.addConnector(uri); diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java b/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java new file mode 100644 index 0000000000..53af6abd13 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.perf; + +import java.io.File; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; + +public class RunBroker { + + public static void main(String arg[]) { + + try { + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(); + File dataFileDir = new File("target/test-amq-data/perfTest/kahadb"); + File archiveDir = new File(dataFileDir,"archive"); + KahaDBStore kahaDB = new KahaDBStore(); + kahaDB.setDirectory(dataFileDir); + kahaDB.setDirectoryArchive(archiveDir); + kahaDB.setArchiveDataLogs(true); + + // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified + // what happens if the index is updated but a journal update is lost. + // Index is going to be in consistent, but can it be repaired? + //kaha.setEnableJournalDiskSyncs(false); + // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often. + //kaha.setJournalMaxFileLength(1024*1024*100); + + // small batch means more frequent and smaller writes + kahaDB.setIndexWriteBatchSize(1000); + kahaDB.setIndexCacheSize(10000); + // do the index write in a separate thread + kahaDB.setEnableIndexWriteAsync(true); + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + //broker.setPersistenceAdapter(adaptor); + //broker.setPersistenceAdapter(kahaDB); + broker.setPersistent(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://0.0.0.0:61616"); + broker.start(); + System.err.println("Running"); + Thread.sleep(Long.MAX_VALUE); + }catch(Throwable e) { + e.printStackTrace(); + } + + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java new file mode 100644 index 0000000000..0707d9eba3 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.failover; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.NetworkConnector; + + +public class FailoverClusterTest extends TestCase { + +private static final int NUMBER = 10; +private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616"; +private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617"; +private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")"; +private static final String BROKER_A_NAME = "BROKERA"; +private static final String BROKER_B_NAME = "BROKERB"; +private BrokerService brokerA; +private BrokerService brokerB; +private final Listconnections = new ArrayList(); + + + public void testClusterConnectedAfterClients() throws Exception{ + createClients(); + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(3000); + Set set = new HashSet(); + for (ActiveMQConnection c:connections) { + set.add(c.getTransportChannel().getRemoteAddress()); + } + assertTrue(set.size() > 1); + } + + public void testClusterConnectedBeforeClients() throws Exception{ + + if (brokerB == null) { + brokerB = createBrokerB(BROKER_B_BIND_ADDRESS); + } + Thread.sleep(5000); + createClients(); + Thread.sleep(2000); + brokerA.stop(); + Thread.sleep(2000); + + URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS); + for (ActiveMQConnection c:connections) { + String addr = c.getTransportChannel().getRemoteAddress(); + assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0); + } + } + + @Override + protected void setUp() throws Exception { + if (brokerA == null) { + brokerA = createBrokerA(BROKER_A_BIND_ADDRESS); + } + + + } + + @Override + protected void tearDown() throws Exception { + for (Connection c:connections) { + c.close(); + } + if (brokerB != null) { + brokerB.stop(); + brokerB = null; + } + if (brokerA != null) { + brokerA.stop(); + brokerA = null; + } + } + + protected BrokerService createBrokerA(String uri) throws Exception { + BrokerService answer = new BrokerService(); + configureConsumerBroker(answer,uri); + answer.start(); + return answer; + } + + protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception { + answer.setBrokerName(BROKER_A_NAME); + answer.setPersistent(false); + TransportConnector connector = answer.addConnector(uri); + connector.setRebalanceClusterClients(true); + connector.setUpdateClusterClients(true); + answer.setUseShutdownHook(false); + } + + protected BrokerService createBrokerB(String uri) throws Exception { + BrokerService answer = new BrokerService(); + configureNetwork(answer,uri); + answer.start(); + return answer; + } + + protected void configureNetwork(BrokerService answer,String uri) throws Exception { + answer.setBrokerName(BROKER_B_NAME); + answer.setPersistent(false); + NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS); + network.setDuplex(true); + TransportConnector connector =answer.addConnector(uri); + connector.setRebalanceClusterClients(true); + connector.setUpdateClusterClients(true); + answer.setUseShutdownHook(false); + } + + protected void createClients() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL); + for (int i =0;i < NUMBER; i++) { + ActiveMQConnection c = (ActiveMQConnection) factory.createConnection(); + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = s.createQueue(getClass().getName()); + MessageConsumer consumer = s.createConsumer(queue); + connections.add(c); + } + } +}