mirror of https://github.com/apache/activemq.git
AMQ-7106 - fix pending stop support by avoiding sync through single shared status var - fix and test
This commit is contained in:
parent
d020af2034
commit
8cc0c5ad6c
|
@ -138,8 +138,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
private boolean blocked;
|
||||
private boolean connected;
|
||||
private boolean active;
|
||||
private final AtomicBoolean starting = new AtomicBoolean();
|
||||
private final AtomicBoolean pendingStop = new AtomicBoolean();
|
||||
|
||||
// state management around pending stop
|
||||
private static final int NEW = 0;
|
||||
private static final int STARTING = 1;
|
||||
private static final int STARTED = 2;
|
||||
private static final int PENDING_STOP = 3;
|
||||
private final AtomicInteger status = new AtomicInteger(NEW);
|
||||
|
||||
private long timeStamp;
|
||||
private final AtomicBoolean stopping = new AtomicBoolean(false);
|
||||
private final CountDownLatch stopped = new CountDownLatch(1);
|
||||
|
@ -229,7 +235,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
}
|
||||
|
||||
public void serviceTransportException(IOException e) {
|
||||
if (!stopping.get() && !pendingStop.get()) {
|
||||
if (!stopping.get() && status.get() != PENDING_STOP) {
|
||||
transportException.set(e);
|
||||
if (TRANSPORTLOG.isDebugEnabled()) {
|
||||
TRANSPORTLOG.debug(this + " failed: " + e, e);
|
||||
|
@ -303,7 +309,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
}
|
||||
ConnectionError ce = new ConnectionError();
|
||||
ce.setException(e);
|
||||
if (pendingStop.get()) {
|
||||
if (status.get() == PENDING_STOP) {
|
||||
dispatchSync(ce);
|
||||
} else {
|
||||
dispatchAsync(ce);
|
||||
|
@ -321,7 +327,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
try {
|
||||
if (!pendingStop.get()) {
|
||||
if (status.get() != PENDING_STOP) {
|
||||
response = command.visit(this);
|
||||
} else {
|
||||
response = new ExceptionResponse(transportException.get());
|
||||
|
@ -993,7 +999,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
@Override
|
||||
public boolean iterate() {
|
||||
try {
|
||||
if (pendingStop.get() || stopping.get()) {
|
||||
if (status.get() == PENDING_STOP || stopping.get()) {
|
||||
if (dispatchStopped.compareAndSet(false, true)) {
|
||||
if (transportException.get() == null) {
|
||||
try {
|
||||
|
@ -1049,9 +1055,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
if (status.compareAndSet(NEW, STARTING)) {
|
||||
try {
|
||||
synchronized (this) {
|
||||
starting.set(true);
|
||||
if (taskRunnerFactory != null) {
|
||||
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
|
||||
+ getRemoteAddress());
|
||||
|
@ -1072,19 +1078,19 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
// Force clean up on an error starting up.
|
||||
pendingStop.set(true);
|
||||
status.set(PENDING_STOP);
|
||||
throw e;
|
||||
} finally {
|
||||
// stop() can be called from within the above block,
|
||||
// but we want to be sure start() completes before
|
||||
// stop() runs, so queue the stop until right now:
|
||||
setStarting(false);
|
||||
if (isPendingStop()) {
|
||||
if (!status.compareAndSet(STARTING, STARTED)) {
|
||||
LOG.debug("Calling the delayed stop() after start() {}", this);
|
||||
stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
|
@ -1099,10 +1105,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
|
||||
public void delayedStop(final int waitTime, final String reason, Throwable cause) {
|
||||
if (waitTime > 0) {
|
||||
synchronized (this) {
|
||||
pendingStop.set(true);
|
||||
status.compareAndSet(STARTING, PENDING_STOP);
|
||||
transportException.set(cause);
|
||||
}
|
||||
try {
|
||||
stopTaskRunnerFactory.execute(new Runnable() {
|
||||
@Override
|
||||
|
@ -1128,13 +1132,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
|
||||
public void stopAsync() {
|
||||
// If we're in the middle of starting then go no further... for now.
|
||||
synchronized (this) {
|
||||
pendingStop.set(true);
|
||||
if (starting.get()) {
|
||||
if (status.compareAndSet(STARTING, PENDING_STOP)) {
|
||||
LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (stopping.compareAndSet(false, true)) {
|
||||
// Let all the connection contexts know we are shutting down
|
||||
// so that in progress operations can notice and unblock.
|
||||
|
@ -1342,7 +1343,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
* @return true if the Connection is starting
|
||||
*/
|
||||
public boolean isStarting() {
|
||||
return starting.get();
|
||||
return status.get() == STARTING;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1355,19 +1356,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
return this.faultTolerantConnection;
|
||||
}
|
||||
|
||||
protected void setStarting(boolean starting) {
|
||||
this.starting.set(starting);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the Connection needs to stop
|
||||
*/
|
||||
public boolean isPendingStop() {
|
||||
return pendingStop.get();
|
||||
}
|
||||
|
||||
protected void setPendingStop(boolean pendingStop) {
|
||||
this.pendingStop.set(pendingStop);
|
||||
return status.get() == PENDING_STOP;
|
||||
}
|
||||
|
||||
private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
|
||||
|
|
|
@ -78,7 +78,7 @@ public class MQTTInactivityMonitor extends TransportFilter {
|
|||
ASYNC_TASKS.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
|
||||
onException(new InactivityIOException("CONNECT frame not received with in connectionTimeout (>" + connectionTimeout + "): "
|
||||
+ next.getRemoteAddress()));
|
||||
}
|
||||
});
|
||||
|
|
|
@ -0,0 +1,203 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.tcp;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.util.DefaultTestAppender;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.net.ssl.*;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TcpTransportInactiveDuringHandshakeTest {
|
||||
|
||||
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TcpTransportInactiveDuringHandshakeTest.class);
|
||||
|
||||
public static final String KEYSTORE_TYPE = "jks";
|
||||
public static final String PASSWORD = "password";
|
||||
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
|
||||
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
|
||||
|
||||
static {
|
||||
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
|
||||
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
|
||||
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
|
||||
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
|
||||
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
|
||||
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
|
||||
}
|
||||
|
||||
private BrokerService brokerService;
|
||||
private DefaultTestAppender appender;
|
||||
CountDownLatch inactivityMonitorFired = new CountDownLatch(1);
|
||||
CountDownLatch handShakeComplete = new CountDownLatch(1);
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setUseJmx(false);
|
||||
|
||||
appender = new DefaultTestAppender() {
|
||||
@Override
|
||||
public void doAppend(LoggingEvent event) {
|
||||
if (event.getLevel().equals(Level.WARN) && event.getRenderedMessage().contains("InactivityIOException")) {
|
||||
inactivityMonitorFired.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
|
||||
rootLogger.addAppender(appender);
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
|
||||
rootLogger.removeAppender(appender);
|
||||
|
||||
if (brokerService != null) {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testInactivityMonitorThreadCompletesWhenFiringDuringStart() throws Exception {
|
||||
brokerService.addConnector("mqtt+nio+ssl://localhost:0?transport.connectAttemptTimeout=1000&transport.closeAsync=false");
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
|
||||
TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);
|
||||
URI uri = transportConnector.getPublishableConnectURI();
|
||||
|
||||
|
||||
CountDownLatch blockHandShakeCompletion = new CountDownLatch(1);
|
||||
|
||||
TrustManager[] trustManagers = new TrustManager[]{new X509TrustManager() {
|
||||
@Override
|
||||
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
|
||||
LOG.info("Check Server Trusted: " + s, new Throwable("HERE"));
|
||||
try {
|
||||
blockHandShakeCompletion.await(20, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
LOG.info("Check Server Trusted done!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public X509Certificate[] getAcceptedIssuers() {
|
||||
return new X509Certificate[0];
|
||||
}
|
||||
}};
|
||||
|
||||
|
||||
SSLContext sslContext = SSLContext.getInstance("TLS");
|
||||
sslContext.init(null, trustManagers, new SecureRandom());
|
||||
|
||||
final SSLSocket sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket("127.0.0.1", uri.getPort());
|
||||
|
||||
sslSocket.addHandshakeCompletedListener(new HandshakeCompletedListener() {
|
||||
@Override
|
||||
public void handshakeCompleted(HandshakeCompletedEvent handshakeCompletedEvent) {
|
||||
handShakeComplete.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Executors.newCachedThreadPool().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
sslSocket.startHandshake();
|
||||
assertTrue("Socket connected", sslSocket.isConnected());
|
||||
} catch (IOException oops) {
|
||||
oops.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue("inactivity fired", inactivityMonitorFired.await(10, TimeUnit.SECONDS));
|
||||
|
||||
assertTrue("Found non blocked inactivity monitor thread - done its work", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
// verify no InactivityMonitor Task blocked
|
||||
Thread[] threads = new Thread[20];
|
||||
int activeCount = Thread.currentThread().getThreadGroup().enumerate(threads);
|
||||
for (int i = 0; i<activeCount; i++) {
|
||||
Thread thread = threads[i];
|
||||
LOG.info("T[" + i++ + "]: " + thread);
|
||||
if (thread.getName().contains("InactivityMonitor") && thread.getState().equals(Thread.State.TIMED_WAITING)) {
|
||||
LOG.info("Found inactivity monitor in timed-wait");
|
||||
// good
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}));
|
||||
|
||||
// allow handshake to complete
|
||||
blockHandShakeCompletion.countDown();
|
||||
|
||||
final OutputStream socketOutPutStream = sslSocket.getOutputStream();
|
||||
|
||||
assertTrue("Handshake complete", handShakeComplete.await(10, TimeUnit.SECONDS));
|
||||
|
||||
// wait for socket to be closed via Inactivity monitor
|
||||
|
||||
assertTrue("socket error", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
LOG.info("Expecting socket to error from remote close: " + sslSocket);
|
||||
try {
|
||||
socketOutPutStream.write(2);
|
||||
socketOutPutStream.flush();
|
||||
} catch (IOException expected) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}));
|
||||
|
||||
LOG.info("Socket at end: " + sslSocket);
|
||||
sslSocket.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue