From b40dc4cc5452455dd93f53abf71e92bb6cd0b9cc Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Wed, 17 Jun 2015 13:23:08 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5668 This commit fixes a race condition in AbstractStoreCursor setLastCacheId that could cause a null pointer exception in certain cases. --- .../region/cursors/AbstractStoreCursor.java | 66 +++++++------ .../cursors/AbstractStoreCursorNpeTest.java | 93 +++++++++++++++++++ 2 files changed, 130 insertions(+), 29 deletions(-) create mode 100755 activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 07d4351778..4bdd7f6eec 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -58,15 +58,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i this.batchList = new OrderedPendingList(); } } - - + + public final synchronized void start() throws Exception{ if (!isStarted()) { super.start(); resetBatch(); resetSize(); setCacheEnabled(size==0&&useCache); - } + } } protected void resetSize() { @@ -84,7 +84,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i gc(); } - + public final boolean recoverMessage(Message message) throws Exception { return recoverMessage(message,false); } @@ -148,12 +148,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i clearIterator(true); size(); } - - + + public synchronized void release() { clearIterator(false); } - + private synchronized void clearIterator(boolean ensureIterator) { boolean haveIterator = this.iterator != null; this.iterator=null; @@ -161,7 +161,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i ensureIterator(); } } - + private synchronized void ensureIterator() { if(this.iterator==null) { this.iterator=this.batchList.iterator(); @@ -171,8 +171,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final void finished() { } - - + + public final synchronized boolean hasNext() { if (batchList.isEmpty()) { try { @@ -185,8 +185,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i ensureIterator(); return this.iterator.hasNext(); } - - + + public final synchronized MessageReference next() { MessageReference result = null; if (!this.batchList.isEmpty()&&this.iterator.hasNext()) { @@ -198,7 +198,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } return result; } - + public synchronized boolean addMessageLast(MessageReference node) throws Exception { boolean disableCache = false; if (hasSpace()) { @@ -314,23 +314,31 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } private void setLastCachedId(final int index, MessageId candidate) { - if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) { // possibly null for topics - lastCachedIds[index] = candidate; - } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) { + MessageId lastCacheId = lastCachedIds[index]; + if (lastCacheId == null) { lastCachedIds[index] = candidate; + } else { + Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong(); + Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong(); + if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics + lastCachedIds[index] = candidate; + } else if (candidateOrSequenceLong != null && + Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) { + lastCachedIds[index] = candidate; + } } } protected void setBatch(MessageId messageId) throws Exception { } - + public synchronized void addMessageFirst(MessageReference node) throws Exception { setCacheEnabled(false); size++; } - + public final synchronized void remove() { size--; if (iterator!=null) { @@ -341,20 +349,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } - + public final synchronized void remove(MessageReference node) { if (batchList.remove(node) != null) { size--; setCacheEnabled(false); } } - - + + public final synchronized void clear() { gc(); } - - + + public synchronized void gc() { for (MessageReference msg : batchList) { rollback(msg.getMessageId()); @@ -385,14 +393,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } } } - - + + public final synchronized boolean isEmpty() { // negative means more messages added to store through queue.send since last reset return size == 0; } - + public final synchronized boolean hasMessagesBufferedToDeliver() { return !batchList.isEmpty(); } @@ -413,13 +421,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null") + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null"); } - + protected abstract void doFillBatch() throws Exception; - + protected abstract void resetBatch(); protected abstract int getStoreSize(); - + protected abstract boolean isStoreEmpty(); public Subscription getSubscription() { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java new file mode 100755 index 0000000000..4d47165b9f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursorNpeTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.test.TestSupport; + +/** + * This test shows that a null pointer exception will not occur when unsubscribing from a + * subscription while a producer is sending messages rapidly to the topic. A null pointer + * exception was occurring in the setLastCachedId method of AbstractMessageCursor due to + * a race condition. If this test is run before the patch that is applied in this commit + * on AbstractStoreCusor, it will consistently fail with a NPE. + * + */ +public class AbstractStoreCursorNpeTest extends TestSupport { + + protected Connection connection; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Topic destination; + + + public void testSetLastCachedIdNPE() throws Exception { + connection = createConnection(); + connection.setClientID("clientId"); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = session.createTopic("test.topic"); + producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + + Connection durableCon = createConnection(); + durableCon.setClientID("testCons"); + durableCon.start(); + final Session durSession = durableCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + + //In a new thread rapidly subscribe and unsubscribe to a durable + ExecutorService executorService = Executors.newCachedThreadPool(); + executorService.execute(new Runnable() { + @Override + public void run() { + try{ + //Repeatedly create a durable subscription and then unsubscribe which used to + //cause a NPE while messages were sending + while(true) { + MessageConsumer cons = durSession.createDurableSubscriber(durSession.createTopic("test.topic"), "sub1"); + Thread.sleep(100); + cons.close(); + durSession.unsubscribe("sub1"); + } + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + TextMessage myMessage = new ActiveMQTextMessage(); + myMessage.setText("test"); + //Make sure that we can send a bunch of messages without a NPE + //This would fail if the patch is not applied + for (int i = 0; i < 10000; i++) { + producer.send(myMessage); + } + } +}