[AMQ-6587] - fix root cause of deletion candidate seeing sub. contention between adding sub to dests and adding dests to sub, also removing sub from dests twice could leave dest stat negative allowing invalid candidate for gc

(cherry picked from commit d86c98a687)
This commit is contained in:
gtully 2017-02-10 11:37:44 +00:00 committed by Christopher L. Shannon
parent 3e66eccb55
commit 727000f112
4 changed files with 88 additions and 15 deletions

View File

@ -267,14 +267,10 @@ public abstract class AbstractRegion implements Region {
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) { for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next(); Subscription sub = iter.next();
if (sub.matches(destination) ) { if (sub.matches(destination) ) {
// may be a new sub created after gc decision, verify if really subscribed
Destination toDelete = destinations.get(destination);
if (toDelete != null && toDelete.getDestinationStatistics().getConsumers().getCount() > 0 ) {
throw new JMSException("Destination still has an active subscription: " + destination); throw new JMSException("Destination still has an active subscription: " + destination);
} }
} }
} }
}
if (timeout > 0) { if (timeout > 0) {
// TODO: implement a way to notify the subscribers that we want to // TODO: implement a way to notify the subscribers that we want to
@ -394,6 +390,8 @@ public abstract class AbstractRegion implements Region {
for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) { for (Destination dest : (Set<Destination>) destinationMap.get(info.getDestination())) {
addList.add(dest); addList.add(dest);
} }
// ensure sub visible to any new dest addSubscriptionsForDestination
subscriptions.put(info.getConsumerId(), sub);
} finally { } finally {
destinationsLock.readLock().unlock(); destinationsLock.readLock().unlock();
} }
@ -416,6 +414,8 @@ public abstract class AbstractRegion implements Region {
LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex); LOG.error("Error unsubscribing " + sub + " from " + remove + ": " + ex.getMessage(), ex);
} }
} }
subscriptions.remove(info.getConsumerId());
removeList.clear();
throw e; throw e;
} }
} }
@ -426,8 +426,6 @@ public abstract class AbstractRegion implements Region {
((QueueBrowserSubscription) sub).destinationsAdded(); ((QueueBrowserSubscription) sub).destinationsAdded();
} }
subscriptions.put(info.getConsumerId(), sub);
return sub; return sub;
} }
} }

View File

@ -263,6 +263,7 @@ public abstract class BaseDestination implements Destination {
@Override @Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
destinationStatistics.getConsumers().decrement(); destinationStatistics.getConsumers().decrement();
this.lastActiveTime=0l;
} }
@ -298,9 +299,9 @@ public abstract class BaseDestination implements Destination {
@Override @Override
public boolean isActive() { public boolean isActive() {
boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
destinationStatistics.getProducers().getCount() != 0; destinationStatistics.getProducers().getCount() > 0;
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
isActive = hasRegularConsumers(getConsumers()); isActive = hasRegularConsumers(getConsumers());
} }
return isActive; return isActive;

View File

@ -192,9 +192,12 @@ public class Topic extends BaseDestination implements Task {
@Override @Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
if (!sub.getConsumerInfo().isDurable()) { if (!sub.getConsumerInfo().isDurable()) {
super.removeSubscription(context, sub, lastDeliveredSequenceId); boolean removed = false;
synchronized (consumers) { synchronized (consumers) {
consumers.remove(sub); removed = consumers.remove(sub);
}
if (removed) {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
} }
} }
sub.remove(context, this); sub.remove(context, this);

View File

@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
public class DestinationGCStressTest { public class DestinationGCStressTest {
@ -168,7 +169,7 @@ public class DestinationGCStressTest {
log4jLogger.addAppender(appender); log4jLogger.addAppender(appender);
try { try {
final AtomicInteger max = new AtomicInteger(10000); final AtomicInteger max = new AtomicInteger(20000);
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
factory.setWatchTopicAdvisories(false); factory.setWatchTopicAdvisories(false);
@ -201,13 +202,12 @@ public class DestinationGCStressTest {
executorService.submit(new Runnable() { executorService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 1000; i++) {
try { try {
MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">")); MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">"));
messageConsumer.close(); messageConsumer.close();
} catch (Exception ignored) { } catch (Exception ignored) {
ignored.printStackTrace();
} }
} }
} }
@ -226,4 +226,75 @@ public class DestinationGCStressTest {
assertFalse("failed on unexpected log event", failed.get()); assertFalse("failed on unexpected log event", failed.get());
} }
@Test(timeout = 60000)
public void testAllDestsSeeSub() throws Exception {
final AtomicInteger foundDestWithMissingSub = new AtomicInteger(0);
final AtomicInteger max = new AtomicInteger(20000);
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
factory.setWatchTopicAdvisories(false);
Connection connection = factory.createConnection();
connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Connection c = factory.createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = s.createProducer(null);
Message message = s.createTextMessage();
int j;
while ((j = max.decrementAndGet()) > 0) {
producer.send(new ActiveMQTopic("A." + j), message);
}
} catch (Exception ignored) {
ignored.printStackTrace();
}
}
});
}
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
try {
MessageConsumer messageConsumer = session.createConsumer(new ActiveMQTopic(">"));
if (destMissingSub(foundDestWithMissingSub)) {
break;
}
messageConsumer.close();
} catch (Exception ignored) {
}
}
}
});
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
connection.close();
assertEquals("no dests missing sub", 0, foundDestWithMissingSub.get());
}
private boolean destMissingSub(AtomicInteger tally) {
for (Destination destination :
((RegionBroker)brokerService.getRegionBroker()).getTopicRegion().getDestinationMap().values()) {
if (destination.getConsumers().isEmpty()) {
tally.incrementAndGet();
return true;
}
}
return false;
}
} }