From c856f30bc556ccdf5c1f05a0244743eca8100c3e Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 20 Dec 2011 15:12:11 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3625 applied patch to add the supplied test case. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1221310 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/TransportConnection.java | 48 +++++--- .../org/apache/activemq/bugs/AMQ3625Test.java | 112 ++++++++++++++++++ .../bugs/amq3625/conf/JaasStompSSLBroker1.xml | 52 ++++++++ .../bugs/amq3625/conf/JaasStompSSLBroker2.xml | 23 ++++ .../bugs/amq3625/conf/groups2.properties | 4 + .../activemq/bugs/amq3625/conf/login.config | 7 ++ .../bugs/amq3625/conf/users2.properties | 7 ++ .../activemq/bugs/amq3625/keys/broker2.ks | Bin 0 -> 1357 bytes .../activemq/bugs/amq3625/keys/client2.ks | Bin 0 -> 1358 bytes .../activemq/bugs/amq3625/keys/client2.ts | Bin 0 -> 651 bytes 10 files changed, 239 insertions(+), 14 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks create mode 100644 activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index a42af26bfa..757b1d822b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -139,6 +139,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { this.transport.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { + + if (pendingStop) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ignoring Command due to pending stop: " + o); + } + return; + } + serviceLock.readLock().lock(); try { if (!(o instanceof Command)) { @@ -267,7 +275,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SERVICELOG.warn("Async error occurred: " + e, e); ConnectionError ce = new ConnectionError(); ce.setException(e); - dispatchAsync(ce); + if (pendingStop) { + dispatchSync(ce); + } else { + dispatchAsync(ce); + } } finally { inServiceException = false; } @@ -287,13 +299,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor { + " command: " + command + ", exception: " + e, e); } - if (responseRequired) { + if(e instanceof java.lang.SecurityException){ + // still need to close this down - in case the peer of this transport doesn't play nice + delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); + } + if (responseRequired) { response = new ExceptionResponse(e); - if(e instanceof java.lang.SecurityException){ - //still need to close this down - incase the peer of this transport doesn't play nice - delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage()); - } } else { serviceException(e); } @@ -482,6 +494,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + + connectionId); + } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " @@ -521,6 +537,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " + + connectionId); + } SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException(broker.getBrokerName() @@ -593,16 +613,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // this down. session.shutdown(); // Cascade the connection stop to the consumers and producers. - for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext(); ) { - ConsumerId consumerId = (ConsumerId) iter.next(); + for (ConsumerId consumerId : session.getConsumerIds()) { try { processRemoveConsumer(consumerId, lastDeliveredSequenceId); } catch (Throwable e) { LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e); } } - for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext(); ) { - ProducerId producerId = (ProducerId) iter.next(); + for (ProducerId producerId : session.getProducerIds()) { try { processRemoveProducer(producerId); } catch (Throwable e) { @@ -709,8 +727,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // are shutting down. cs.shutdown(); // Cascade the connection stop to the sessions. - for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext(); ) { - SessionId sessionId = (SessionId) iter.next(); + for (SessionId sessionId : cs.getSessionIds()) { try { processRemoveSession(sessionId, lastDeliveredSequenceId); } catch (Throwable e) { @@ -718,8 +735,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } } // Cascade the connection stop to temp destinations. - for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { - DestinationInfo di = (DestinationInfo) iter.next(); + for (Iterator iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { + DestinationInfo di = iter.next(); try { broker.removeDestination(cs.getContext(), di.getDestination(), 0); } catch (Throwable e) { @@ -913,6 +930,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { public void delayedStop(final int waitTime, final String reason) { if (waitTime > 0) { + synchronized (this) { + pendingStop = true; + } try { DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() { public void run() { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java new file mode 100644 index 0000000000..939312a439 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * + */ + +public class AMQ3625Test { + + protected BrokerService broker1; + protected BrokerService broker2; + + protected AtomicBoolean authenticationFailed = new AtomicBoolean(false); + protected AtomicBoolean gotNPE = new AtomicBoolean(false); + + protected String java_security_auth_login_config = "java.security.auth.login.config"; + protected String xbean = "xbean:"; + protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625"; + protected String conf = "conf"; + protected String keys = "keys"; + protected String sep = File.separator; + protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml"; + protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml"; + + protected String oldLoginConf = null; + + @Before + public void before() throws Exception { + if (System.getProperty(java_security_auth_login_config) != null) { + oldLoginConf = System.getProperty(java_security_auth_login_config); + } + System.setProperty(java_security_auth_login_config, base + sep + conf + sep + "login.config"); + broker1 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker1_xml); + broker2 = BrokerFactory.createBroker(xbean + base + sep + conf + sep + JaasStompSSLBroker2_xml); + + broker1.start(); + broker1.waitUntilStarted(); + broker2.start(); + broker2.waitUntilStarted(); + } + + @After + public void after() throws Exception { + broker1.stop(); + broker2.stop(); + + if (oldLoginConf != null) { + System.setProperty(java_security_auth_login_config, oldLoginConf); + } + } + + @Test + public void go() throws Exception { + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getThrowableInformation() != null) { + Throwable t = event.getThrowableInformation().getThrowable(); + if (t instanceof SecurityException) { + authenticationFailed.set(true); + } + if (t instanceof NullPointerException) { + gotNPE.set(true); + } + } + } + }; + Logger.getRootLogger().addAppender(appender); + + String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString(); + connectURI = connectURI.replace("?needClientAuth=true", ""); + broker2.addNetworkConnector("static:(" + connectURI + ")").start(); + + Thread.sleep(10 * 1000); + + Logger.getRootLogger().removeAppender(appender); + + assertTrue(authenticationFailed.get()); + assertFalse(gotNPE.get()); + } +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml new file mode 100644 index 0000000000..471c333001 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker1.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml new file mode 100644 index 0000000000..0c8585b3b9 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/JaasStompSSLBroker2.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties new file mode 100644 index 0000000000..40e9d24e18 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/groups2.properties @@ -0,0 +1,4 @@ +admins=system,dave +tempDestinationAdmins=system,user,dave +users=system,tester,user,dave,admin +guests=guest diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config new file mode 100644 index 0000000000..7c0e8e1998 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/login.config @@ -0,0 +1,7 @@ + +CertLogin { + org.apache.activemq.jaas.TextFileCertificateLoginModule required + debug=true + org.apache.activemq.jaas.textfiledn.user="users2.properties + org.apache.activemq.jaas.textfiledn.group="groups2.properties"; +}; diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties new file mode 100644 index 0000000000..70c7f6bbc9 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/conf/users2.properties @@ -0,0 +1,7 @@ +guests=myguests +system=manager +admin=apassword +user=password +guest=password +tester=mypassword +dave=CN=Hello Dave Stanley, OU=FuseSource, O=Progress, L=Unknown, ST=MA, C=US diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/broker2.ks new file mode 100644 index 0000000000000000000000000000000000000000..ebb7c5fef4ac14798183c693372173ab278db7ba GIT binary patch literal 1357 zcmezO_TO6u1_mY|W&~rlq@w)n)FL2P#cG3YCQxdZK@-z913oq`Z8k<0MlB{mMn+Z! zmL{e}<;E^av2VlvGbk!uc)mT@Mx|%A#A7?fV{8)-t$mYn@p`Y^>9y`Lx5F*|KMLon zV9yS?QE`)}RQR9i*(&+poa56tAbpQ^_<@sj$t%E#lpzcj0-{O;mrp+~p$gblBPVn_sEXhW`sU?>&~YOk{z} zF8;}{g5rH71sM;Toj(yLSH$0O!D#Cx_pOG_P21dBHcyo>O6+J~_$+$YzUZhFvwP-7 znLgWtexH}@GE-P!^J<;Zm-U}-tbca3^wFZtY6iVm)q_RYO8%DBY)zd!=~|u$Kcnkv zhKmy$*IZg_>YRDBZPJv#9{X%xT-=jZ8Ms;4GRxw4;6BBmC|hgC z#MNoL7C!WsmVNUx^s~V??Y3JMvk!?a<72nlw8-L4TElDQo&OowW=uSEG$$*WXO7)< z9+rthZ)cpUo_EeGHPPPJUhLn859u4SJ9pe%)cUSNm1oM?Y}tJ)EAsqp|6bB zjk7GbROrfQ;h_2_htGEGRQc^H>ub1~>*R`^Jk0Jp9#{w*Z@GPH(Z*BS>IY=DD_+qq zE6DaNTk=~V;_!^K`XA)ms((dKZdj}y_2ayB;61Ggi@fU-1BHyeIZMkHKFwVFam%yZ z!v3Hij=z6|vzVA_mrT%G#K)3$>pSbY(;8JT7TNFN^S$9*&6_A>J=g!w1kP@r8<$1c z*&ScX38rvXsH{9~@b}D&8g?!FN|{T0O@swr1pi(yl&D~z1y0ed5qhQumcWGU15C&s z22G3`nOMBeciJ29vTFm_0Nv zJ1@UH4<^kn%;f86C}_YBlI9ZT2q?-=FG?*gHWW4x1PO5qbGelkrv~Sj7A2<|iW&%m z1et|-ToTJt6@p6=^Kw!v4dldm4J-`}fY{Q&#Mm%OoEOM3G%$y9>Fb!r`N&}b%-zh5 zy$lA8olK353>#-opK@cv^bj@eCryb=Rjw22??)`2dhlXRm}@rUmkE;tl75&lDx7_= zzdLr*ACLMkd$gBaIs9<>pQ#Gm>Vf51nK!mfKlJ~b@9E`d=jSk%~+*&U9ypTtEBnvZC?9z?RF)ErCOb+-x9HEW60l=H>T9<23q(X*^|0wRac5& z)ze=u>wdI`2?XV={3fF<&fK!&wzkQ?@2mGa|2(z&y8*)&5BWB)-7ocoI3GT$7V+$U z7n-irX=~@J$(Pqd2i?0jTa}D H_s;|X)f^@J literal 0 HcmV?d00001 diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ks new file mode 100644 index 0000000000000000000000000000000000000000..3bfa43ea7ceec4f4eab6d4b55931df85dc45446e GIT binary patch literal 1358 zcmezO_TO6u1_mY|W&~rlT+#cIP}NubnjgC?f!27GK>+H8z0j9N^BjEt-d zEKN*{Ib3!>cl;qYH+RcNqp8Xo*FP2fU*9N_)_(8m&nHuFG`W~^_shrbt~wj2er%=H z>!(i>_Jq`4w9BlNWK}mZzaH_pZ=-T^Kw~8$DvQr`3!!zb7K4R6H zcqp3BbCsCqwP&`wb>8`|xEU)J%N4X{pNSrq<@MOyo=clHrZwg~ZLj;8pnSG=Mw*i2 zWzXZ)r=L!bRlA_?5ErmA{=w7AnqOOwa(xeHT%xl2@WdjHlnH0A_iBr*dU)vjp{iJi znda$rZ#A7v*}gM$olG@)`1afx?auJ-hDyI3bLtoUT$k$msh#ul;}otP&kC-te8uu9 zc-Jb^Th-^;PjYijk-v8`;j>B4EazJln-k^bTCN+dcS$i@^=ALo-@HqotL4X^7QT4u z=A^AVJ8$~VQo1%nPx$nN%L@DFs_DcnIPP^+s8fkm_!vv%GimwdCvAS0=WhBt(dO*W z+5id5^YixjzT}!RA?M+g#=p*wg4jOqTkE@Fa_zO2oAO%)Ln9`p2hC^`j$YVTnbmdo z-^(Krvkp$2A~ZX3n-6QwjCCSgSG-MIx}`F;)-z_g!{2YK{(R|iZ>rb-CcD)pLB@N2 z>&BnoKS)N+h&WRrak>1|fo>y{l0&)L-RhFeS??6o?q^Tyx07a`r}JW`)w=Z}UqA5m z?Yp6K)IfUMMgK^<1@%=;SGP1BZB4nePuZOFllk)RLPfGA+WX%;m?Ub>sq!^QnDNZ7 zJv*Bu*{`!~I(+5RgpO$st$d^TTm`fkF0i~}dmJjFY`S~hfzFFzNtU0N`+T4JWb3)FR?K>|cBMU2oLAD{c0Vf-CC<~h~Q)n=Z!y(M% z>u4wd%m6?cc479=yzIRE@;pO91AdSQmoP^_QGR++YH_iluz?^*h+CM;t+Y5bIKQ+g zIn_|qKo}&*EX?DQSeB{~T#}fVlUivYC(dhNX=nh%mIkIqrcvU&K#rk-Ih0FZ$287I z4hvxRW^U|dFlg*#YHVb9weg#J;)(jAFkAkhO$LSgp0UfkIwon^cDK@mC1gd)F7b*_ zvs2@~^6F%CZmfO8KBe`|GAXB{^X4CRF}gi%>5Gt=i_cVPNGjUw%GZc=xHG9=>Fl&? z`}BA7&3yggR?uyJ+n7(mDLr>@S(Dd9U#Na+|obtwzcFO?Qjh MiIqQAvi>Xw0B6BC9smFU literal 0 HcmV?d00001 diff --git a/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts b/activemq-core/src/test/resources/org/apache/activemq/bugs/amq3625/keys/client2.ts new file mode 100644 index 0000000000000000000000000000000000000000..3f8301f4403db1da42bd03f7827ed3d4b7fe2e71 GIT binary patch literal 651 zcmezO_TO6u1_mY|W(3o0Nk#eDsYO7piq*!o!3?YsdZq@JKovd)O-vpJO^h3vSiH`6 z+8gk)acZ@Bw0-AgWMpAwFvvFKHsEAq4rO5zW(o~96foceaX5t8L-Vrp^2_sJ((J-a zzK({12K*ptE@6&aDHh~a;l-IfiOsrS(wKqu`E>~ zxFj(zC$-W*PMp`k($D~iEe%YJ4Wq<)fgD2vb10X-j%l2a92ShM49tza3E&nV=Q(CMym+^6v(LTeZ|j{n%GOE&FIU*5H0(;eQH^hF!5`@DIj!FpN3_RbH^`Fo$XGchwVFd{n-7*)(bcl9gHSfzGd zvXOhMr1|b`Ui)?Jb|r+RTAirh60vGy$lsGUrqt^OTKFB=le%YBSBhZO(_b&^ezb-O z1m&#!CZjFR+_K}gw#mQmtM@zqJhl3}0mBy$`8KcJFZF~tA3mxU@$7yVny%DoYOv^D y