ARTEMIS-671 Returning messages after connection killed, and validating usage of reconnect
This commit is contained in:
parent
2539e6f2ab
commit
579d6226aa
|
@ -172,6 +172,12 @@ public enum ActiveMQExceptionType {
|
|||
return new ActiveMQInvalidTransientQueueUseException(msg);
|
||||
}
|
||||
},
|
||||
REMOTE_DISCONNECT(119) {
|
||||
@Override
|
||||
public ActiveMQException createException(String msg) {
|
||||
return new ActiveMQRemoteDisconnectException(msg);
|
||||
}
|
||||
},
|
||||
|
||||
GENERIC_EXCEPTION(999),
|
||||
NATIVE_ERROR_INTERNAL(200),
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.REMOTE_DISCONNECT;
|
||||
|
||||
/**
|
||||
* A security problem occurred (authentication issues, permission issues,...)
|
||||
*/
|
||||
public final class ActiveMQRemoteDisconnectException extends ActiveMQException {
|
||||
|
||||
public ActiveMQRemoteDisconnectException() {
|
||||
super(REMOTE_DISCONNECT);
|
||||
}
|
||||
|
||||
public ActiveMQRemoteDisconnectException(String msg) {
|
||||
super(REMOTE_DISCONNECT, msg);
|
||||
}
|
||||
}
|
|
@ -71,6 +71,19 @@ public class AssertionLoggerHandler extends ExtHandler {
|
|||
return false;
|
||||
}
|
||||
|
||||
public static boolean findText(long mstimeout, String ... text) {
|
||||
|
||||
long timeMax = System.currentTimeMillis() + mstimeout;
|
||||
do {
|
||||
if (findText(text)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
while (timeMax > System.currentTimeMillis());
|
||||
|
||||
return false;
|
||||
|
||||
}
|
||||
/**
|
||||
* Find a line that contains the parameters passed as an argument
|
||||
*
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
|
@ -191,7 +192,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
destroyed = true;
|
||||
}
|
||||
|
||||
ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
|
||||
if (!(me instanceof ActiveMQRemoteDisconnectException)) {
|
||||
ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
|
||||
}
|
||||
|
||||
try {
|
||||
transportConnection.forceClose();
|
||||
|
@ -329,6 +332,17 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
return getTransportConnection().getDefaultActiveMQPrincipal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupportReconnect() {
|
||||
for (Channel channel : channels.values()) {
|
||||
if (channel.getConfirmationWindowSize() > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Buffer Handler implementation
|
||||
// ----------------------------------------------------
|
||||
@Override
|
||||
|
|
|
@ -77,6 +77,8 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
|
|||
active = false;
|
||||
}
|
||||
}
|
||||
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -201,6 +201,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
|
|||
return res;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupportReconnect() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* This can be called concurrently by more than one thread so needs to be locked
|
||||
*/
|
||||
|
|
|
@ -189,4 +189,11 @@ public interface RemotingConnection extends BufferHandler {
|
|||
*if slow consumer is killed,send the msessage to client.
|
||||
*/
|
||||
void killMessage(SimpleString nodeID);
|
||||
|
||||
/**
|
||||
* This will check if reconnects are supported on the protocol and configuration.
|
||||
* In case it's not supported a connection failure could remove messages right away from pending deliveries.
|
||||
* @return
|
||||
*/
|
||||
boolean isSupportReconnect();
|
||||
}
|
||||
|
|
|
@ -216,4 +216,9 @@ public class MQTTConnection implements RemotingConnection {
|
|||
public void killMessage(SimpleString nodeID) {
|
||||
//unsupported
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupportReconnect() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,11 @@ public final class StompConnection implements RemotingConnection {
|
|||
|
||||
private final int minLargeMessageSize;
|
||||
|
||||
@Override
|
||||
public boolean isSupportReconnect() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
|
||||
StompFrame frame = null;
|
||||
try {
|
||||
|
|
|
@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
|
||||
|
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
|||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
|
@ -536,29 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
|
|||
|
||||
ConnectionEntry conn = connections.get(connectionID);
|
||||
|
||||
if (conn != null) {
|
||||
// Bit of a hack - find a better way to do this
|
||||
if (conn != null && !conn.connection.isSupportReconnect()) {
|
||||
removeConnection(connectionID);
|
||||
|
||||
List<FailureListener> failureListeners = conn.connection.getFailureListeners();
|
||||
|
||||
boolean empty = true;
|
||||
|
||||
for (FailureListener listener : failureListeners) {
|
||||
if (listener instanceof ServerSessionImpl) {
|
||||
empty = false;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// We only destroy the connection if the connection has no sessions attached to it
|
||||
// Otherwise it means the connection has died without the sessions being closed first
|
||||
// so we need to keep them for ttl, in case re-attachment occurs
|
||||
if (empty) {
|
||||
removeConnection(connectionID);
|
||||
|
||||
conn.connection.destroy();
|
||||
}
|
||||
conn.connection.fail(new ActiveMQRemoteDisconnectException());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.clientcrash;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class PendingDeliveriesTest extends ClientTestBase {
|
||||
|
||||
|
||||
@Before
|
||||
public void createQueue() throws Exception {
|
||||
server.createQueue(SimpleString.toSimpleString("jms.queue.queue1"), SimpleString.toSimpleString("jms.queue.queue1"), null, true, false);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearLogger() throws Exception {
|
||||
System.out.println("After clearing");
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
AssertionLoggerHandler.clear();
|
||||
}
|
||||
|
||||
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
|
||||
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
|
||||
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
|
||||
|
||||
public static void main(String[] arg) {
|
||||
if (arg.length != 3) {
|
||||
System.err.println("Usage:: URI destinationName cleanShutdown");
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
|
||||
String uri = arg[0];
|
||||
String destinationName = arg[1];
|
||||
boolean cleanShutdown = Boolean.valueOf(arg[2]);
|
||||
|
||||
|
||||
ConnectionFactory factory;
|
||||
|
||||
factory = createCF(uri);
|
||||
|
||||
try {
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue(destinationName);
|
||||
|
||||
System.err.println("***** " + destination);
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
}
|
||||
|
||||
System.err.println("CleanShutdown::" + cleanShutdown);
|
||||
|
||||
if (cleanShutdown) {
|
||||
consumer.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
|
||||
}
|
||||
catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
System.exit(-1);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
private static ConnectionFactory createCF(String uri) {
|
||||
ConnectionFactory factory;
|
||||
if (uri.startsWith("amqp")) {
|
||||
factory = new JmsConnectionFactory(uri);
|
||||
}
|
||||
else {
|
||||
factory = new ActiveMQConnectionFactory(uri);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithoutReconnect() throws Exception {
|
||||
|
||||
internalNoReconnect(AMQP_URI, "jms.queue.queue1");
|
||||
internalNoReconnect(CORE_URI_NO_RECONNECT, "queue1");
|
||||
}
|
||||
|
||||
private void internalNoReconnect(String uriToUse, String destinationName) throws Exception {
|
||||
startClient(uriToUse, destinationName, true, false);
|
||||
|
||||
ConnectionFactory cf = createCF(uriToUse);
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue(destinationName);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Assert.assertNotNull(consumer.receive(1000));
|
||||
}
|
||||
}
|
||||
finally {
|
||||
connection.stop();
|
||||
connection.close();
|
||||
|
||||
}
|
||||
|
||||
if (cf instanceof ActiveMQConnectionFactory) {
|
||||
((ActiveMQConnectionFactory)cf).close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testWithtReconnect() throws Exception {
|
||||
startClient(CORE_URI_WITH_RECONNECT, "queue1", true, false);
|
||||
ConnectionFactory cf = createCF(CORE_URI_WITH_RECONNECT);
|
||||
Connection connection = cf.createConnection();
|
||||
connection.start();
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createQueue("queue1");
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
int i = 0;
|
||||
for (; i < 100; i++) {
|
||||
Message msg = consumer.receive(100);
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(i < 100);
|
||||
}
|
||||
finally {
|
||||
connection.stop();
|
||||
connection.close();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCleanShutdownNoLogger() throws Exception {
|
||||
AssertionLoggerHandler.startCapture();
|
||||
startClient(CORE_URI_NO_RECONNECT, "queue1", false, true);
|
||||
Thread.sleep(500);
|
||||
Assert.assertFalse(AssertionLoggerHandler.findText("clearing up resources"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBadShutdownLogger() throws Exception {
|
||||
AssertionLoggerHandler.startCapture();
|
||||
startClient(CORE_URI_NO_RECONNECT, "queue1", false, false);
|
||||
Assert.assertTrue(AssertionLoggerHandler.findText(1000, "clearing up resources"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCleanShutdown() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
private void startClient(String uriToUse, String destinationName, boolean log, boolean cleanShutdown) throws Exception {
|
||||
Process process = SpawnedVMSupport.spawnVM(PendingDeliveriesTest.class.getName(), log, uriToUse, destinationName, Boolean.toString(cleanShutdown));
|
||||
Assert.assertEquals(0, process.waitFor());
|
||||
}
|
||||
|
||||
}
|
|
@ -119,10 +119,7 @@ public final class SpawnedVMSupport {
|
|||
|
||||
Process process = builder.start();
|
||||
|
||||
if (logOutput) {
|
||||
SpawnedVMSupport.startLogger(wordMatch, wordRunning, className, process);
|
||||
|
||||
}
|
||||
SpawnedVMSupport.startLogger(logOutput, wordMatch, wordRunning, className, process);
|
||||
|
||||
// Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
|
||||
// http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
|
||||
|
@ -138,8 +135,8 @@ public final class SpawnedVMSupport {
|
|||
* @param process
|
||||
* @throws ClassNotFoundException
|
||||
*/
|
||||
public static void startLogger(final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException {
|
||||
ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className, wordMatch, wordRunanble);
|
||||
public static void startLogger(final boolean print, final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException {
|
||||
ProcessLogger outputLogger = new ProcessLogger(print, process.getInputStream(), className, wordMatch, wordRunanble);
|
||||
outputLogger.start();
|
||||
}
|
||||
|
||||
|
@ -149,7 +146,7 @@ public final class SpawnedVMSupport {
|
|||
* @throws ClassNotFoundException
|
||||
*/
|
||||
public static void startLogger(final String className, final Process process) throws ClassNotFoundException {
|
||||
startLogger(null, null, className, process);
|
||||
startLogger(true, null, null, className, process);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue