diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index dbf651265d..b130a22079 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -31,7 +31,7 @@ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord} import org.apache.activemq.leveldb.util._ import java.util.concurrent._ import org.fusesource.hawtbuf._ -import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File} +import java.io._ import scala.Option._ import org.apache.activemq.command.{MessageAck, Message} import org.apache.activemq.util.{IOExceptionSupport, ByteSequence} @@ -41,6 +41,17 @@ import org.apache.activemq.leveldb.util.TimeMetric import org.apache.activemq.leveldb.RecordLog.LogInfo import org.fusesource.leveldbjni.internal.JniDB import org.apache.activemq.ActiveMQMessageAuditNoSync +import java.util.zip.CRC32 +import org.apache.activemq.leveldb.util.TimeMetric +import org.fusesource.hawtbuf.ByteArrayInputStream +import org.apache.activemq.leveldb.RecordLog.LogInfo +import scala.Some +import scala.Serializable +import org.apache.activemq.leveldb.XaAckRecord +import org.apache.activemq.leveldb.MessageRecord +import org.apache.activemq.leveldb.EntryLocator +import org.apache.activemq.leveldb.DataLocator +import org.fusesource.hawtbuf.ByteArrayOutputStream /** * @author Hiram Chirino @@ -551,6 +562,7 @@ class LevelDBClient(store: LevelDBStore) { log.open() } replay_from(lastIndexSnapshotPos, log.appender_limit) + replay_write_batch = null; } def init() ={ @@ -678,7 +690,13 @@ class LevelDBClient(store: LevelDBStore) { } } + var replay_write_batch: WriteBatch = null + def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = { + debug("Replay of journal from: %d to %d.", from, limit) + if( replay_write_batch==null ) { + replay_write_batch = index.db.createWriteBatch() + } might_fail { try { // Update the index /w what was stored on the logs.. @@ -719,11 +737,13 @@ class LevelDBClient(store: LevelDBStore) { case LOG_DATA => val message = decodeMessage(data) store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId) + trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId) case LOG_ADD_COLLECTION => val record= decodeCollectionRecord(data) - index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data) + replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data) collectionMeta.put(record.getKey, new CollectionMeta) + trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey) case LOG_REMOVE_COLLECTION => val record = decodeCollectionKeyRecord(data) @@ -741,6 +761,7 @@ class LevelDBClient(store: LevelDBStore) { } index.delete(data) collectionMeta.remove(record.getKey) + trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey) case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY => val record = decodeEntryRecord(data) @@ -750,7 +771,7 @@ class LevelDBClient(store: LevelDBStore) { index_record.setValueLength(record.getValueLength) val index_value = encodeEntryRecord(index_record.freeze()).toByteArray - index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value) + replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value) if( kind==LOG_ADD_ENTRY ) { if ( record.hasValueLocation ) { @@ -758,6 +779,7 @@ class LevelDBClient(store: LevelDBStore) { } collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray) } + trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) case LOG_REMOVE_ENTRY => val record = decodeEntryRecord(data) @@ -767,10 +789,18 @@ class LevelDBClient(store: LevelDBStore) { logRefDecrement(record.getValueLocation) } - index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey)) + replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey)) collectionDecrementSize( record.getCollectionKey) + trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) - case _ => // Skip other records, they don't modify the index. + case LOG_TRACE => + trace("Replay of LOG_TRACE, message: %s", pos, data.ascii()) + case RecordLog.UOW_END_RECORD => + trace("Replay of UOW_END_RECORD") + index.db.write(replay_write_batch) + replay_write_batch=index.db.createWriteBatch() + case kind => // Skip other records, they don't modify the index. + trace("Skipping replay of %d record kind at %d", kind, pos) } pos = nextPos @@ -788,9 +818,11 @@ class LevelDBClient(store: LevelDBStore) { case e:Throwable => // replay failed.. good thing we are in a retry block... index.close + replay_write_batch = null throw e; } finally { recoveryLogs = null + debug("Replay of journal done") } } }