applied patch to add the supplied test case.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1221310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-12-20 15:12:11 +00:00
parent c534900092
commit c856f30bc5
10 changed files with 239 additions and 14 deletions

View File

@ -139,6 +139,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
this.transport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
if (pendingStop) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ignoring Command due to pending stop: " + o);
}
return;
}
serviceLock.readLock().lock();
try {
if (!(o instanceof Command)) {
@ -267,7 +275,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
SERVICELOG.warn("Async error occurred: " + e, e);
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchAsync(ce);
if (pendingStop) {
dispatchSync(ce);
} else {
dispatchAsync(ce);
}
} finally {
inServiceException = false;
}
@ -287,13 +299,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
+ " command: " + command + ", exception: " + e, e);
}
if (responseRequired) {
if(e instanceof java.lang.SecurityException){
// still need to close this down - in case the peer of this transport doesn't play nice
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
}
if (responseRequired) {
response = new ExceptionResponse(e);
if(e instanceof java.lang.SecurityException){
//still need to close this down - incase the peer of this transport doesn't play nice
delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage());
}
} else {
serviceException(e);
}
@ -482,6 +494,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null) {
throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
+ connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
@ -521,6 +537,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
if (cs == null) {
throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
+ connectionId);
}
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
throw new IllegalStateException(broker.getBrokerName()
@ -593,16 +613,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// this down.
session.shutdown();
// Cascade the connection stop to the consumers and producers.
for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) {
ConsumerId consumerId = (ConsumerId) iter.next();
for (ConsumerId consumerId : session.getConsumerIds()) {
try {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
} catch (Throwable e) {
LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
}
}
for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) {
ProducerId producerId = (ProducerId) iter.next();
for (ProducerId producerId : session.getProducerIds()) {
try {
processRemoveProducer(producerId);
} catch (Throwable e) {
@ -709,8 +727,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) {
SessionId sessionId = (SessionId) iter.next();
for (SessionId sessionId : cs.getSessionIds()) {
try {
processRemoveSession(sessionId, lastDeliveredSequenceId);
} catch (Throwable e) {
@ -718,8 +735,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
// Cascade the connection stop to temp destinations.
for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
DestinationInfo di = (DestinationInfo) iter.next();
for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
DestinationInfo di = iter.next();
try {
broker.removeDestination(cs.getContext(), di.getDestination(), 0);
} catch (Throwable e) {
@ -913,6 +930,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void delayedStop(final int waitTime, final String reason) {
if (waitTime > 0) {
synchronized (this) {
pendingStop = true;
}
try {
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
public void run() {

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
*
*/
public class AMQ3625Test {
protected BrokerService broker1;
protected BrokerService broker2;
protected AtomicBoolean authenticationFailed = new AtomicBoolean(false);
protected AtomicBoolean gotNPE = new AtomicBoolean(false);
protected String java_security_auth_login_config = "java.security.auth.login.config";
protected String xbean = "xbean:";
protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625";
protected String conf = "conf";
protected String keys = "keys";
protected String sep = File.separator;
protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml";
protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml";
protected String oldLoginConf = null;
@Before
public void before() throws Exception {
if (System.getProperty(java_security_auth_login_config) != null) {
oldLoginConf = System.getProperty(java_security_auth_login_config);
}
System.setProperty(java_security_auth_login_config, base + sep + conf + sep + "login.config");
broker1 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker1_xml);
broker2 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker2_xml);
broker1.start();
broker1.waitUntilStarted();
broker2.start();
broker2.waitUntilStarted();
}
@After
public void after() throws Exception {
broker1.stop();
broker2.stop();
if (oldLoginConf != null) {
System.setProperty(java_security_auth_login_config, oldLoginConf);
}
}
@Test
public void go() throws Exception {
Appender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
if (event.getThrowableInformation() != null) {
Throwable t = event.getThrowableInformation().getThrowable();
if (t instanceof SecurityException) {
authenticationFailed.set(true);
}
if (t instanceof NullPointerException) {
gotNPE.set(true);
}
}
}
};
Logger.getRootLogger().addAppender(appender);
String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString();
connectURI = connectURI.replace("?needClientAuth=true", "");
broker2.addNetworkConnector("static:(" + connectURI + ")").start();
Thread.sleep(10 * 1000);
Logger.getRootLogger().removeAppender(appender);
assertTrue(authenticationFailed.get());
assertFalse(gotNPE.get());
}
}

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
http://activemq.org/config/1.0 http://activemq.apache.org/snapshot-schema/activemq-core-5.0-SNAPSHOT.xsd"
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" id="broker1" useJmx="false" persistent="false">
<plugins>
<jaasCertificateAuthenticationPlugin configuration="CertLogin"/>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<amq:authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
<amq:authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
<sslContext>
<sslContext
keyStore="file:./src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks" keyStorePassword="password"
trustStore="file:./src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks" trustStorePassword="password"/>
</sslContext>
<transportConnectors>
<transportConnector name="openwire" uri="ssl://0.0.0.0:0?needClientAuth=true" />
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker2" id="broker2" useJmx="false" persistent="false">
<sslContext>
<sslContext
keyStore="./src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks" keyStorePassword="password"
trustStore="./src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts" trustStorePassword="password"/>
</sslContext>
<transportConnectors>
<transportConnector name="openwire" uri="ssl://localhost:0?needClientAuth=true" />
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,4 @@
admins=system,dave
tempDestinationAdmins=system,user,dave
users=system,tester,user,dave,admin
guests=guest

View File

@ -0,0 +1,7 @@
CertLogin {
org.apache.activemq.jaas.TextFileCertificateLoginModule required
debug=true
org.apache.activemq.jaas.textfiledn.user="users2.properties
org.apache.activemq.jaas.textfiledn.group="groups2.properties";
};

View File

@ -0,0 +1,7 @@
guests=myguests
system=manager
admin=apassword
user=password
guest=password
tester=mypassword
dave=CN=Hello Dave Stanley, OU=FuseSource, O=Progress, L=Unknown, ST=MA, C=US