ARTEMIS-2368 Fix races on closing consumer

This commit is contained in:
yang wei 2019-06-04 15:04:20 +08:00 committed by Michael Pearce
parent 9cb689fa41
commit 448449c88e
3 changed files with 257 additions and 5 deletions

View File

@ -1377,7 +1377,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
sessionContext.resetName(name); sessionContext.resetName(name);
for (ClientConsumerInternal consumer : cloneConsumers()) { Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();
for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
consumer.clearAtFailover(); consumer.clearAtFailover();
} }
@ -1395,12 +1397,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
if (!inClose && mayAttemptToFailover) { if (!inClose && mayAttemptToFailover) {
sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge); sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) { for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {
ClientConsumerInternal consumerInternal = entryx.getValue(); ClientConsumerInternal consumerInternal = entryx.getValue();
synchronized (consumerInternal) {
if (!consumerInternal.isClosed()) {
sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started); sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
} }
}
}
if ((!autoCommitAcks || !autoCommitSends) && workDone) { if ((!autoCommitAcks || !autoCommitSends) && workDone) {
// this is protected by a lock, so we can guarantee nothing will sneak here // this is protected by a lock, so we can guarantee nothing will sneak here
@ -1414,7 +1419,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// Now start the session if it was already started // Now start the session if it was already started
if (started) { if (started) {
for (ClientConsumerInternal consumer : cloneConsumers()) { for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
consumer.clearAtFailover(); consumer.clearAtFailover();
consumer.start(); consumer.start();
} }
@ -2082,6 +2087,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} }
} }
public Map<ConsumerContext, ClientConsumerInternal> cloneConsumerEntries() {
synchronized (consumers) {
return new HashMap<>(consumers);
}
}
private void closeChildren() throws ActiveMQException { private void closeChildren() throws ActiveMQException {
Set<ClientConsumerInternal> consumersClone = cloneConsumers(); Set<ClientConsumerInternal> consumersClone = cloneConsumers();

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(BMUnitRunner.class)
public class RaceOnClosingConsumerWhileReconnecting extends ActiveMQTestBase {
static RemotingConnection conn;
static ClientConsumer consumer;
protected ActiveMQServer server = null;
protected ClientSessionFactoryInternal sf = null;
protected ClientSessionInternal session = null;
protected final SimpleString queueName1 = new SimpleString("my_queue_one");
@Override
@Before
public void setUp() throws Exception {
super.setUp();
conn = null;
consumer = null;
server = createServer(true, true);
server.start();
SimpleString addressName1 = new SimpleString("my_address_one");
server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false);
final long retryInterval = 500;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
ServerLocator locator = createFactory(true).setCallFailoverTimeout(0).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
session = (ClientSessionInternal)sf.createSession(false, true, true);
}
@Override
@After
public void tearDown() throws Exception {
if (session != null) {
session.close();
}
if (sf != null) {
sf.close();
}
if (server != null) {
server.stop();
}
conn = null;
consumer = null;
super.tearDown();
}
@Test
@BMRules(
rules = {@BMRule(
name = "session.removeConsumer wait",
targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
targetMethod = "removeConsumer(org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal)",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.waitForReconnection();")})
public void testClosingConsumerBeforeReconnecting() throws Exception {
conn = session.getConnection();
ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
clientConsumer1.close();
Thread.sleep(500);
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
ServerConsumer serverConsumer = serverConsumers.iterator().next();
assertEquals(1, serverConsumers.size());
assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
}
@Test
@BMRules(
rules = {@BMRule(
name = "session.closeConsumer before recreating consumer",
targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
targetMethod = "handleFailover",
targetLocation = "AFTER WRITE $consumerInternal 1",
action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")})
public void testClosingConsumerBeforeRecreatingOneConsumer() throws Exception {
RemotingConnection conn = session.getConnection();
ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
consumer = clientConsumer1;
conn.fail(new ActiveMQNotConnectedException());
Thread.sleep(500);
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
assertEquals(0, serverConsumers.size());
}
@Test
@BMRules(
rules = {@BMRule(
name = "session.closeConsumer before recreating consumer",
targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
targetMethod = "handleFailover",
targetLocation = "AFTER WRITE $consumerInternal 1",
action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")})
public void testClosingConsumerBeforeRecreatingTwoConsumers() throws Exception {
RemotingConnection conn = session.getConnection();
ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
consumer = clientConsumer1;
conn.fail(new ActiveMQNotConnectedException());
Thread.sleep(500);
ServerSession serverSession = server.getSessionByID(session.getName());
assertNotNull(serverSession);
Set<ServerConsumer> serverConsumers = serverSession.getServerConsumers();
ServerConsumer serverConsumer = serverConsumers.iterator().next();
assertEquals(1, serverConsumers.size());
assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
}
public static void closeConsumer() {
if (consumer != null) {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer = null;
}
}
}
public static void waitForReconnection() {
if (conn != null) {
try {
conn.fail(new ActiveMQNotConnectedException());
} catch (Exception e) {
e.printStackTrace();
} finally {
conn = null;
}
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.remoting;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -26,6 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException; import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
@ -37,7 +41,9 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert; import org.junit.Assert;
@ -370,6 +376,54 @@ public class ReconnectTest extends ActiveMQTestBase {
server.stop(); server.stop();
} }
@Test
public void testClosingConsumerTimeout() throws Exception {
ActiveMQServer server = createServer(true, true);
server.start();
// imitate consumer close timeout
Interceptor reattachInterceptor = new Interceptor() {
boolean consumerClosed;
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
consumerClosed = true;
return false;
} else {
return true;
}
}
};
server.getRemotingService().addIncomingInterceptor(reattachInterceptor);
final long retryInterval = 500;
final double retryMultiplier = 1d;
final int reconnectAttempts = 10;
ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true);
SimpleString queueName1 = new SimpleString("my_queue_one");
SimpleString addressName1 = new SimpleString("my_address_one");
server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false);
ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
clientConsumer1.close();
Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
ServerConsumer serverConsumer = serverConsumers.iterator().next();
assertEquals(1, serverConsumers.size());
assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
session.close();
sf.close();
server.stop();
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------