https://issues.apache.org/jira/browse/AMQ-5266 https://issues.apache.org/jira/browse/AMQ-4485 - single dest test with low limit exposed ignored setbatch in kahadb when sequence was not found in the index due to acking - resolved and validated with test that verifies dlq is empty

This commit is contained in:
gtully 2014-10-21 16:04:54 +01:00
parent ab3de0c4c2
commit 67ead201e1
4 changed files with 21 additions and 47 deletions

View File

@ -1200,7 +1200,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} finally {
pagedInMessagesLock.readLock().unlock();
}
messagesLock.readLock().lock();
messagesLock.writeLock().lock();
try{
try {
messages.reset();
@ -1217,7 +1217,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
messages.release();
}
}finally {
messagesLock.readLock().unlock();
messagesLock.writeLock().unlock();
}
return null;
}

View File

@ -2791,7 +2791,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
BTreeIndex<Long, MessageKeys> lowPriorityIndex;
BTreeIndex<Long, MessageKeys> highPriorityIndex;
MessageOrderCursor cursor = new MessageOrderCursor();
final MessageOrderCursor cursor = new MessageOrderCursor();
Long lastDefaultKey;
Long lastHighKey;
Long lastLowKey;
@ -2892,16 +2892,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (defaultPriorityIndex.containsKey(tx, sequence)) {
lastDefaultKey = sequence;
cursor.defaultCursorPosition = nextPosition.longValue();
} else if (highPriorityIndex != null) {
if (highPriorityIndex.containsKey(tx, sequence)) {
lastHighKey = sequence;
cursor.highPriorityCursorPosition = nextPosition.longValue();
} else if (lowPriorityIndex.containsKey(tx, sequence)) {
lastLowKey = sequence;
cursor.lowPriorityCursorPosition = nextPosition.longValue();
}
} else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequence)) {
lastHighKey = sequence;
cursor.highPriorityCursorPosition = nextPosition.longValue();
} else if (lowPriorityIndex.containsKey(tx, sequence)) {
lastLowKey = sequence;
cursor.lowPriorityCursorPosition = nextPosition.longValue();
} else {
LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
lastDefaultKey = sequence;
cursor.defaultCursorPosition = nextPosition.longValue();
}

View File

@ -179,7 +179,7 @@ public abstract class TestSupport extends CombinationTestSupport {
return setPersistenceAdapter(broker, defaultPersistenceAdapter);
}
public PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
public static PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException {
PersistenceAdapter adapter = null;
switch (choice) {
case JDBC:

View File

@ -35,15 +35,13 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -63,7 +61,6 @@ public class AMQ5266SingleDestTest {
static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
String activemqURL;
BrokerService brokerService;
private EmbeddedDataSource dataSource;
public int numDests = 1;
public int messageSize = 10*1000;
@ -84,7 +81,7 @@ public class AMQ5266SingleDestTest {
public boolean useCache = true;
@Parameterized.Parameter(5)
public boolean useDefaultStore = false;
public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
@Parameterized.Parameter(6)
public boolean optimizeDispatch = false;
@ -92,7 +89,7 @@ public class AMQ5266SingleDestTest {
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{1000, 80, 80, 1024*1024*5, true, true, false},
{1000, 80, 80, 1024*1024*1, true, TestSupport.PersistenceAdapterChoice.KahaDB, false},
});
}
@ -102,22 +99,10 @@ public class AMQ5266SingleDestTest {
public void startBroker() throws Exception {
brokerService = new BrokerService();
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("target/derbyDb");
dataSource.setCreateDatabase("create");
JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
jdbcPersistenceAdapter.setDataSource(dataSource);
jdbcPersistenceAdapter.setUseLock(false);
if (!useDefaultStore) {
brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
} else {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
}
TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setUseJmx(false);
brokerService.setAdvisorySupport(false);
PolicyMap policyMap = new PolicyMap();
@ -133,11 +118,12 @@ public class AMQ5266SingleDestTest {
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 1024);
TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0");
brokerService.start();
activemqURL = transportConnector.getPublishableConnectString();
activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all messages are queue or dlq messages
}
@After
@ -145,10 +131,6 @@ public class AMQ5266SingleDestTest {
if (brokerService != null) {
brokerService.stop();
}
try {
dataSource.setShutdownDatabase("shutdown");
dataSource.getConnection();
} catch (Exception ignored) {}
}
@Test
@ -202,9 +184,6 @@ public class AMQ5266SingleDestTest {
try {
int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
if (!useDefaultStore) {
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
}
Thread.sleep(1000);
} catch (Exception e) {
}
@ -217,11 +196,6 @@ public class AMQ5266SingleDestTest {
consumer.shutdown();
TimeUnit.SECONDS.sleep(2);
LOG.info("DB Contents START");
if (!useDefaultStore) {
DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
}
LOG.info("DB Contents END");
LOG.info("Consumer Stats:");
@ -243,6 +217,9 @@ public class AMQ5266SingleDestTest {
assertEquals("expect to get all messages!", 0, diff);
}
// verify empty dlq
assertEquals("No pending messages", 0l, ((RegionBroker) brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
}
public class ExportQueuePublisher {