Adding a LevelDB version of the RedeliveryRestartTest. Implemented redelivery tracking in the leveldb store.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1518289 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-08-28 17:20:25 +00:00
parent feb71c3c6e
commit f4d51e092b
5 changed files with 127 additions and 24 deletions

View File

@ -41,7 +41,7 @@ case class DataLocator(pos:Long, len:Int)
case class MessageRecord(id:MessageId, data:Buffer, syncNeeded:Boolean) {
var locator:DataLocator = _
}
case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long)
case class QueueEntryRecord(id:MessageId, queueKey:Long, queueSeq:Long, deliveries:Int=0)
case class QueueRecord(id:ActiveMQDestination, queue_key:Long)
case class QueueEntryRange()
case class SubAckRecord(subKey:Long, ackPosition:Long)
@ -308,6 +308,26 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
countDownFuture
}
def incrementRedelivery(expectedQueueKey:Long, id:MessageId) = {
if( id.getEntryLocator != null ) {
val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
assert(queueKey == expectedQueueKey)
val counter = manager.client.getDeliveryCounter(queueKey, queueSeq)
val entry = QueueEntryRecord(id, queueKey, queueSeq, counter+1)
val a = this.synchronized {
val action = getAction(entry.id)
action.enqueues += entry
delayableActions += 1
action
}
manager.dispatchQueue {
manager.cancelable_enqueue_actions.put(key(entry), a)
a.addToPendingStore()
}
}
countDownFuture
}
def dequeue(expectedQueueKey:Long, id:MessageId) = {
if( id.getEntryLocator != null ) {
val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];

View File

@ -85,6 +85,7 @@ object LevelDBClient extends Log {
final val LOG_REMOVE_ENTRY = 4.toByte
final val LOG_DATA = 5.toByte
final val LOG_TRACE = 6.toByte
final val LOG_UPDATE_ENTRY = 7.toByte
final val LOG_SUFFIX = ".log"
final val INDEX_SUFFIX = ".index"
@ -727,7 +728,7 @@ class LevelDBClient(store: LevelDBStore) {
index.delete(data)
collectionMeta.remove(record.getKey)
case LOG_ADD_ENTRY =>
case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
val record = decodeEntryRecord(data)
val index_record = new EntryRecord.Bean()
@ -737,10 +738,12 @@ class LevelDBClient(store: LevelDBStore) {
index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
if ( record.hasValueLocation ) {
logRefIncrement(record.getValueLocation)
if( kind==LOG_ADD_ENTRY ) {
if ( record.hasValueLocation ) {
logRefIncrement(record.getValueLocation)
}
collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
}
collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
case LOG_REMOVE_ENTRY =>
val record = decodeEntryRecord(data)
@ -1150,6 +1153,33 @@ class LevelDBClient(store: LevelDBStore) {
}
}
def decodeQueueEntryMeta(value:EntryRecord.Getter):Int= {
if( value.hasMeta ) {
val is = new DataByteArrayInputStream(value.getMeta);
val metaVersion = is.readVarInt()
metaVersion match {
case 1 =>
return is.readVarInt()
case _ =>
}
}
return 0
}
def getDeliveryCounter(collectionKey: Long, seq:Long):Int = {
val ro = new ReadOptions
ro.fillCache(true)
ro.verifyChecksums(verifyChecksums)
val key = encodeEntryKey(ENTRY_PREFIX, collectionKey, encodeLong(seq))
var rc = 0
might_fail_using_index {
for( v <- index.get(key, ro) ) {
rc = decodeQueueEntryMeta(EntryRecord.FACTORY.parseUnframed(v))
}
}
return rc
}
def queueCursor(collectionKey: Long, seq:Long)(func: (Message)=>Boolean) = {
collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
val seq = decodeLong(key)
@ -1157,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) {
val msg = getMessage(locator)
msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
msg.getMessageId().setDataLocator(locator)
msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
func(msg)
}
}
@ -1351,20 +1382,32 @@ class LevelDBClient(store: LevelDBStore) {
log_record.setEntryKey(new Buffer(key, 9, 8))
log_record.setValueLocation(dataLocator.pos)
log_record.setValueLength(dataLocator.len)
appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze()))
val kind = if (entry.deliveries==0) LOG_ADD_ENTRY else LOG_UPDATE_ENTRY
appender.append(kind, encodeEntryRecord(log_record.freeze()))
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(dataLocator.pos)
index_record.setValueLength(dataLocator.len)
// Store the delivery counter.
if( entry.deliveries!=0 ) {
val os = new DataByteArrayOutputStream()
os.writeVarInt(1) // meta data format version
os.writeVarInt(entry.deliveries)
index_record.setMeta(os.toBuffer)
}
val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
batch.put(key, index_data)
for (key <- logRefKey(dataLocator.pos, log_info)) {
logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
if( kind==LOG_ADD_ENTRY ) {
for (key <- logRefKey(dataLocator.pos, log_info)) {
logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet()
}
collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
}
collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray)
write_enqueue_total += System.nanoTime() - start
}

View File

@ -380,6 +380,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
if( prepared ) {
store.preparedAcks.remove(ack.getLastMessageId)
}
uow.incrementRedelivery(store.key, ack.getLastMessageId)
}
}
}
@ -452,16 +453,16 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
case null =>
debug("on rollback, the transaction " + txid + " does not exist")
case tx =>
if( tx.prepared ) {
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions.reverse ) {
action.rollback(uow)
}
uow.syncFlag = true
uow.addCompleteListener { done.countDown() }
val done = new CountDownLatch(1)
withUow { uow =>
for( action <- tx.commitActions.reverse ) {
action.rollback(uow)
}
done.await()
uow.syncFlag = true
uow.addCompleteListener { done.countDown() }
}
done.await()
if( tx.prepared ) {
db.removeTransactionContainer(tx.xacontainer_id)
}
}

View File

@ -0,0 +1,33 @@
package org.apache.activemq.broker;
import junit.framework.Test;
import org.apache.activemq.leveldb.LevelDBStore;
import java.io.IOException;
/**
*/
public class LevelDBRedeliveryRestartTest extends RedeliveryRestartTest {
@Override
protected void configureBroker(BrokerService broker) throws Exception {
broker.setDestinationPolicy(policyMap);
LevelDBStore store = new LevelDBStore();
broker.setPersistenceAdapter(store);
broker.addConnector("tcp://0.0.0.0:0");
}
@Override
protected void stopBrokerWithStoreFailure() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
public static Test suite() {
return suite(LevelDBRedeliveryRestartTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -33,6 +33,8 @@ import org.apache.activemq.transport.failover.FailoverTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class RedeliveryRestartTest extends BrokerRestartTestSupport {
private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
@ -82,7 +84,7 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
// make failover aware of the restarted auto assigned port
connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
.getPublishableConnectString());
.getPublishableConnectString());
consumer = session.createConsumer(destination);
for (int i = 0; i < 5; i++) {
@ -125,11 +127,7 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
assertEquals("not a redelivery", false, msg.getJMSRedelivered());
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
// have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
kahaDBPersistenceAdapter.getStore().getJournal().close();
broker.waitUntilStopped();
stopBrokerWithStoreFailure();
broker = createRestartedBroker();
broker.start();
@ -150,6 +148,14 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
connection.close();
}
protected void stopBrokerWithStoreFailure() throws Exception {
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
// have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
kahaDBPersistenceAdapter.getStore().getJournal().close();
broker.waitUntilStopped();
}
private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);