https://issues.apache.org/jira/browse/AMQ-3222 - Failover and SimpleDiscovery - query parameters getting dropped, resolve by leaving composite prams in place and seperating out params that need to be applied to discovered transports, new format 'discovered.x' for the mulitcast case. Revisit https://issues.apache.org/jira/browse/AMQ-2981,https://issues.apache.org/jira/browse/AMQ-2598,https://issues.apache.org/activemq/browse/AMQ-2939

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1082333 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-03-16 22:48:26 +00:00
parent 2a558c80a6
commit 986ed145a7
10 changed files with 206 additions and 34 deletions

View File

@ -100,8 +100,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
URI connectUri = uri;
try {
connectUri = URISupport.removeQuery(connectUri);
connectUri = URISupport.applyParameters(connectUri, parameters);
connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX);
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
}

View File

@ -22,12 +22,13 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manages the thread pool for long running tasks. Long running tasks are not
* always active but when they are active, they may need a few iterations of
* processing for them to become idle. The manager ensures that each task is
* processes but that no one task overtakes the system. This is kina like
* processes but that no one task overtakes the system. This is kinda like
* cooperative multitasking.
*
*
@ -39,6 +40,7 @@ public class TaskRunnerFactory implements Executor {
private String name;
private int priority;
private boolean daemon;
private AtomicLong id = new AtomicLong(0);
public TaskRunnerFactory() {
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
@ -89,14 +91,14 @@ public class TaskRunnerFactory implements Executor {
if (executor != null) {
executor.execute(runnable);
} else {
new Thread(runnable, name).start();
new Thread(runnable, name + "-" + id.incrementAndGet()).start();
}
}
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name);
Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
thread.setDaemon(daemon);
thread.setPriority(priority);
return thread;

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.DiscoveryEvent;
*
*/
public interface DiscoveryListener {
public static final String DISCOVERED_OPTION_PREFIX = "discovered.";
void onServiceAdd(DiscoveryEvent event);

View File

@ -75,7 +75,7 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
try {
URI uri = new URI(url);
LOG.info("Adding new broker connection URL: " + uri);
uri = URISupport.applyParameters(uri, parameters);
uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX);
serviceURIs.put(event.getServiceName(), uri);
next.add(false,new URI[] {uri});
} catch (URISyntaxException e) {

View File

@ -141,7 +141,7 @@ public class FailoverTransport implements CompositeTransport {
buildBackups();
} else {
// build backups on the next iteration
result = true;
buildBackup = true;
try {
reconnectTask.wakeup();
} catch (InterruptedException e) {

View File

@ -131,24 +131,29 @@ public class URISupport {
CompositeData data = URISupport.parseComposite(uri);
Map<String, String> parameters = new HashMap<String, String>();
parameters.putAll(data.getParameters());
for (URI component : data.getComponents()) {
parameters.putAll(component.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(component.getQuery(), "?")));
}
if (parameters.isEmpty())
if (parameters.isEmpty()) {
parameters = emptyMap();
}
return parameters;
}
}
public static URI applyParameters(URI uri, Map<String, String> queryParameters) throws URISyntaxException {
return applyParameters(uri, queryParameters, "");
}
public static URI applyParameters(URI uri, Map<String, String> queryParameters, String optionPrefix) throws URISyntaxException {
if (queryParameters != null && !queryParameters.isEmpty()) {
StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
for ( Map.Entry<String, String> param: queryParameters.entrySet()) {
if (newQuery.length()!=0) {
newQuery.append('&');
if (param.getKey().startsWith(optionPrefix)) {
if (newQuery.length()!=0) {
newQuery.append('&');
}
final String key = param.getKey().substring(optionPrefix.length());
newQuery.append(key).append('=').append(param.getValue());
}
newQuery.append(param.getKey()).append('=').append(param.getValue());
}
uri = createURIWithQuery(uri, newQuery.toString());
}
@ -219,7 +224,6 @@ public class URISupport {
* @param uri
* @param rc
* @param ssp
* @param p
* @throws URISyntaxException
*/
private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException {
@ -269,7 +273,7 @@ public class URISupport {
}
/**
* @param componentString
* @param str
* @return
*/
private static String[] splitComponents(String str) {

View File

@ -60,14 +60,14 @@ public class XBeanBrokerFactory implements BrokerFactoryHandler {
}
public BrokerService createBroker(URI config) throws Exception {
Map map = URISupport.parseParameters(config);
if (!map.isEmpty()) {
IntrospectionSupport.setProperties(this, map);
config = URISupport.removeQuery(config);
}
String uri = config.getSchemeSpecificPart();
Map<String,String> parameters = URISupport.parseQuery(uri);
if (!parameters.isEmpty()) {
IntrospectionSupport.setProperties(this, parameters);
uri = uri.substring(0, uri.lastIndexOf('?'));
}
ApplicationContext context = createApplicationContext(uri);
BrokerService broker = null;

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.network;
import java.net.URI;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
@ -32,6 +39,7 @@ import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,14 +52,21 @@ public class FailoverStaticNetworkTest {
private final static String DESTINATION_NAME = "testQ";
protected BrokerService brokerA;
protected BrokerService brokerA1;
protected BrokerService brokerB;
protected BrokerService brokerC;
private SslContext sslContext;
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
return createBroker(scheme, listenPort, networkToPorts, null);
}
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts,
HashMap<String, String> networkProps) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(true);
broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
@ -63,12 +78,30 @@ public class FailoverStaticNetworkTest {
for (int i=1;i<networkToPorts.length; i++) {
builder.append("," + scheme + "://localhost:" + networkToPorts[i]);
}
builder.append(")?randomize=false)");
broker.addNetworkConnector(builder.toString());
// limit the reconnects in case of initial random connection to slave
// leaving randomize on verifies that this config is picked up
builder.append(")?maxReconnectAttempts=1)");
NetworkConnector nc = broker.addNetworkConnector(builder.toString());
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
}
}
return broker;
}
private BrokerService createBroker(String listenPort, String dataDir) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(false);
broker.getManagementContext().setCreateConnector(false);
broker.setBrokerName("Broker_Shared");
// lazy create transport connector on start completion
TransportConnector connector = new TransportConnector();
connector.setUri(new URI("tcp://localhost:" + listenPort));
broker.addConnector(connector);
broker.setDataDirectory(dataDir);
return broker;
}
@Before
public void setUp() throws Exception {
KeyManager[] km = SslBrokerServiceTest.getKeyManager();
@ -83,6 +116,16 @@ public class FailoverStaticNetworkTest {
brokerA.stop();
brokerA.waitUntilStopped();
if (brokerA1 != null) {
brokerA1.stop();
brokerA1.waitUntilStopped();
}
if (brokerC != null) {
brokerC.stop();
brokerC.waitUntilStopped();
}
}
@Test
@ -123,6 +166,100 @@ public class FailoverStaticNetworkTest {
doTestNetworkSendReceive();
}
@Test
public void testSendReceiveFailoverDuplex() throws Exception {
final Vector<Throwable> errors = new Vector<Throwable>();
final String dataDir = "target/data/shared";
brokerA = createBroker("61617", dataDir);
brokerA.start();
final BrokerService slave = createBroker("63617", dataDir);
brokerA1 = slave;
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
public void run() {
try {
slave.start();
} catch (Exception e) {
e.printStackTrace();
errors.add(e);
}
}
});
executor.shutdown();
HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
networkConnectorProps.put("duplex", "true");
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerB.start();
doTestNetworkSendReceive(brokerA, brokerB);
doTestNetworkSendReceive(brokerB, brokerA);
LOG.info("stopping brokerA (master shared_broker)");
brokerA.stop();
brokerA.waitUntilStopped();
// wait for slave to start
brokerA1.waitUntilStarted();
doTestNetworkSendReceive(brokerA1, brokerB);
doTestNetworkSendReceive(brokerB, brokerA1);
assertTrue("No unexpected exceptions " + errors, errors.isEmpty());
}
@Test
// master slave piggy in the middle setup
public void testSendReceiveFailoverDuplexWithPIM() throws Exception {
final String dataDir = "target/data/shared/pim";
brokerA = createBroker("61617", dataDir);
brokerA.start();
final BrokerService slave = createBroker("63617", dataDir);
brokerA1 = slave;
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
public void run() {
try {
slave.start();
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.shutdown();
HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
networkConnectorProps.put("duplex", "true");
networkConnectorProps.put("networkTTL", "2");
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerB.start();
assertTrue("all props applied", networkConnectorProps.isEmpty());
networkConnectorProps.put("duplex", "true");
networkConnectorProps.put("networkTTL", "2");
brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps);
brokerC.start();
assertTrue("all props applied a second time", networkConnectorProps.isEmpty());
//Thread.sleep(4000);
doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC);
LOG.info("stopping brokerA (master shared_broker)");
brokerA.stop();
brokerA.waitUntilStopped();
doTestNetworkSendReceive(brokerC, brokerB);
doTestNetworkSendReceive(brokerB, brokerC);
brokerC.stop();
brokerC.waitUntilStopped();
}
/**
* networked broker started after target so first connect attempt succeeds
* start order is important
@ -150,20 +287,25 @@ public class FailoverStaticNetworkTest {
}
private void doTestNetworkSendReceive() throws Exception, JMSException {
LOG.info("Creating Consumer on the networked brokerA ...");
doTestNetworkSendReceive(brokerB, brokerA);
}
private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ..." + from);
SslContext.setCurrentSslContext(sslContext);
// Create a consumer on brokerA
ConnectionFactory consFactory = createConnectionFactory(brokerA);
ConnectionFactory consFactory = createConnectionFactory(from);
Connection consConn = consFactory.createConnection();
consConn.start();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
final MessageConsumer consumer = consSession.createConsumer(destination);
LOG.info("publishing to brokerB");
LOG.info("publishing to " + to);
sendMessageTo(destination, brokerB);
sendMessageTo(destination, to);
boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {

View File

@ -105,7 +105,7 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
String groupId = "WillNotMatch" + startT;
try {
String urlStr = "discovery:(multicast://default?group=" + groupId +
")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay;
")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlStr);
LOG.info("Connecting.");
Connection connection = factory.createConnection();
@ -121,7 +121,8 @@ public class DiscoveryTransportNoBrokerTest extends CombinationTestSupport {
public void testSetDiscoveredBrokerProperties() throws Exception {
final String extraParameterName = "connectionTimeout";
final String extraParameterValue = "3000";
final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue);
final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&"
+ DiscoveryListener.DISCOVERED_OPTION_PREFIX + extraParameterName + "=" + extraParameterValue);
CompositeData compositeData = URISupport.parseComposite(uri);
StubCompositeTransport compositeTransport = new StubCompositeTransport();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.util;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import junit.framework.TestCase;
@ -103,7 +104,7 @@ public class URISupportTest extends TestCase {
}
public void testParsingParams() throws Exception {
URI uri = new URI("static:(http://localhost:61617?proxyHost=localhost&proxyPort=80)");
URI uri = new URI("static:(http://localhost:61617?proxyHost=jo&proxyPort=90)?proxyHost=localhost&proxyPort=80");
Map<String,String>parameters = URISupport.parseParameters(uri);
verifyParams(parameters);
uri = new URI("static://http://localhost:61617?proxyHost=localhost&proxyPort=80");
@ -134,6 +135,28 @@ public class URISupportTest extends TestCase {
assertEquals(querylessURI, URISupport.createURIWithQuery(originalURI, ""));
assertEquals(new URI(querylessURI + "?" + queryString), URISupport.createURIWithQuery(originalURI, queryString));
}
public void testApplyParameters() throws Exception {
URI uri = new URI("http://0.0.0.0:61616");
Map<String,String> parameters = new HashMap<String, String>();
parameters.put("t.proxyHost", "localhost");
parameters.put("t.proxyPort", "80");
uri = URISupport.applyParameters(uri, parameters);
Map<String,String> appliedParameters = URISupport.parseParameters(uri);
assertEquals("all params applied with no prefix", 2, appliedParameters.size());
// strip off params again
uri = URISupport.createURIWithQuery(uri, null);
uri = URISupport.applyParameters(uri, parameters, "joe");
appliedParameters = URISupport.parseParameters(uri);
assertTrue("no params applied as none match joe", appliedParameters.isEmpty());
uri = URISupport.applyParameters(uri, parameters, "t.");
verifyParams(URISupport.parseParameters(uri));
}
private void verifyParams(Map<String,String> parameters) {
assertEquals(parameters.get("proxyHost"), "localhost");