From 986ed145a791052a7190f069563095a0110c8742 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 16 Mar 2011 22:48:26 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3222 - Failover and SimpleDiscovery - query parameters getting dropped, resolve by leaving composite prams in place and seperating out params that need to be applied to discovered transports, new format 'discovered.x' for the mulitcast case. Revisit https://issues.apache.org/jira/browse/AMQ-2981,https://issues.apache.org/jira/browse/AMQ-2598,https://issues.apache.org/activemq/browse/AMQ-2939 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1082333 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/DiscoveryNetworkConnector.java | 3 +- .../activemq/thread/TaskRunnerFactory.java | 8 +- .../discovery/DiscoveryListener.java | 1 + .../discovery/DiscoveryTransport.java | 2 +- .../transport/failover/FailoverTransport.java | 2 +- .../org/apache/activemq/util/URISupport.java | 22 ++- .../activemq/xbean/XBeanBrokerFactory.java | 12 +- .../network/FailoverStaticNetworkTest.java | 160 +++++++++++++++++- .../DiscoveryTransportNoBrokerTest.java | 5 +- .../apache/activemq/util/URISupportTest.java | 25 ++- 10 files changed, 206 insertions(+), 34 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 9e6e773a33..3c17545b40 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -100,8 +100,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco } URI connectUri = uri; try { - connectUri = URISupport.removeQuery(connectUri); - connectUri = URISupport.applyParameters(connectUri, parameters); + connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); } catch (URISyntaxException e) { LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 4e9517f8c5..c79d715482 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -22,12 +22,13 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Manages the thread pool for long running tasks. Long running tasks are not * always active but when they are active, they may need a few iterations of * processing for them to become idle. The manager ensures that each task is - * processes but that no one task overtakes the system. This is kina like + * processes but that no one task overtakes the system. This is kinda like * cooperative multitasking. * * @@ -39,6 +40,7 @@ public class TaskRunnerFactory implements Executor { private String name; private int priority; private boolean daemon; + private AtomicLong id = new AtomicLong(0); public TaskRunnerFactory() { this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); @@ -89,14 +91,14 @@ public class TaskRunnerFactory implements Executor { if (executor != null) { executor.execute(runnable); } else { - new Thread(runnable, name).start(); + new Thread(runnable, name + "-" + id.incrementAndGet()).start(); } } protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, name); + Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet()); thread.setDaemon(daemon); thread.setPriority(priority); return thread; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java index 369aadb9af..60f48e5b40 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.DiscoveryEvent; * */ public interface DiscoveryListener { + public static final String DISCOVERED_OPTION_PREFIX = "discovered."; void onServiceAdd(DiscoveryEvent event); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java index c7a3230b6e..48cba828d9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java @@ -75,7 +75,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList try { URI uri = new URI(url); LOG.info("Adding new broker connection URL: " + uri); - uri = URISupport.applyParameters(uri, parameters); + uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX); serviceURIs.put(event.getServiceName(), uri); next.add(false,new URI[] {uri}); } catch (URISyntaxException e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index c6db08e01f..80ace90a46 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -141,7 +141,7 @@ public class FailoverTransport implements CompositeTransport { buildBackups(); } else { // build backups on the next iteration - result = true; + buildBackup = true; try { reconnectTask.wakeup(); } catch (InterruptedException e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java b/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java index 06a7c5000e..686f8f3054 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java @@ -131,24 +131,29 @@ public class URISupport { CompositeData data = URISupport.parseComposite(uri); Map parameters = new HashMap(); parameters.putAll(data.getParameters()); - for (URI component : data.getComponents()) { - parameters.putAll(component.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(component.getQuery(), "?"))); - } - if (parameters.isEmpty()) + if (parameters.isEmpty()) { parameters = emptyMap(); + } return parameters; } } public static URI applyParameters(URI uri, Map queryParameters) throws URISyntaxException { + return applyParameters(uri, queryParameters, ""); + } + + public static URI applyParameters(URI uri, Map queryParameters, String optionPrefix) throws URISyntaxException { if (queryParameters != null && !queryParameters.isEmpty()) { StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ; for ( Map.Entry param: queryParameters.entrySet()) { - if (newQuery.length()!=0) { - newQuery.append('&'); + if (param.getKey().startsWith(optionPrefix)) { + if (newQuery.length()!=0) { + newQuery.append('&'); + } + final String key = param.getKey().substring(optionPrefix.length()); + newQuery.append(key).append('=').append(param.getValue()); } - newQuery.append(param.getKey()).append('=').append(param.getValue()); } uri = createURIWithQuery(uri, newQuery.toString()); } @@ -219,7 +224,6 @@ public class URISupport { * @param uri * @param rc * @param ssp - * @param p * @throws URISyntaxException */ private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException { @@ -269,7 +273,7 @@ public class URISupport { } /** - * @param componentString + * @param str * @return */ private static String[] splitComponents(String str) { diff --git a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java index 714f45b893..951739ff4e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java @@ -60,14 +60,14 @@ public class XBeanBrokerFactory implements BrokerFactoryHandler { } public BrokerService createBroker(URI config) throws Exception { - - Map map = URISupport.parseParameters(config); - if (!map.isEmpty()) { - IntrospectionSupport.setProperties(this, map); - config = URISupport.removeQuery(config); - } String uri = config.getSchemeSpecificPart(); + Map parameters = URISupport.parseQuery(uri); + if (!parameters.isEmpty()) { + IntrospectionSupport.setProperties(this, parameters); + uri = uri.substring(0, uri.lastIndexOf('?')); + } + ApplicationContext context = createApplicationContext(uri); BrokerService broker = null; diff --git a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java index 62e8906e79..62fcb94de0 100644 --- a/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.network; +import java.net.URI; +import java.util.HashMap; +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + import static org.junit.Assert.assertTrue; import javax.jms.Connection; @@ -32,6 +39,7 @@ import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.transport.tcp.SslBrokerServiceTest; +import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +52,21 @@ public class FailoverStaticNetworkTest { private final static String DESTINATION_NAME = "testQ"; protected BrokerService brokerA; + protected BrokerService brokerA1; protected BrokerService brokerB; + protected BrokerService brokerC; private SslContext sslContext; - + protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception { + return createBroker(scheme, listenPort, networkToPorts, null); + } + + protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts, + HashMap networkProps) throws Exception { BrokerService broker = new BrokerService(); - broker.setUseJmx(true); + broker.setUseJmx(false); broker.getManagementContext().setCreateConnector(false); broker.setSslContext(sslContext); broker.setDeleteAllMessagesOnStartup(true); @@ -63,12 +78,30 @@ public class FailoverStaticNetworkTest { for (int i=1;i errors = new Vector(); + final String dataDir = "target/data/shared"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + errors.add(e); + } + } + }); + executor.shutdown(); + + HashMap networkConnectorProps = new HashMap(); + networkConnectorProps.put("duplex", "true"); + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + doTestNetworkSendReceive(brokerA, brokerB); + doTestNetworkSendReceive(brokerB, brokerA); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + // wait for slave to start + brokerA1.waitUntilStarted(); + + doTestNetworkSendReceive(brokerA1, brokerB); + doTestNetworkSendReceive(brokerB, brokerA1); + + assertTrue("No unexpected exceptions " + errors, errors.isEmpty()); + } + + @Test + // master slave piggy in the middle setup + public void testSendReceiveFailoverDuplexWithPIM() throws Exception { + final String dataDir = "target/data/shared/pim"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + executor.shutdown(); + + HashMap networkConnectorProps = new HashMap(); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + assertTrue("all props applied", networkConnectorProps.isEmpty()); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerC.start(); + assertTrue("all props applied a second time", networkConnectorProps.isEmpty()); + + //Thread.sleep(4000); + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + brokerC.stop(); + brokerC.waitUntilStopped(); + } + /** * networked broker started after target so first connect attempt succeeds * start order is important @@ -150,20 +287,25 @@ public class FailoverStaticNetworkTest { } private void doTestNetworkSendReceive() throws Exception, JMSException { - LOG.info("Creating Consumer on the networked brokerA ..."); + doTestNetworkSendReceive(brokerB, brokerA); + } + + private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException { + + LOG.info("Creating Consumer on the networked broker ..." + from); SslContext.setCurrentSslContext(sslContext); // Create a consumer on brokerA - ConnectionFactory consFactory = createConnectionFactory(brokerA); + ConnectionFactory consFactory = createConnectionFactory(from); Connection consConn = consFactory.createConnection(); consConn.start(); Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME); final MessageConsumer consumer = consSession.createConsumer(destination); - LOG.info("publishing to brokerB"); + LOG.info("publishing to " + to); - sendMessageTo(destination, brokerB); + sendMessageTo(destination, to); boolean gotMessage = Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java index a4fe6accbc..70950f7c37 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java @@ -105,7 +105,7 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport { String groupId = "WillNotMatch" + startT; try { String urlStr = "discovery:(multicast://default?group=" + groupId + - ")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay; + ")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlStr); LOG.info("Connecting."); Connection connection = factory.createConnection(); @@ -121,7 +121,8 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport { public void testSetDiscoveredBrokerProperties() throws Exception { final String extraParameterName = "connectionTimeout"; final String extraParameterValue = "3000"; - final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue); + final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + + DiscoveryListener.DISCOVERED_OPTION_PREFIX + extraParameterName + "=" + extraParameterValue); CompositeData compositeData = URISupport.parseComposite(uri); StubCompositeTransport compositeTransport = new StubCompositeTransport(); diff --git a/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java b/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java index a7ac7b5b5c..3a98c1fdef 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.util; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.Map; import junit.framework.TestCase; @@ -103,7 +104,7 @@ public class URISupportTest extends TestCase { } public void testParsingParams() throws Exception { - URI uri = new URI("static:(http://localhost:61617?proxyHost=localhost&proxyPort=80)"); + URI uri = new URI("static:(http://localhost:61617?proxyHost=jo&proxyPort=90)?proxyHost=localhost&proxyPort=80"); Mapparameters = URISupport.parseParameters(uri); verifyParams(parameters); uri = new URI("static://http://localhost:61617?proxyHost=localhost&proxyPort=80"); @@ -134,6 +135,28 @@ public class URISupportTest extends TestCase { assertEquals(querylessURI, URISupport.createURIWithQuery(originalURI, "")); assertEquals(new URI(querylessURI + "?" + queryString), URISupport.createURIWithQuery(originalURI, queryString)); } + + public void testApplyParameters() throws Exception { + + URI uri = new URI("http://0.0.0.0:61616"); + Map parameters = new HashMap(); + parameters.put("t.proxyHost", "localhost"); + parameters.put("t.proxyPort", "80"); + + uri = URISupport.applyParameters(uri, parameters); + Map appliedParameters = URISupport.parseParameters(uri); + assertEquals("all params applied with no prefix", 2, appliedParameters.size()); + + // strip off params again + uri = URISupport.createURIWithQuery(uri, null); + + uri = URISupport.applyParameters(uri, parameters, "joe"); + appliedParameters = URISupport.parseParameters(uri); + assertTrue("no params applied as none match joe", appliedParameters.isEmpty()); + + uri = URISupport.applyParameters(uri, parameters, "t."); + verifyParams(URISupport.parseParameters(uri)); + } private void verifyParams(Map parameters) { assertEquals(parameters.get("proxyHost"), "localhost");