Indexing: Add shard id to indexing operation listener (#22606)

The IndexingOperationListener interface did not provide any
information about the shard id when a document was indexed.

This commit adds the shard id as the first parameter to all methods
in the IndexingOperationListener.
This commit is contained in:
Alexander Reelsen 2017-01-16 09:08:16 +01:00 committed by GitHub
parent 526cf6182d
commit f6ee6e420b
11 changed files with 113 additions and 93 deletions

View File

@ -311,7 +311,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]AlreadyExpiredException.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexingSlowLog.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicyConfig.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]SearchSlowLog.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]AnalysisRegistry.java" checks="LineLength" />
@ -787,7 +786,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]search[/\\]nested[/\\]LongNestedSortingTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]search[/\\]nested[/\\]NestedSortingTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]IndexShardTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]IndexingOperationListenerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]ShardPathTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]similarity[/\\]SimilarityTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]snapshots[/\\]blobstore[/\\]FileInfoTests.java" checks="LineLength" />

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@ -76,7 +77,8 @@ public final class IndexingSlowLog implements IndexingOperationListener {
* and everything else is interpreted as Elasticsearch interprets booleans
* which is then converted to 0 for false and Integer.MAX_VALUE for true.
*/
public static final Setting<Integer> INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING = new Setting<>(INDEX_INDEXING_SLOWLOG_PREFIX + ".source", "1000", (value) -> {
public static final Setting<Integer> INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING =
new Setting<>(INDEX_INDEXING_SLOWLOG_PREFIX + ".source", "1000", (value) -> {
try {
return Integer.parseInt(value, 10);
} catch (NumberFormatException e) {
@ -90,17 +92,22 @@ public final class IndexingSlowLog implements IndexingOperationListener {
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING, this::setReformat);
this.reformat = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING, this::setWarnThreshold);
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING, this::setWarnThreshold);
this.indexWarnThreshold = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING).nanos();
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING, this::setInfoThreshold);
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING, this::setInfoThreshold);
this.indexInfoThreshold = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING).nanos();
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING, this::setDebugThreshold);
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING, this::setDebugThreshold);
this.indexDebugThreshold = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING).nanos();
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING, this::setTraceThreshold);
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING, this::setTraceThreshold);
this.indexTraceThreshold = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING).nanos();
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_LEVEL_SETTING, this::setLevel);
setLevel(indexSettings.getValue(INDEX_INDEXING_SLOWLOG_LEVEL_SETTING));
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING, this::setMaxSourceCharsToLog);
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
this::setMaxSourceCharsToLog);
this.maxSourceCharsToLog = indexSettings.getValue(INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);
}
@ -134,7 +141,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index indexOperation, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.IndexResult result) {
if (result.hasFailure() == false) {
final ParsedDocument doc = indexOperation.parsedDoc();
final long tookInNanos = result.getTook();
@ -169,7 +176,8 @@ public final class IndexingSlowLog implements IndexingOperationListener {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(index).append(" ");
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], ");
sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
sb.append("type[").append(doc.type()).append("], ");
sb.append("id[").append(doc.id()).append("], ");
if (doc.routing() == null) {

View File

@ -553,17 +553,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private Engine.IndexResult index(Engine engine, Engine.Index index) {
active.set(true);
final Engine.IndexResult result;
index = indexingOperationListeners.preIndex(index);
index = indexingOperationListeners.preIndex(shardId, index);
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
result = engine.index(index);
} catch (Exception e) {
indexingOperationListeners.postIndex(index, e);
indexingOperationListeners.postIndex(shardId, index, e);
throw e;
}
indexingOperationListeners.postIndex(index, result);
indexingOperationListeners.postIndex(shardId, index, result);
return result;
}
@ -602,17 +602,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) {
active.set(true);
final Engine.DeleteResult result;
delete = indexingOperationListeners.preDelete(delete);
delete = indexingOperationListeners.preDelete(shardId, delete);
try {
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
result = engine.delete(delete);
} catch (Exception e) {
indexingOperationListeners.postDelete(delete, e);
indexingOperationListeners.postDelete(shardId, delete, e);
throw e;
}
indexingOperationListeners.postDelete(delete, result);
indexingOperationListeners.postDelete(shardId, delete, result);
return result;
}

View File

@ -33,29 +33,29 @@ public interface IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
default Engine.Index preIndex(Engine.Index operation) {
default Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
return operation;
}
/**
* Called after the indexing operation occurred. Note that this is
* also called when indexing a document did not succeed due to document
* related failures. See {@link #postIndex(Engine.Index, Exception)}
* related failures. See {@link #postIndex(ShardId, Engine.Index, Exception)}
* for engine level failures
*/
default void postIndex(Engine.Index index, Engine.IndexResult result) {}
default void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {}
/**
* Called after the indexing operation occurred with engine level exception.
* See {@link #postIndex(Engine.Index, Engine.IndexResult)} for document
* See {@link #postIndex(ShardId, Engine.Index, Engine.IndexResult)} for document
* related failures
*/
default void postIndex(Engine.Index index, Exception ex) {}
default void postIndex(ShardId shardId, Engine.Index index, Exception ex) {}
/**
* Called before the delete occurs.
*/
default Engine.Delete preDelete(Engine.Delete delete) {
default Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
return delete;
}
@ -63,17 +63,17 @@ public interface IndexingOperationListener {
/**
* Called after the delete operation occurred. Note that this is
* also called when deleting a document did not succeed due to document
* related failures. See {@link #postDelete(Engine.Delete, Exception)}
* related failures. See {@link #postDelete(ShardId, Engine.Delete, Exception)}
* for engine level failures
*/
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
default void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {}
/**
* Called after the delete operation occurred with engine level exception.
* See {@link #postDelete(Engine.Delete, Engine.DeleteResult)} for document
* See {@link #postDelete(ShardId, Engine.Delete, Engine.DeleteResult)} for document
* related failures
*/
default void postDelete(Engine.Delete delete, Exception ex) {}
default void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {}
/**
* A Composite listener that multiplexes calls to each of the listeners methods.
@ -88,11 +88,11 @@ public interface IndexingOperationListener {
}
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
assert operation != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preIndex(operation);
listener.preIndex(shardId, operation);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
}
@ -101,11 +101,11 @@ public interface IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
assert index != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, result);
listener.postIndex(shardId, index, result);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e);
}
@ -113,11 +113,11 @@ public interface IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
assert index != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, ex);
listener.postIndex(shardId, index, ex);
} catch (Exception inner) {
inner.addSuppressed(ex);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner);
@ -126,11 +126,11 @@ public interface IndexingOperationListener {
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preDelete(delete);
listener.preDelete(shardId, delete);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
}
@ -139,11 +139,11 @@ public interface IndexingOperationListener {
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete, result);
listener.postDelete(shardId, delete, result);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e);
}
@ -151,11 +151,11 @@ public interface IndexingOperationListener {
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
assert delete != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete, ex);
listener.postDelete(shardId, delete, ex);
} catch (Exception inner) {
inner.addSuppressed(ex);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner);

View File

@ -65,7 +65,7 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
if (!operation.origin().isRecovery()) {
totalStats.indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc();
@ -74,7 +74,7 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.hasFailure() == false) {
if (!index.origin().isRecovery()) {
long took = result.getTook();
@ -85,12 +85,12 @@ final class InternalIndexingStats implements IndexingOperationListener {
typeStats.indexCurrent.dec();
}
} else {
postIndex(index, result.getFailure());
postIndex(shardId, index, result.getFailure());
}
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (!index.origin().isRecovery()) {
totalStats.indexCurrent.dec();
typeStats(index.type()).indexCurrent.dec();
@ -100,7 +100,7 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
if (!delete.origin().isRecovery()) {
totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc();
@ -110,7 +110,7 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
if (result.hasFailure() == false) {
if (!delete.origin().isRecovery()) {
long took = result.getTook();
@ -121,12 +121,12 @@ final class InternalIndexingStats implements IndexingOperationListener {
typeStats.deleteCurrent.dec();
}
} else {
postDelete(delete, result.getFailure());
postDelete(shardId, delete, result.getFailure());
}
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
if (!delete.origin().isRecovery()) {
totalStats.deleteCurrent.dec();
typeStats(delete.type()).deleteCurrent.dec();

View File

@ -34,6 +34,7 @@ import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
@ -200,12 +201,12 @@ public class IndexingMemoryController extends AbstractComponent implements Index
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
recordOperationBytes(index, result);
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
recordOperationBytes(delete, result);
}

View File

@ -232,7 +232,7 @@ public class IndexModuleTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
executed.set(true);
return operation;
}
@ -248,8 +248,9 @@ public class IndexModuleTests extends ESTestCase {
assertSame(listener, indexService.getIndexOperationListeners().get(1));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
l.preIndex(index);
l.preIndex(shardId, index);
}
assertTrue(executed.get());
indexService.close("simon says", false);

View File

@ -420,7 +420,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
@ -434,7 +434,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh

View File

@ -573,13 +573,13 @@ public class IndexShardTests extends IndexShardTestCase {
shard.close("simon says", true);
shard = reinitShard(shard, new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.hasFailure() == false) {
if (result.isCreated()) {
postIndexCreate.incrementAndGet();
@ -587,32 +587,32 @@ public class IndexShardTests extends IndexShardTestCase {
postIndexUpdate.incrementAndGet();
}
} else {
postIndex(index, result.getFailure());
postIndex(shardId, index, result.getFailure());
}
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
postIndexException.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
if (result.hasFailure() == false) {
postDelete.incrementAndGet();
} else {
postDelete(delete, result.getFailure());
postDelete(shardId, delete, result.getFailure());
}
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
postDeleteException.incrementAndGet();
}
@ -1144,24 +1144,24 @@ public class IndexShardTests extends IndexShardTestCase {
final AtomicInteger postDelete = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
postIndex.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
postDelete.incrementAndGet();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.Term;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.test.ESTestCase;
@ -29,6 +30,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.is;
public class IndexingOperationListenerTests extends ESTestCase{
// this test also tests if calls are correct if one or more listeners throw exceptions
@ -39,76 +42,83 @@ public class IndexingOperationListenerTests extends ESTestCase{
AtomicInteger preDelete = new AtomicInteger();
AtomicInteger postDelete = new AtomicInteger();
AtomicInteger postDeleteException = new AtomicInteger();
ShardId randomShardId = new ShardId(new Index(randomAsciiOfLength(10), randomAsciiOfLength(10)), randomIntBetween(1, 10));
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
assertThat(shardId, is(randomShardId));
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
assertThat(shardId, is(randomShardId));
if (result.hasFailure() == false) {
postIndex.incrementAndGet();
} else {
postIndex(index, result.getFailure());
postIndex(shardId, index, result.getFailure());
}
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
assertThat(shardId, is(randomShardId));
postIndexException.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
assertThat(shardId, is(randomShardId));
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
assertThat(shardId, is(randomShardId));
if (result.hasFailure() == false) {
postDelete.incrementAndGet();
} else {
postDelete(delete, result.getFailure());
postDelete(shardId, delete, result.getFailure());
}
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
assertThat(shardId, is(randomShardId));
postDeleteException.incrementAndGet();
}
};
IndexingOperationListener throwingListener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
throw new RuntimeException();
}
@Override
public void postIndex(Engine.Index index, Engine.IndexResult result) {
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
throw new RuntimeException();
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
throw new RuntimeException();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
throw new RuntimeException();
}
@Override
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
throw new RuntimeException();
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) {
throw new RuntimeException();
}
};
@ -120,10 +130,11 @@ public class IndexingOperationListenerTests extends ESTestCase{
}
}
Collections.shuffle(indexingOperationListeners, random());
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
IndexingOperationListener.CompositeListener compositeListener =
new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
compositeListener.postDelete(delete, new Engine.DeleteResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true));
compositeListener.postDelete(randomShardId, delete, new Engine.DeleteResult(1, SequenceNumbersService.UNASSIGNED_SEQ_NO, true));
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -131,7 +142,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(0, postDeleteException.get());
compositeListener.postDelete(delete, new RuntimeException());
compositeListener.postDelete(randomShardId, delete, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -139,7 +150,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preDelete(delete);
compositeListener.preDelete(randomShardId, delete);
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -147,7 +158,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index, new Engine.IndexResult(0, SequenceNumbersService.UNASSIGNED_SEQ_NO, false));
compositeListener.postIndex(randomShardId, index, new Engine.IndexResult(0, SequenceNumbersService.UNASSIGNED_SEQ_NO, false));
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(0, postIndexException.get());
@ -155,7 +166,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index, new RuntimeException());
compositeListener.postIndex(randomShardId, index, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());
@ -163,7 +174,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preIndex(index);
compositeListener.preIndex(randomShardId, index);
assertEquals(2, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ingest.IngestTestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
@ -269,12 +270,12 @@ public class CancelTests extends ReindexTestCase {
public static class BlockingOperationListener implements IndexingOperationListener {
@Override
public Engine.Index preIndex(Engine.Index index) {
public Engine.Index preIndex(ShardId shardId, Engine.Index index) {
return preCheck(index, index.type());
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
return preCheck(delete, delete.type());
}