Merge pull request #15832 from brwe/sync-translog-to-disk-after-recovery

sync translog to disk after recovery from primary
This commit is contained in:
Britta Weber 2016-01-07 16:54:56 +01:00
commit bc5a6be0e3
2 changed files with 29 additions and 0 deletions

View File

@ -70,6 +70,7 @@ public class TranslogRecoveryPerformer {
performRecoveryOperation(engine, operation, false); performRecoveryOperation(engine, operation, false);
numOps++; numOps++;
} }
engine.getTranslog().sync();
} catch (Throwable t) { } catch (Throwable t) {
throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, t); throw new BatchOperationException(shardId, "failed to apply batch translog operation", numOps, t);
} }

View File

@ -100,8 +100,10 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
@ -113,6 +115,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -1054,4 +1057,29 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return newShard; return newShard;
} }
public void testTranslogRecoverySyncsTranslog() throws IOException {
createIndex("testindexfortranslogsync");
client().admin().indices().preparePutMapping("testindexfortranslogsync").setType("testtype").setSource(jsonBuilder().startObject()
.startObject("testtype")
.startObject("properties")
.startObject("foo")
.field("type", "string")
.endObject()
.endObject().endObject().endObject()).get();
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("testindexfortranslogsync");
IndexShard shard = test.getShardOrNull(0);
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c britta says so");
IndexShard newShard = test.createShard(routing);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode));
List<Translog.Operation> operations = new ArrayList<>();
operations.add(new Translog.Index("testtype", "1", jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes()));
newShard.prepareForIndexRecovery();
newShard.performTranslogRecovery(true);
newShard.performBatchRecovery(operations);
assertFalse(newShard.getTranslog().syncNeeded());
}
} }