diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index a42af26bfa..757b1d822b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -139,6 +139,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { + + if (pendingStop) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ignoring Command due to pending stop: " + o); + } + return; + } + serviceLock.readLock().lock(); try { if (!(o instanceof Command)) { @@ -267,7 +275,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SERVICELOG.warn("Async error occurred: " + e, e); ConnectionError ce = new ConnectionError(); ce.setException(e); - dispatchAsync(ce); + if (pendingStop) { + dispatchSync(ce); + } else { + dispatchAsync(ce); + } } finally { inServiceException = false; } @@ -287,13 +299,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { + " command: " + command + ", exception: " + e, e); } - if (responseRequired) { + if(e instanceof java.lang.SecurityException){ + // still need to close this down - in case the peer of this transport doesn't play nice + delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); + } + if (responseRequired) { response = new ExceptionResponse(e); - if(e instanceof java.lang.SecurityException){ - //still need to close this down - incase the peer of this transport doesn't play nice - delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); - } } else { serviceException(e); } @@ -482,6 +494,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + + connectionId); + } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " @@ -521,6 +537,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + + connectionId); + } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException(broker.getBrokerName() @@ -593,16 +613,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // this down. session.shutdown(); // Cascade the connection stop to the consumers and producers. - for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) { - ConsumerId consumerId = (ConsumerId) iter.next(); + for (ConsumerId consumerId : session.getConsumerIds()) { try { processRemoveConsumer(consumerId, lastDeliveredSequenceId); } catch (Throwable e) { LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); } } - for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) { - ProducerId producerId = (ProducerId) iter.next(); + for (ProducerId producerId : session.getProducerIds()) { try { processRemoveProducer(producerId); } catch (Throwable e) { @@ -709,8 +727,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // are shutting down. cs.shutdown(); // Cascade the connection stop to the sessions. - for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) { - SessionId sessionId = (SessionId) iter.next(); + for (SessionId sessionId : cs.getSessionIds()) { try { processRemoveSession(sessionId, lastDeliveredSequenceId); } catch (Throwable e) { @@ -718,8 +735,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } // Cascade the connection stop to temp destinations. - for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { - DestinationInfo di = (DestinationInfo) iter.next(); + for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { + DestinationInfo di = iter.next(); try { broker.removeDestination(cs.getContext(), di.getDestination(), 0); } catch (Throwable e) { @@ -913,6 +930,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void delayedStop(final int waitTime, final String reason) { if (waitTime > 0) { + synchronized (this) { + pendingStop = true; + } try { DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { public void run() { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java new file mode 100644 index 0000000000..939312a439 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * + */ + +public class AMQ3625Test { + + protected BrokerService broker1; + protected BrokerService broker2; + + protected AtomicBoolean authenticationFailed = new AtomicBoolean(false); + protected AtomicBoolean gotNPE = new AtomicBoolean(false); + + protected String java_security_auth_login_config = "java.security.auth.login.config"; + protected String xbean = "xbean:"; + protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625"; + protected String conf = "conf"; + protected String keys = "keys"; + protected String sep = File.separator; + protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml"; + protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml"; + + protected String oldLoginConf = null; + + @Before + public void before() throws Exception { + if (System.getProperty(java_security_auth_login_config) != null) { + oldLoginConf = System.getProperty(java_security_auth_login_config); + } + System.setProperty(java_security_auth_login_config, base + sep + conf + sep + "login.config"); + broker1 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker1_xml); + broker2 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker2_xml); + + broker1.start(); + broker1.waitUntilStarted(); + broker2.start(); + broker2.waitUntilStarted(); + } + + @After + public void after() throws Exception { + broker1.stop(); + broker2.stop(); + + if (oldLoginConf != null) { + System.setProperty(java_security_auth_login_config, oldLoginConf); + } + } + + @Test + public void go() throws Exception { + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getThrowableInformation() != null) { + Throwable t = event.getThrowableInformation().getThrowable(); + if (t instanceof SecurityException) { + authenticationFailed.set(true); + } + if (t instanceof NullPointerException) { + gotNPE.set(true); + } + } + } + }; + Logger.getRootLogger().addAppender(appender); + + String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString(); + connectURI = connectURI.replace("?needClientAuth=true", ""); + broker2.addNetworkConnector("static:(" + connectURI + ")").start(); + + Thread.sleep(10 * 1000); + + Logger.getRootLogger().removeAppender(appender); + + assertTrue(authenticationFailed.get()); + assertFalse(gotNPE.get()); + } +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml new file mode 100644 index 0000000000..471c333001 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml new file mode 100644 index 0000000000..0c8585b3b9 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties new file mode 100644 index 0000000000..40e9d24e18 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties @@ -0,0 +1,4 @@ +admins=system,dave +tempDestinationAdmins=system,user,dave +users=system,tester,user,dave,admin +guests=guest diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config new file mode 100644 index 0000000000..7c0e8e1998 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config @@ -0,0 +1,7 @@ + +CertLogin { + org.apache.activemq.jaas.TextFileCertificateLoginModule required + debug=true + org.apache.activemq.jaas.textfiledn.user="users2.properties + org.apache.activemq.jaas.textfiledn.group="groups2.properties"; +}; diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties new file mode 100644 index 0000000000..70c7f6bbc9 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties @@ -0,0 +1,7 @@ +guests=myguests +system=manager +admin=apassword +user=password +guest=password +tester=mypassword +dave=CN=Hello Dave Stanley, OU=FuseSource, O=Progress, L=Unknown, ST=MA, C=US diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks new file mode 100644 index 0000000000..ebb7c5fef4 Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks differ diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks new file mode 100644 index 0000000000..3bfa43ea7c Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks differ diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts new file mode 100644 index 0000000000..3f8301f440 Binary files /dev/null and b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts differ