tidy up vm url handling, fix failover and ssl context: resolve https://issues.apache.org/activemq/browse/AMQ-2715 and add test for same, improve logging around network connector

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@938998 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-04-28 16:12:29 +00:00
parent 80fd502dcd
commit c4d8bc47a0
7 changed files with 203 additions and 13 deletions

View File

@ -71,7 +71,6 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
public void onServiceAdd(DiscoveryEvent event) {
String localURIName = localURI.getScheme() + "://" + localURI.getHost();
// Ignore events once we start stopping.
if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
return;
@ -100,7 +99,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} catch (URISyntaxException e) {
LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
}
LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
LOG.info("Establishing network connection from " + localURI + " to " + connectUri);
Transport remoteTransport;
Transport localTransport;
@ -118,7 +117,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
localTransport = createLocalTransport();
} catch (Exception e) {
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not connect to local URI: " + localURIName + ": " + e.getMessage());
LOG.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e);
return;
}
@ -132,7 +131,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e);
LOG.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
LOG.debug("Start failure exception: " + e, e);
try {
discoveryAgent.serviceFailed(event);

View File

@ -31,6 +31,8 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
@ -109,9 +111,11 @@ public class FailoverTransport implements CompositeTransport {
private final TransportListener myTransportListener = createTransportListener();
private boolean updateURIsSupported=true;
private boolean reconnectSupported=true;
// remember for reconnect thread
private SslContext brokerSslContext;
public FailoverTransport() throws InterruptedIOException {
brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@ -792,6 +796,7 @@ public class FailoverTransport implements CompositeTransport {
Transport t = null;
try {
LOG.debug("Attempting connect to: " + uri);
SslContext.setCurrentSslContext(brokerSslContext);
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
@ -842,6 +847,8 @@ public class FailoverTransport implements CompositeTransport {
LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
}
}
} finally {
SslContext.setCurrentSslContext(null);
}
}
}
@ -921,6 +928,7 @@ public class FailoverTransport implements CompositeTransport {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
SslContext.setCurrentSslContext(brokerSslContext);
BackupTransport bt = new BackupTransport(this);
bt.setUri(uri);
if (!backups.contains(bt)) {
@ -932,6 +940,8 @@ public class FailoverTransport implements CompositeTransport {
}
} catch (Exception e) {
LOG.debug("Failed to build backup ", e);
} finally {
SslContext.setCurrentSslContext(null);
}
}
}

View File

@ -76,7 +76,7 @@ public class VMTransportFactory extends TransportFactory {
// If using the less complex vm://localhost?broker.persistent=true
// form
try {
host = location.getHost();
host = extractHost(location);
options = URISupport.parseParamters(location);
String config = (String)options.remove("brokerConfig");
if (config != null) {
@ -157,7 +157,18 @@ public class VMTransportFactory extends TransportFactory {
return transport;
}
/**
private static String extractHost(URI location) {
String host = location.getHost();
if (host == null || host.length() == 0) {
host = location.getAuthority();
if (host == null || host.length() == 0) {
host = "localhost";
}
}
return host;
}
/**
* @param registry
* @param brokerName
* @param waitForStart - time in milliseconds to wait for a broker to appear
@ -193,7 +204,7 @@ public class VMTransportFactory extends TransportFactory {
* @throws IOException
*/
private TransportServer bind(URI location, boolean dispose) throws IOException {
String host = location.getHost();
String host = extractHost(location);
LOG.debug("binding to broker: " + host);
VMTransportServer server = new VMTransportServer(location, dispose);
Object currentBoundValue = SERVERS.get(host);
@ -205,7 +216,7 @@ public class VMTransportFactory extends TransportFactory {
}
public static void stopped(VMTransportServer server) {
String host = server.getBindURI().getHost();
String host = extractHost(server.getBindURI());
stopped(host);
}

View File

@ -157,8 +157,16 @@ public class URISupport {
* Creates a URI with the given query
*/
public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
query, uri.getFragment());
String schemeSpecificPart = uri.getRawSchemeSpecificPart();
// strip existing query if any
int questionMark = schemeSpecificPart.lastIndexOf("?");
if (questionMark > 0) {
schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
}
if (query != null && query.length() > 0) {
schemeSpecificPart += "?" + query;
}
return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
}
public static CompositeData parseComposite(URI uri) throws URISyntaxException {

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class FailoverStaticNetworkTest {
protected static final Log LOG = LogFactory.getLog(FailoverStaticNetworkTest.class);
private final static String DESTINATION_NAME = "testQ";
protected BrokerService brokerA;
protected BrokerService brokerB;
private SslContext sslContext;
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
BrokerService broker = new BrokerService();
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort);
broker.addConnector(scheme + "://localhost:" + listenPort);
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:");
builder.append(networkToPorts[0]);
for (int i=1;i<networkToPorts.length; i++) {
builder.append("," + scheme + "://localhost:" + networkToPorts[i]);
}
builder.append(")?randomize=false)");
broker.addNetworkConnector(builder.toString());
}
return broker;
}
@Before
public void init() throws Exception {
KeyManager[] km = SslBrokerServiceTest.getKeyManager();
TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
sslContext = new SslContext(km, tm, null);
}
@After
public void cleanup() throws Exception {
brokerB.stop();
brokerB.waitUntilStopped();
brokerA.stop();
brokerA.waitUntilStopped();
}
/**
* networked broker started after target so first connect attempt succeeds
* start order is important
*/
@Test
public void testSendReceive() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
brokerB.start();
testNetworkSendReceive();
}
@Test
public void testSendReceiveSsl() throws Exception {
brokerA = createBroker("ssl", "61617", null);
brokerA.start();
brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
brokerB.start();
testNetworkSendReceive();
}
private void testNetworkSendReceive() throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ...");
SslContext.setCurrentSslContext(sslContext);
// Create a consumer on brokerB
ConnectionFactory consFactory = createConnectionFactory(brokerA);
Connection consConn = consFactory.createConnection();
consConn.start();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
final MessageConsumer consumer = consSession.createConsumer(destination);
sendMessageTo(destination, brokerB);
assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumer.receive(1000) != null;
}
}));
}
private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
ConnectionFactory factory = createConnectionFactory(brokerService);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(destination).send(session.createTextMessage("Hi"));
conn.close();
}
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setOptimizedMessageDispatch(true);
connectionFactory.setDispatchAsync(false);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.setAlwaysSyncSend(true);
return connectionFactory;
}
}

View File

@ -121,7 +121,7 @@ public class SslBrokerServiceTest extends TransportBrokerTestSupport {
LOG.info("peer cert: " + session.getPeerCertificateChain()[0].toString());
}
private TrustManager[] getTrustManager() throws Exception {
public static TrustManager[] getTrustManager() throws Exception {
TrustManager[] trustStoreManagers = null;
KeyStore trustedCertStore = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);
@ -134,7 +134,7 @@ public class SslBrokerServiceTest extends TransportBrokerTestSupport {
return trustStoreManagers;
}
private KeyManager[] getKeyManager() throws Exception {
public static KeyManager[] getKeyManager() throws Exception {
KeyManagerFactory kmf =
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance(SslTransportBrokerTest.KEYSTORE_TYPE);

View File

@ -92,4 +92,14 @@ public class URISupportTest extends TestCase {
assertTrue(URISupport.checkParenthesis(str));
}
public void testCreateWithQuery() throws Exception {
URI source = new URI("vm://localhost");
URI dest = URISupport.createURIWithQuery(source, "network=true&one=two");
assertEquals("correct param count", 2, URISupport.parseParamters(dest).size());
assertEquals("same uri, host", source.getHost(), dest.getHost());
assertEquals("same uri, scheme", source.getScheme(), dest.getScheme());
assertFalse("same uri, ssp", dest.getQuery().equals(source.getQuery()));
}
}