Persist the latest producer position across restarts in the leveldb store.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1515799 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-08-20 12:38:32 +00:00
parent f155083a61
commit 45e1462609
3 changed files with 38 additions and 23 deletions

View File

@ -168,7 +168,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
map.put(pid.toString(), bab);
modified = true;
}
answer = bab.setBit(id.getProducerSequenceId(), true);
@ -272,7 +272,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
BitArrayBin bab = map.get(pid);
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
map.put(pid.toString(), bab);
modified = true;
}
answer = bab.isInOrder(id.getProducerSequenceId());

View File

@ -427,7 +427,7 @@ class DBManager(val parent:LevelDBStore) {
val lastUowId = new AtomicInteger(1)
val producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
var producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
def getLastProducerSequenceId(id: ProducerId): Long = dispatchQueue.sync {
producerSequenceIdTracker.getLastSeqId(id)
@ -437,13 +437,6 @@ class DBManager(val parent:LevelDBStore) {
dispatchQueue.assertExecuting()
uowClosedCounter += 1
// track the producer seq positions.
for( (_, action) <- uow.actions ) {
if( action.messageRecord!=null ) {
producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
}
}
// Broker could issue a flush_message call before
// this stage runs.. which make the stage jump over UowDelayed
if( uow.state.stage < UowDelayed.stage ) {
@ -452,10 +445,6 @@ class DBManager(val parent:LevelDBStore) {
if( uow.state.stage < UowFlushing.stage ) {
uow.actions.foreach { case (id, action) =>
if( action.messageRecord!=null ) {
producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
}
// The UoW may have been canceled.
if( action.messageRecord!=null && action.enqueues.isEmpty ) {
action.removeFromPendingStore()

View File

@ -40,6 +40,7 @@ import java.util.{Date, Collections}
import org.apache.activemq.leveldb.util.TimeMetric
import org.apache.activemq.leveldb.RecordLog.LogInfo
import org.fusesource.leveldbjni.internal.JniDB
import org.apache.activemq.ActiveMQMessageAuditNoSync
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -66,6 +67,7 @@ object LevelDBClient extends Log {
final val DIRTY_INDEX_KEY = bytes(":dirty")
final val LOG_REF_INDEX_KEY = bytes(":log-refs")
final val LOGS_INDEX_KEY = bytes(":logs")
final val PRODUCER_IDS_INDEX_KEY = bytes(":producer_ids")
final val COLLECTION_META_KEY = bytes(":collection-meta")
final val TRUE = bytes("true")
@ -699,6 +701,10 @@ class LevelDBClient(store: LevelDBStore) {
log.read(pos).map {
case (kind, data, nextPos) =>
kind match {
case LOG_DATA =>
val message = decodeMessage(data)
store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
case LOG_ADD_COLLECTION =>
val record= decodeCollectionRecord(data)
index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
@ -846,10 +852,19 @@ class LevelDBClient(store: LevelDBStore) {
case e => throw e
}
}
def storeObject(key:Array[Byte], o:Object) = {
val baos = new ByteArrayOutputStream()
val os = new ObjectOutputStream(baos);
os.writeObject(o)
os.close()
index.put(key, baos.toByteArray)
}
storeMap(LOG_REF_INDEX_KEY, logRefs)
storeMap(COLLECTION_META_KEY, collectionMeta)
storeList(LOGS_INDEX_KEY, log.log_file_positions)
storeObject(PRODUCER_IDS_INDEX_KEY, store.db.producerSequenceIdTracker)
}
private def loadCounters = {
@ -878,6 +893,13 @@ class LevelDBClient(store: LevelDBStore) {
rc
}
}
def loadObject(key:Array[Byte]) = {
index.get(key, new ReadOptions).map { value=>
val bais = new ByteArrayInputStream(value)
val is = new ObjectInputStream(bais);
is.readObject();
}
}
loadMap(LOG_REF_INDEX_KEY, logRefs)
loadMap(COLLECTION_META_KEY, collectionMeta)
@ -887,6 +909,9 @@ class LevelDBClient(store: LevelDBStore) {
recoveryLogs.put(k, null)
}
}
for( audit <- loadObject(PRODUCER_IDS_INDEX_KEY) ) {
store.db.producerSequenceIdTracker = audit.asInstanceOf[ActiveMQMessageAuditNoSync]
}
}
var wal_append_position = 0L
@ -1183,16 +1208,17 @@ class LevelDBClient(store: LevelDBStore) {
}
// Lets decode
buffer.map{ x =>
var data = if( store.snappyCompressLogs ) {
Snappy.uncompress(x)
} else {
x
}
store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
}.getOrElse(null)
buffer.map(decodeMessage(_)).getOrElse(null)
}
def decodeMessage(x: Buffer): Message = {
var data = if (store.snappyCompressLogs) {
Snappy.uncompress(x)
} else {
x
}
store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
}
def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
val ro = new ReadOptions
@ -1267,6 +1293,7 @@ class LevelDBClient(store: LevelDBStore) {
var dataLocator: DataLocator = null
if (messageRecord != null && messageRecord.locator == null) {
store.db.producerSequenceIdTracker.isDuplicate(messageRecord.id)
val start = System.nanoTime()
val p = appender.append(LOG_DATA, messageRecord.data)
log_info = p._2
@ -1329,7 +1356,6 @@ class LevelDBClient(store: LevelDBStore) {
val index_record = new EntryRecord.Bean()
index_record.setValueLocation(dataLocator.pos)
index_record.setValueLength(dataLocator.len)
batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
batch.put(key, index_data)