Invoke `IndexingOperationListeners` also when recovering from store or remote

Today we don't invoke `IndexingOperationListeners` when we are running
a recovery form store or replaying translog from remote. This is problematic since
the actual code path for indexing is different between normal indexing and recovery.
An important detail is left out on recovery since we implemented the `IndexingMemoryController`
as an `IndexingOperationListener` we might never flush the `IndexWriter` of a recovering shard
which can lead to `OOMs` on node startup / recovery.
This commit is contained in:
Simon Willnauer 2016-03-30 14:07:34 +02:00
parent 7e2696c570
commit 43b87e8f86
3 changed files with 93 additions and 15 deletions

View File

@ -486,6 +486,11 @@ public class IndexShard extends AbstractIndexShardComponent {
*/
public boolean index(Engine.Index index) {
ensureWriteAllowed(index);
Engine engine = getEngine();
return index(engine, index);
}
private boolean index(Engine engine, Engine.Index index) {
active.set(true);
index = indexingOperationListeners.preIndex(index);
final boolean created;
@ -493,7 +498,6 @@ public class IndexShard extends AbstractIndexShardComponent {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
Engine engine = getEngine();
created = engine.index(index);
index.endTime(System.nanoTime());
} catch (Throwable ex) {
@ -524,13 +528,17 @@ public class IndexShard extends AbstractIndexShardComponent {
public void delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
Engine engine = getEngine();
delete(engine, delete);
}
private void delete(Engine engine, Engine.Delete delete) {
active.set(true);
delete = indexingOperationListeners.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
Engine engine = getEngine();
engine.delete(delete);
delete.endTime(System.nanoTime());
} catch (Throwable ex) {
@ -1382,6 +1390,16 @@ public class IndexShard extends AbstractIndexShardComponent {
translogStats.totalOperationsOnStart(snapshot.totalOperations());
return super.recoveryFromSnapshot(engine, snapshot);
}
@Override
protected void index(Engine engine, Engine.Index engineIndex) {
IndexShard.this.index(engine, engineIndex);
}
@Override
protected void delete(Engine engine, Engine.Delete engineDelete) {
IndexShard.this.delete(engine, engineDelete);
}
};
return new EngineConfig(shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),

View File

@ -158,7 +158,7 @@ public class TranslogRecoveryPerformer {
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
}
engine.index(engineIndex);
index(engine, engineIndex);
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
@ -166,8 +166,9 @@ public class TranslogRecoveryPerformer {
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
}
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false));
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false);
delete(engine, engineDelete);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
@ -193,6 +194,14 @@ public class TranslogRecoveryPerformer {
operationProcessed();
}
protected void index(Engine engine, Engine.Index engineIndex) {
engine.index(engineIndex);
}
protected void delete(Engine engine, Engine.Delete engineDelete) {
engine.delete(engineDelete);
}
/**
* Called once for every processed operation by this recovery performer.

View File

@ -691,6 +691,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
AtomicInteger preDelete = new AtomicInteger();
AtomicInteger postDelete = new AtomicInteger();
AtomicInteger postDeleteException = new AtomicInteger();
shard.close("simon says", true);
shard = reinitWithWrapper(test, shard, null, new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
@ -771,10 +772,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
assertEquals(3, preIndex.get());
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(1, postIndexUpdate.get());
assertEquals(1, postIndexException.get());
assertEquals(0, postIndexException.get());
assertEquals(1, preDelete.get());
assertEquals(1, postDelete.get());
assertEquals(0, postDeleteException.get());
@ -785,14 +786,13 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
assertEquals(3, preIndex.get());
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(1, postIndexUpdate.get());
assertEquals(1, postIndexException.get());
assertEquals(2, preDelete.get());
assertEquals(0, postIndexException.get());
assertEquals(1, preDelete.get());
assertEquals(1, postDelete.get());
assertEquals(1, postDeleteException.get());
assertEquals(0, postDeleteException.get());
}
public void testMaybeFlush() throws Exception {
@ -1190,7 +1190,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
return searcher;
}
};
shard.close("simon says", true);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
@ -1231,6 +1231,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
};
shard.close("simon says", true);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
// test global ordinals are evicted
@ -1258,6 +1259,57 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
}
public void testIndexingOperationListnenersIsInvokedOnRecovery() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService(resolveIndex("test"));
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").get();
client().prepareDelete("test", "test", "0").get();
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {};
shard.close("simon says", false);
final AtomicInteger preIndex = new AtomicInteger();
final AtomicInteger postIndex = new AtomicInteger();
final AtomicInteger preDelete = new AtomicInteger();
final AtomicInteger postDelete = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index, boolean created) {
postIndex.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
};
final IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper, listener);
try {
assertEquals(2, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(1, preDelete.get());
assertEquals(1, postDelete.get());
} finally {
newShard.close("just do it", randomBoolean());
}
}
public void testSearchIsReleaseIfWrapperFails() throws IOException {
createIndex("test");
ensureGreen();
@ -1276,6 +1328,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
}
};
shard.close("simon says", true);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
newShard.acquireSearcher("test");
@ -1285,12 +1338,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
} finally {
newShard.close("just do it", randomBoolean());
}
// test will fail due to unclosed searchers if the searcher is not released
}
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,