Replicated LevelDB slaves' index snapshots were being labeled with higher journal positions than they actually contained.

Hiram Chirino 2013-11-08 10:25:50 -05:00
parent 0d38840051
commit 42e1c463d4
3 changed files with 68 additions and 45 deletions
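In outline, the fix routes the snapshot label through an overridable hook instead of always using the journal's append limit: a slave's journal can be replicated further than its index has been replayed, so labeling with the append limit over-promised. Below is a minimal, runnable sketch of that shape. Only nextIndexSnapshotPos, indexRecoveryPosition, and wal_append_position are names taken from the diff; the classes themselves are simplified stand-ins, not the real ActiveMQ types.

    // Sketch only: simplified stand-ins for LevelDBClient / the slave store.
    class LevelDBClientSketch {
      var wal_append_position = 0L   // how far the local journal has been written
      var indexRecoveryPosition = 0L // how far the journal has been replayed into the index

      // Before this commit, snapshots were effectively labeled with the append limit.
      def nextIndexSnapshotPos: Long = wal_append_position

      def copyDirtyIndexToSnapshot(): Unit =
        println(s"snapshot labeled with journal position ${nextIndexSnapshotPos}")
    }

    class SlaveLevelDBClientSketch extends LevelDBClientSketch {
      // After the fix, slaves label snapshots with what has actually been replayed.
      override def nextIndexSnapshotPos: Long = indexRecoveryPosition
    }

    object SnapshotLabelDemo extends App {
      val slave = new SlaveLevelDBClientSketch
      slave.wal_append_position = 1000  // journal replicated up to 1000
      slave.indexRecoveryPosition = 640 // index only replayed up to 640
      slave.copyDirtyIndexToSnapshot()  // prints 640, not 1000
    }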

LevelDBClient.scala

@@ -671,6 +671,7 @@ class LevelDBClient(store: LevelDBStore) {
     for( (id, file)<- lastSnapshotIndex ) {
       try {
         copyIndex(file, dirtyIndexFile)
+        debug("Recovering from last index snapshot at: "+dirtyIndexFile)
       } catch {
         case e:Exception =>
           warn(e, "Could not recover snapshot of the index: "+e)
@@ -678,19 +679,18 @@ class LevelDBClient(store: LevelDBStore) {
       }
     }
     index = new RichDB(factory.open(dirtyIndexFile, indexOptions));
-    if ( store.paranoidChecks ) {
     for(value <- index.get(DIRTY_INDEX_KEY) ) {
       if( java.util.Arrays.equals(value, TRUE) ) {
         warn("Recovering from a dirty index.")
       }
     }
-    }
     index.put(DIRTY_INDEX_KEY, TRUE)
     loadCounters
   }
 }

 var replay_write_batch: WriteBatch = null
+var indexRecoveryPosition = 0L

 def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = {
   debug("Replay of journal from: %d to %d.", from, limit)
@@ -700,19 +700,19 @@ class LevelDBClient(store: LevelDBStore) {
   might_fail {
     try {
       // Update the index /w what was stored on the logs..
-      var pos = from;
+      indexRecoveryPosition = from;
       var last_reported_at = System.currentTimeMillis();
       var showing_progress = false
       var last_reported_pos = 0L
       try {
-        while (pos < limit) {
+        while (indexRecoveryPosition < limit) {
           if( print_progress ) {
             val now = System.currentTimeMillis();
             if( now > last_reported_at+1000 ) {
-              val at = pos-from
+              val at = indexRecoveryPosition-from
               val total = limit-from
-              val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at)
+              val rate = (indexRecoveryPosition-last_reported_pos)*1000.0 / (now - last_reported_at)
               val eta = (total-at)/rate
               val remaining = if(eta > 60*60) {
                 "%.2f hrs".format(eta/(60*60))
@@ -726,24 +726,24 @@ class LevelDBClient(store: LevelDBStore) {
                 at*100.0/total, at, total, rate/1024, remaining))
               showing_progress = true;
               last_reported_at = now
-              last_reported_pos = pos
+              last_reported_pos = indexRecoveryPosition
             }
           }
-          log.read(pos).map {
+          log.read(indexRecoveryPosition).map {
             case (kind, data, nextPos) =>
               kind match {
                 case LOG_DATA =>
                   val message = decodeMessage(data)
                   store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
-                  trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId)
+                  trace("Replay of LOG_DATA at %d, message id: ", indexRecoveryPosition, message.getMessageId)
                 case LOG_ADD_COLLECTION =>
                   val record= decodeCollectionRecord(data)
                   replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
                   collectionMeta.put(record.getKey, new CollectionMeta)
-                  trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey)
+                  trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey)
                 case LOG_REMOVE_COLLECTION =>
                   val record = decodeCollectionKeyRecord(data)
@@ -761,7 +761,7 @@ class LevelDBClient(store: LevelDBStore) {
                   }
                   index.delete(data)
                   collectionMeta.remove(record.getKey)
-                  trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey)
+                  trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey)
                 case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY =>
                   val record = decodeEntryRecord(data)
@@ -779,7 +779,8 @@ class LevelDBClient(store: LevelDBStore) {
                     }
                     collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray)
                   }
-                  trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
+                  trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
                 case LOG_REMOVE_ENTRY =>
                   val record = decodeEntryRecord(data)
@@ -791,19 +792,19 @@ class LevelDBClient(store: LevelDBStore) {
                   replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey))
                   collectionDecrementSize( record.getCollectionKey)
-                  trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey)
+                  trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey)
                 case LOG_TRACE =>
-                  trace("Replay of LOG_TRACE, message: %s", pos, data.ascii())
+                  trace("Replay of LOG_TRACE, message: %s", indexRecoveryPosition, data.ascii())
                 case RecordLog.UOW_END_RECORD =>
                   trace("Replay of UOW_END_RECORD")
                   index.db.write(replay_write_batch)
                   replay_write_batch=index.db.createWriteBatch()
                 case kind => // Skip other records, they don't modify the index.
                   trace("Skipping replay of %d record kind at %d", kind, indexRecoveryPosition)
               }
-              pos = nextPos
+              indexRecoveryPosition = nextPos
           }
         }
       }
@@ -989,10 +990,11 @@ class LevelDBClient(store: LevelDBStore) {
     index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true))
     index.close
     index = null
+    debug("Gracefully closed the index")
+    copyDirtyIndexToSnapshot
   }
   if (log!=null && log.isOpen) {
     log.close
-    copyDirtyIndexToSnapshot
     stored_wal_append_position = log.appender_limit
     log = null
   }
@@ -1043,15 +1045,18 @@ class LevelDBClient(store: LevelDBStore) {
     snapshotRwLock.writeLock().unlock()
   }

+  def nextIndexSnapshotPos:Long = wal_append_position
+
   def copyDirtyIndexToSnapshot:Unit = {
-    if( log.appender_limit == lastIndexSnapshotPos ) {
+    if( nextIndexSnapshotPos == lastIndexSnapshotPos ) {
       // no need to snapshot again...
       return
     }
-    copyDirtyIndexToSnapshot(log.appender_limit)
+    copyDirtyIndexToSnapshot(nextIndexSnapshotPos)
   }

   def copyDirtyIndexToSnapshot(walPosition:Long):Unit = {
+    debug("Taking a snapshot of the current index: "+snapshotIndexFile(walPosition))
     // Where we start copying files into. Delete this on
     // restart.
     val tmpDir = tempIndexFile

SlaveLevelDBStore.scala

@@ -58,11 +58,16 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
   // the slave is caught up.
   override def post_log_rotate: Unit = {
     if( caughtUp ) {
-      super.post_log_rotate
+      writeExecutor {
+        snapshotIndex(false)
+      }
     }
   }

+  // The snapshots we create are based on what has been replayed.
+  override def nextIndexSnapshotPos:Long = indexRecoveryPosition
+
   override def doStart() = {
     queue.setLabel("slave: "+node_id)
     client.init()

ReplicatedLevelDBStoreTest.java

@@ -118,21 +118,22 @@ public class ReplicatedLevelDBStoreTest {
         return f;
     }

-    @Test(timeout = 1000*60*60)
+    @Test(timeout = 1000*60*20)
     public void testReplication() throws Exception {
         LinkedList<File> directories = new LinkedList<File>();
         directories.add(new File("target/activemq-data/leveldb-node1"));
         directories.add(new File("target/activemq-data/leveldb-node2"));
         directories.add(new File("target/activemq-data/leveldb-node3"));
-        for (File f : directories) {
-            FileSupport.toRichFile(f).recursiveDelete();
-        }
+        resetDirectories(directories);
+
+        // For some reason this had to be 64k to trigger a bug where
+        // slave index snapshots were being done incorrectly.
+        String playload = createPlayload(64*1024);

         ArrayList<String> expected_list = new ArrayList<String>();
         // We will rotate between 3 nodes the task of being the master.
-        for (int j = 0; j < 10; j++) {
+        for (int j = 0; j < 5; j++) {
             MasterLevelDBStore master = createMaster(directories.get(0));
             CountDownFuture masterStart = asyncStart(master);
@@ -141,8 +142,12 @@ public class ReplicatedLevelDBStoreTest {
             asyncStart(slave2);
             masterStart.await();

-            LOG.info("Adding messages...");
             MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST"));
+
+            LOG.info("Checking: "+master.getDirectory());
+            assertEquals(expected_list, getMessages(ms));
+
+            LOG.info("Adding messages...");
             final int TOTAL = 500;
             for (int i = 0; i < TOTAL; i++) {
                 if (i % ((int) (TOTAL * 0.10)) == 0) {
@@ -152,19 +157,23 @@ public class ReplicatedLevelDBStoreTest {
                 if (i == 250) {
                     slave1.start();
                     slave2.stop();
+
+                    LOG.info("Checking: "+master.getDirectory());
+                    assertEquals(expected_list, getMessages(ms));
                 }
                 String msgid = "m:" + j + ":" + i;
-                addMessage(ms, msgid);
+                addMessage(ms, msgid, playload);
                 expected_list.add(msgid);
             }

-            LOG.info("Checking master state");
+            LOG.info("Checking: "+master.getDirectory());
             assertEquals(expected_list, getMessages(ms));

-            LOG.info("Stopping master: " + master.node_id());
+            LOG.info("Stopping master: " + master.getDirectory());
             master.stop();
-            LOG.info("Stopping slave: " + slave1.node_id());
+
+            Thread.sleep(3*1000);
+            LOG.info("Stopping slave: " + slave1.getDirectory());
             slave1.stop();

             // Rotate the dir order so that slave1 becomes the master next.
@@ -172,22 +181,26 @@ public class ReplicatedLevelDBStoreTest {
         }
     }

+    void resetDirectories(LinkedList<File> directories) {
+        for (File directory : directories) {
+            FileSupport.toRichFile(directory).recursiveDelete();
+            directory.mkdirs();
+            FileSupport.toRichFile(new File(directory, "nodeid.txt")).writeText(directory.getName(), "UTF-8");
+        }
+    }
+
     @Test(timeout = 1000*60*60)
     public void testSlowSlave() throws Exception {
-        File node1Dir = new File("target/activemq-data/leveldb-node1");
-        File node2Dir = new File("target/activemq-data/leveldb-node2");
-        File node3Dir = new File("target/activemq-data/leveldb-node3");
-
-        FileSupport.toRichFile(node1Dir).recursiveDelete();
-        FileSupport.toRichFile(node2Dir).recursiveDelete();
-        FileSupport.toRichFile(node3Dir).recursiveDelete();
-        node2Dir.mkdirs();
-        node3Dir.mkdirs();
-        FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8");
-        FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8");
+        LinkedList<File> directories = new LinkedList<File>();
+        directories.add(new File("target/activemq-data/leveldb-node1"));
+        directories.add(new File("target/activemq-data/leveldb-node2"));
+        directories.add(new File("target/activemq-data/leveldb-node3"));
+        resetDirectories(directories);
+
+        File node1Dir = directories.get(0);
+        File node2Dir = directories.get(1);
+        File node3Dir = directories.get(2);

         ArrayList<String> expected_list = new ArrayList<String>();