This closes #706

This commit is contained in:
Martyn Taylor 2016-08-09 11:23:46 +01:00
commit b863219959
12 changed files with 305 additions and 32 deletions

View File

@ -172,6 +172,12 @@ public enum ActiveMQExceptionType {
return new ActiveMQInvalidTransientQueueUseException(msg); return new ActiveMQInvalidTransientQueueUseException(msg);
} }
}, },
REMOTE_DISCONNECT(119) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQRemoteDisconnectException(msg);
}
},
GENERIC_EXCEPTION(999), GENERIC_EXCEPTION(999),
NATIVE_ERROR_INTERNAL(200), NATIVE_ERROR_INTERNAL(200),

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.REMOTE_DISCONNECT;
/**
* A security problem occurred (authentication issues, permission issues,...)
*/
public final class ActiveMQRemoteDisconnectException extends ActiveMQException {
public ActiveMQRemoteDisconnectException() {
super(REMOTE_DISCONNECT);
}
public ActiveMQRemoteDisconnectException(String msg) {
super(REMOTE_DISCONNECT, msg);
}
}

View File

@ -71,6 +71,19 @@ public class AssertionLoggerHandler extends ExtHandler {
return false; return false;
} }
public static boolean findText(long mstimeout, String ... text) {
long timeMax = System.currentTimeMillis() + mstimeout;
do {
if (findText(text)) {
return true;
}
}
while (timeMax > System.currentTimeMillis());
return false;
}
/** /**
* Find a line that contains the parameters passed as an argument * Find a line that contains the parameters passed as an argument
* *

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
@ -191,7 +192,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
destroyed = true; destroyed = true;
} }
ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); if (!(me instanceof ActiveMQRemoteDisconnectException)) {
ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
}
try { try {
transportConnection.forceClose(); transportConnection.forceClose();
@ -329,6 +332,17 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
return getTransportConnection().getDefaultActiveMQPrincipal(); return getTransportConnection().getDefaultActiveMQPrincipal();
} }
@Override
public boolean isSupportReconnect() {
for (Channel channel : channels.values()) {
if (channel.getConfirmationWindowSize() > 0) {
return true;
}
}
return false;
}
// Buffer Handler implementation // Buffer Handler implementation
// ---------------------------------------------------- // ----------------------------------------------------
@Override @Override

View File

@ -77,6 +77,8 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
active = false; active = false;
} }
} }
super.channelInactive(ctx);
} }
@Override @Override

View File

@ -201,6 +201,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return res; return res;
} }
@Override
public boolean isSupportReconnect() {
return false;
}
/* /*
* This can be called concurrently by more than one thread so needs to be locked * This can be called concurrently by more than one thread so needs to be locked
*/ */

View File

@ -189,4 +189,11 @@ public interface RemotingConnection extends BufferHandler {
*if slow consumer is killed,send the msessage to client. *if slow consumer is killed,send the msessage to client.
*/ */
void killMessage(SimpleString nodeID); void killMessage(SimpleString nodeID);
/**
* This will check if reconnects are supported on the protocol and configuration.
* In case it's not supported a connection failure could remove messages right away from pending deliveries.
* @return
*/
boolean isSupportReconnect();
} }

View File

@ -216,4 +216,9 @@ public class MQTTConnection implements RemotingConnection {
public void killMessage(SimpleString nodeID) { public void killMessage(SimpleString nodeID) {
//unsupported //unsupported
} }
@Override
public boolean isSupportReconnect() {
return false;
}
} }

View File

@ -96,6 +96,11 @@ public final class StompConnection implements RemotingConnection {
private final int minLargeMessageSize; private final int minLargeMessageSize;
@Override
public boolean isSupportReconnect() {
return false;
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null; StompFrame frame = null;
try { try {

View File

@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServiceRegistry; import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@ -536,29 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
ConnectionEntry conn = connections.get(connectionID); ConnectionEntry conn = connections.get(connectionID);
if (conn != null) { if (conn != null && !conn.connection.isSupportReconnect()) {
// Bit of a hack - find a better way to do this removeConnection(connectionID);
List<FailureListener> failureListeners = conn.connection.getFailureListeners(); conn.connection.fail(new ActiveMQRemoteDisconnectException());
boolean empty = true;
for (FailureListener listener : failureListeners) {
if (listener instanceof ServerSessionImpl) {
empty = false;
break;
}
}
// We only destroy the connection if the connection has no sessions attached to it
// Otherwise it means the connection has died without the sessions being closed first
// so we need to keep them for ttl, in case re-attachment occurs
if (empty) {
removeConnection(connectionID);
conn.connection.destroy();
}
} }
} }

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.clientcrash;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PendingDeliveriesTest extends ClientTestBase {
@Before
public void createQueue() throws Exception {
server.createQueue(SimpleString.toSimpleString("jms.queue.queue1"), SimpleString.toSimpleString("jms.queue.queue1"), null, true, false);
}
@After
public void clearLogger() throws Exception {
System.out.println("After clearing");
AssertionLoggerHandler.stopCapture();
AssertionLoggerHandler.clear();
}
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
public static void main(String[] arg) {
if (arg.length != 3) {
System.err.println("Usage:: URI destinationName cleanShutdown");
System.exit(-1);
}
String uri = arg[0];
String destinationName = arg[1];
boolean cleanShutdown = Boolean.valueOf(arg[2]);
ConnectionFactory factory;
factory = createCF(uri);
try {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
System.err.println("***** " + destination);
connection.start();
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("hello"));
}
System.err.println("CleanShutdown::" + cleanShutdown);
if (cleanShutdown) {
consumer.close();
connection.close();
}
System.exit(0);
}
catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
}
private static ConnectionFactory createCF(String uri) {
ConnectionFactory factory;
if (uri.startsWith("amqp")) {
factory = new JmsConnectionFactory(uri);
}
else {
factory = new ActiveMQConnectionFactory(uri);
}
return factory;
}
@Test
public void testWithoutReconnect() throws Exception {
internalNoReconnect(AMQP_URI, "jms.queue.queue1");
internalNoReconnect(CORE_URI_NO_RECONNECT, "queue1");
}
private void internalNoReconnect(String uriToUse, String destinationName) throws Exception {
startClient(uriToUse, destinationName, true, false);
ConnectionFactory cf = createCF(uriToUse);
Connection connection = cf.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < 100; i++) {
Assert.assertNotNull(consumer.receive(1000));
}
}
finally {
connection.stop();
connection.close();
}
if (cf instanceof ActiveMQConnectionFactory) {
((ActiveMQConnectionFactory)cf).close();
}
}
@Test
public void testWithtReconnect() throws Exception {
startClient(CORE_URI_WITH_RECONNECT, "queue1", true, false);
ConnectionFactory cf = createCF(CORE_URI_WITH_RECONNECT);
Connection connection = cf.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue1");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
for (; i < 100; i++) {
Message msg = consumer.receive(100);
if (msg == null) {
break;
}
}
Assert.assertTrue(i < 100);
}
finally {
connection.stop();
connection.close();
}
}
@Test
public void testCleanShutdownNoLogger() throws Exception {
AssertionLoggerHandler.startCapture();
startClient(CORE_URI_NO_RECONNECT, "queue1", false, true);
Thread.sleep(500);
Assert.assertFalse(AssertionLoggerHandler.findText("clearing up resources"));
}
@Test
public void testBadShutdownLogger() throws Exception {
AssertionLoggerHandler.startCapture();
startClient(CORE_URI_NO_RECONNECT, "queue1", false, false);
Assert.assertTrue(AssertionLoggerHandler.findText(1000, "clearing up resources"));
}
@Test
public void testCleanShutdown() throws Exception {
}
private void startClient(String uriToUse, String destinationName, boolean log, boolean cleanShutdown) throws Exception {
Process process = SpawnedVMSupport.spawnVM(PendingDeliveriesTest.class.getName(), log, uriToUse, destinationName, Boolean.toString(cleanShutdown));
Assert.assertEquals(0, process.waitFor());
}
}

View File

@ -119,10 +119,7 @@ public final class SpawnedVMSupport {
Process process = builder.start(); Process process = builder.start();
if (logOutput) { SpawnedVMSupport.startLogger(logOutput, wordMatch, wordRunning, className, process);
SpawnedVMSupport.startLogger(wordMatch, wordRunning, className, process);
}
// Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread: // Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
// http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815 // http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
@ -138,8 +135,8 @@ public final class SpawnedVMSupport {
* @param process * @param process
* @throws ClassNotFoundException * @throws ClassNotFoundException
*/ */
public static void startLogger(final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException { public static void startLogger(final boolean print, final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException {
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className, wordMatch, wordRunanble); ProcessLogger outputLogger = new ProcessLogger(print, process.getInputStream(), className, wordMatch, wordRunanble);
outputLogger.start(); outputLogger.start();
} }
@ -149,7 +146,7 @@ public final class SpawnedVMSupport {
* @throws ClassNotFoundException * @throws ClassNotFoundException
*/ */
public static void startLogger(final String className, final Process process) throws ClassNotFoundException { public static void startLogger(final String className, final Process process) throws ClassNotFoundException {
startLogger(null, null, className, process); startLogger(true, null, null, className, process);
} }
/** /**