diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 73ba9d04003..03c7e4e82e1 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -854,6 +854,10 @@ public class IndexShard extends AbstractIndexShardComponent {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
+        // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
+        // we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
+        // is a replica
+        active.set(true);
         return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(getEngine(), operations);
     }
 
@@ -883,6 +887,11 @@ public class IndexShard extends AbstractIndexShardComponent {
         // but we need to make sure we don't loose deletes until we are done recovering
         engineConfig.setEnableGcDeletes(false);
         engineConfig.setCreate(indexExists == false);
+        if (skipTranslogRecovery == false) {
+            // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
+            // we still give sync'd flush a chance to run:
+            active.set(true);
+        }
         createNewEngine(skipTranslogRecovery, engineConfig);
     }
 
@@ -1043,6 +1052,10 @@ public class IndexShard extends AbstractIndexShardComponent {
         MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
     }
 
+    public boolean isActive() {
+        return active.get();
+    }
+
     public ShardPath shardPath() {
         return path;
     }
@@ -1302,6 +1315,15 @@ public class IndexShard extends AbstractIndexShardComponent {
             assert this.currentEngineReference.get() == null;
             this.currentEngineReference.set(newEngine(skipTranslogRecovery, config));
         }
+
+        // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
+        // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
+        Engine engine = getEngineOrNull();
+
+        // engine could perhaps be null if we were e.g. concurrently closed:
+        if (engine != null) {
+            engine.onSettingsChanged();
+        }
     }
 
     protected Engine newEngine(boolean skipTranslogRecovery, EngineConfig config) {
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index e84b4546ce0..13fa55e8295 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -1087,4 +1087,65 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         newShard.performBatchRecovery(operations);
         assertFalse(newShard.getTranslog().syncNeeded());
     }
+
+    public void testIndexingBufferDuringInternalRecovery() throws IOException {
+        createIndex("index");
+        client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
+                .startObject("testtype")
+                .startObject("properties")
+                .startObject("foo")
+                .field("type", "string")
+                .endObject()
+                .endObject().endObject().endObject()).get();
+        ensureGreen();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService("index");
+        IndexShard shard = test.getShardOrNull(0);
+        ShardRouting routing = new ShardRouting(shard.routingEntry());
+        test.removeShard(0, "b/c britta says so");
+        IndexShard newShard = test.createShard(routing);
+        newShard.shardRouting = routing;
+        DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
+        newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode));
+        // Shard is still inactive since we haven't started recovering yet
+        assertFalse(newShard.isActive());
+        newShard.prepareForIndexRecovery();
+        // Shard is still inactive since we haven't started recovering yet
+        assertFalse(newShard.isActive());
+        newShard.performTranslogRecovery(true);
+        // Shard should now be active since we did recover:
+        assertTrue(newShard.isActive());
+    }
+
+    public void testIndexingBufferDuringPeerRecovery() throws IOException {
+        createIndex("index");
+        client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
+                .startObject("testtype")
+                .startObject("properties")
+                .startObject("foo")
+                .field("type", "string")
+                .endObject()
+                .endObject().endObject().endObject()).get();
+        ensureGreen();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService test = indicesService.indexService("index");
+        IndexShard shard = test.getShardOrNull(0);
+        ShardRouting routing = new ShardRouting(shard.routingEntry());
+        test.removeShard(0, "b/c britta says so");
+        IndexShard newShard = test.createShard(routing);
+        newShard.shardRouting = routing;
+        DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
+        newShard.markAsRecovering("for testing", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.REPLICA, localNode, localNode));
+        // Shard is still inactive since we haven't started recovering yet
+        assertFalse(newShard.isActive());
+        List<Translog.Operation> operations = new ArrayList<>();
+        operations.add(new Translog.Index("testtype", "1", jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes()));
+        newShard.prepareForIndexRecovery();
+        newShard.skipTranslogRecovery();
+        // Shard is still inactive since we haven't started recovering yet
+        assertFalse(newShard.isActive());
+        newShard.performBatchRecovery(operations);
+        // Shard should now be active since we did recover:
+        assertTrue(newShard.isActive());
+    }
 }