sync translog to disk after recovery from primary

Otherwise if that node is shutdown and restarted it might will have lost all operations
that were in the translog.
This commit is contained in:
Britta Weber 2016-01-07 16:27:40 +01:00
parent 8bd54dbf5a
commit f93b4cb215
2 changed files with 29 additions and 0 deletions

View File

@ -70,6 +70,7 @@ public class TranslogRecoveryPerformer {
performRecoveryOperation(engine, operation, false);
numOps++;
}
engine.getTranslog().sync();
} catch (Throwable t) {
throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, t);
}

View File

@ -100,8 +100,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@ -113,6 +115,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -1054,4 +1057,29 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return newShard;
}
public void testTranslogRecoverySyncsTranslog() throws IOException {
createIndex("testindexfortranslogsync");
client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject()
.startObject("testtype")
.startObject("properties")
.startObject("foo")
.field("type", "string")
.endObject()
.endObject().endObject().endObject()).get();
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testindexfortranslogsync");
IndexShard shard = test.getShardOrNull(0);
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c britta says so");
IndexShard newShard = test.createShard(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode));
List<Translog.Operation> operations = new ArrayList<>();
operations.add(new Translog.Index("testtype", "1", jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes()));
newShard.prepareForIndexRecovery();
newShard.performTranslogRecovery(true);
newShard.performBatchRecovery(operations);
assertFalse(newShard.getTranslog().syncNeeded());
}
}