Fixes for leveldb replication: only apply index updates when a UOW_END_RECORD is encountered, so that a partially replicated UOW cannot leave the index in an inconsistent state.
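In outline, the fix buffers replayed index updates in a LevelDB WriteBatch and only writes that batch to the index when a UOW_END_RECORD is replayed, so the updates of a unit of work become visible all at once or not at all. Below is a minimal sketch of that pattern, written against the org.iq80.leveldb API that leveldbjni implements; the record-kind constants and the replay driver are hypothetical stand-ins for illustration, not the store's actual code.

import org.iq80.leveldb.{DB, WriteBatch}

object UowBatchedReplay {

  // Hypothetical record-kind tags; the real constants live elsewhere in the store.
  val LOG_ADD_ENTRY = 1
  val UOW_END_RECORD = 100

  // records are (kind, key, value) triples decoded from the replicated journal.
  def replay(db: DB, records: Seq[(Int, Array[Byte], Array[Byte])]): Unit = {
    var batch: WriteBatch = db.createWriteBatch()
    try {
      for ((kind, key, value) <- records) kind match {
        case LOG_ADD_ENTRY =>
          batch.put(key, value)          // buffered, not yet visible in the index
        case UOW_END_RECORD =>
          db.write(batch)                // apply the completed UOW atomically
          batch.close()
          batch = db.createWriteBatch()  // start buffering the next UOW
        case _ =>                        // other records don't modify the index
      }
      // Anything still buffered here belongs to an incomplete UOW and is
      // deliberately dropped rather than written to the index.
    } finally {
      batch.close()
    }
  }
}

Dropping whatever remains buffered after the last UOW_END_RECORD is the point of the fix: a UOW that was only partially replicated never reaches the index.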

Hiram Chirino 2013-11-01 14:21:30 -04:00
parent dea38e62f8
commit b1d8cbe4cd
1 changed file with 37 additions and 5 deletions

@@ -31,7 +31,7 @@ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord}
 import org.apache.activemq.leveldb.util._
 import java.util.concurrent._
 import org.fusesource.hawtbuf._
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File}
+import java.io._
 import scala.Option._
 import org.apache.activemq.command.{MessageAck, Message}
 import org.apache.activemq.util.{IOExceptionSupport, ByteSequence}
@@ -41,6 +41,17 @@ import org.apache.activemq.leveldb.util.TimeMetric
 import org.apache.activemq.leveldb.RecordLog.LogInfo
 import org.fusesource.leveldbjni.internal.JniDB
 import org.apache.activemq.ActiveMQMessageAuditNoSync
+import java.util.zip.CRC32
+import org.apache.activemq.leveldb.util.TimeMetric
+import org.fusesource.hawtbuf.ByteArrayInputStream
+import org.apache.activemq.leveldb.RecordLog.LogInfo
+import scala.Some
+import scala.Serializable
+import org.apache.activemq.leveldb.XaAckRecord
+import org.apache.activemq.leveldb.MessageRecord
+import org.apache.activemq.leveldb.EntryLocator
+import org.apache.activemq.leveldb.DataLocator
+import org.fusesource.hawtbuf.ByteArrayOutputStream

 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -551,6 +562,7 @@ class LevelDBClient(store: LevelDBStore) {
       log.open()
     }
     replay_from(lastIndexSnapshotPos, log.appender_limit)
+    replay_write_batch = null;
   }

   def init() ={
@@ -678,7 +690,13 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }

+  var replay_write_batch: WriteBatch = null
+
   def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = {
+    debug("Replay of journal from: %d to %d.", from, limit)
+    if( replay_write_batch==null ) {
+      replay_write_batch = index.db.createWriteBatch()
+    }
     might_fail {
       try {
         // Update the index /w what was stored on the logs..
@@ -719,11 +737,13 @@ class LevelDBClient(store: LevelDBStore) {
            case LOG_DATA =>
              val message = decodeMessage(data)
              store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
+             trace("Replay of LOG_DATA at %d, message id: %s", pos, message.getMessageId)

            case LOG_ADD_COLLECTION =>
              val record= decodeCollectionRecord(data)
-             index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
+             replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
              collectionMeta.put(record.getKey, new CollectionMeta)
+             trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey)

            case LOG_REMOVE_COLLECTION =>
              val record = decodeCollectionKeyRecord(data)
@@ -741,6 +761,7 @@ class LevelDBClient(store: LevelDBStore) {
              }
              index.delete(data)
              collectionMeta.remove(record.getKey)
+             trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey)

            case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
              val record = decodeEntryRecord(data)
@@ -750,7 +771,7 @@ class LevelDBClient(store: LevelDBStore) {
              index_record.setValueLength(record.getValueLength)
              val index_value = encodeEntryRecord(index_record.freeze()).toByteArray

-             index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)
+             replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value)

              if( kind==LOG_ADD_ENTRY ) {
                if ( record.hasValueLocation ) {
@@ -758,6 +779,7 @@ class LevelDBClient(store: LevelDBStore) {
                }
                collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
              }
+             trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)

            case LOG_REMOVE_ENTRY =>
              val record = decodeEntryRecord(data)
@@ -767,10 +789,18 @@ class LevelDBClient(store: LevelDBStore) {
                logRefDecrement(record.getValueLocation)
              }
-             index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
+             replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
              collectionDecrementSize( record.getCollectionKey)
+             trace("Replay of LOG_REMOVE_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)

-           case _ => // Skip other records, they don't modify the index.
+           case LOG_TRACE =>
+             trace("Replay of LOG_TRACE at %d, message: %s", pos, data.ascii())
+           case RecordLog.UOW_END_RECORD =>
+             trace("Replay of UOW_END_RECORD")
+             index.db.write(replay_write_batch)
+             replay_write_batch = index.db.createWriteBatch()
+           case kind => // Skip other records, they don't modify the index.
+             trace("Skipping replay of %d record kind at %d", kind, pos)

          }
          pos = nextPos
@@ -788,9 +818,11 @@ class LevelDBClient(store: LevelDBStore) {
         case e:Throwable =>
           // replay failed.. good thing we are in a retry block...
           index.close
+          replay_write_batch = null
           throw e;
       } finally {
         recoveryLogs = null
+        debug("Replay of journal done")
       }
     }
   }