Make sure IndexShard is active during recovery so that it gets its fair share of the indexing buffer

Closes #16250
This commit is contained in:
Mike McCandless 2016-01-28 06:01:06 -05:00
parent 2ca3433bea
commit 0cc94168dc
2 changed files with 83 additions and 0 deletions

View File

@ -854,6 +854,10 @@ public class IndexShard extends AbstractIndexShardComponent {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still invoke any onShardInactive listeners ... we won't do a sync'd flush in this case because we only do that on the primary
// and this is a replica
active.set(true);
return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(getEngine(), operations);
}
@ -883,6 +887,11 @@ public class IndexShard extends AbstractIndexShardComponent {
// but we need to make sure we don't lose deletes until we are done recovering
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
if (skipTranslogRecovery == false) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still give sync'd flush a chance to run:
active.set(true);
}
createNewEngine(skipTranslogRecovery, engineConfig);
}
@ -1043,6 +1052,10 @@ public class IndexShard extends AbstractIndexShardComponent {
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
}
/**
 * Reports the current value of this shard's {@code active} flag.
 *
 * @return {@code true} if the flag is currently set, {@code false} otherwise
 */
public boolean isActive() {
    final boolean currentlyActive = active.get();
    return currentlyActive;
}
/**
 * Returns the {@link ShardPath} held in this shard's {@code path} field.
 *
 * @return the value of {@code path}
 */
public ShardPath shardPath() {
    return this.path;
}
@ -1302,6 +1315,15 @@ public class IndexShard extends AbstractIndexShardComponent {
assert this.currentEngineReference.get() == null;
this.currentEngineReference.set(newEngine(skipTranslogRecovery, config));
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
// settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
Engine engine = getEngineOrNull();
// engine could perhaps be null if we were e.g. concurrently closed:
if (engine != null) {
engine.onSettingsChanged();
}
}
protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) {

View File

@ -1087,4 +1087,65 @@ public class IndexShardTests extends ESSingleNodeTestCase {
newShard.performBatchRecovery(operations);
assertFalse(newShard.getTranslog().syncNeeded());
}
/**
 * Checks that a shard recreated from an existing routing entry is not reported as active while it has
 * only been marked as recovering and prepared for index recovery, and that it is reported as active
 * once {@code performTranslogRecovery(true)} has run.
 */
public void testIndexingBufferDuringInternalRecovery() throws IOException {
    createIndex("index");
    // Register a mapping for type "testtype" with a single string field "foo".
    client().admin().indices()
        .preparePutMapping("index")
        .setType("testtype")
        .setSource(jsonBuilder()
            .startObject()
                .startObject("testtype")
                    .startObject("properties")
                        .startObject("foo").field("type", "string").endObject()
                    .endObject()
                .endObject()
            .endObject())
        .get();
    ensureGreen();

    // Remove shard 0 and create it again from a copy of its routing entry.
    final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService("index");
    final ShardRouting copiedRouting = new ShardRouting(indexService.getShardOrNull(0).routingEntry());
    indexService.removeShard(0, "b/c britta says so");
    final IndexShard recreated = indexService.createShard(copiedRouting);
    recreated.shardRouting = copiedRouting;

    // Put the fresh shard into the recovering state, using one dummy node as both ends of the recovery.
    final DiscoveryNode node = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
    final RecoveryState recoveryState =
        new RecoveryState(recreated.shardId(), copiedRouting.primary(), RecoveryState.Type.REPLICA, node, node);
    recreated.markAsRecovering("for testing", recoveryState);

    // Only marked as recovering so far: the shard must not be active yet.
    assertFalse(recreated.isActive());

    recreated.prepareForIndexRecovery();
    // Preparing for index recovery alone must not activate the shard either.
    assertFalse(recreated.isActive());

    recreated.performTranslogRecovery(true);
    // After the translog recovery call the shard is expected to be active.
    assertTrue(recreated.isActive());
}
/**
 * Checks that a shard recreated from an existing routing entry is not reported as active after being
 * marked as recovering, prepared for index recovery and told to skip translog recovery, and that it is
 * reported as active once a batch of translog operations has been applied through
 * {@code performBatchRecovery}.
 */
public void testIndexingBufferDuringPeerRecovery() throws IOException {
    createIndex("index");
    // Register a mapping for type "testtype" with a single string field "foo".
    client().admin().indices()
        .preparePutMapping("index")
        .setType("testtype")
        .setSource(jsonBuilder()
            .startObject()
                .startObject("testtype")
                    .startObject("properties")
                        .startObject("foo").field("type", "string").endObject()
                    .endObject()
                .endObject()
            .endObject())
        .get();
    ensureGreen();

    // Remove shard 0 and create it again from a copy of its routing entry.
    final IndexService indexService = getInstanceFromNode(IndicesService.class).indexService("index");
    final ShardRouting copiedRouting = new ShardRouting(indexService.getShardOrNull(0).routingEntry());
    indexService.removeShard(0, "b/c britta says so");
    final IndexShard recreated = indexService.createShard(copiedRouting);
    recreated.shardRouting = copiedRouting;

    // Put the fresh shard into the recovering state, using one dummy node as both ends of the recovery.
    final DiscoveryNode node = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
    final RecoveryState recoveryState =
        new RecoveryState(recreated.shardId(), copiedRouting.primary(), RecoveryState.Type.REPLICA, node, node);
    recreated.markAsRecovering("for testing", recoveryState);

    // Only marked as recovering so far: the shard must not be active yet.
    assertFalse(recreated.isActive());

    // A single index operation to hand to the shard as the recovery batch.
    final List<Translog.Operation> batch = new ArrayList<>();
    batch.add(new Translog.Index("testtype", "1",
        jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes()));

    recreated.prepareForIndexRecovery();
    recreated.skipTranslogRecovery();
    // No operations have been applied to the shard yet, so it must still be inactive.
    assertFalse(recreated.isActive());

    recreated.performBatchRecovery(batch);
    // Applying the batch is expected to have marked the shard active.
    assertTrue(recreated.isActive());
}
}