add operation result as a parameter to postIndex/delete in indexing operation listener

This commit is contained in:
Areek Zillur 2016-10-25 09:12:39 -04:00
parent 1587a77ffd
commit 1aee578aa1
10 changed files with 53 additions and 54 deletions

View File

@ -242,7 +242,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0);
// we use index result to communicate failure while translating update request
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0, 0);
break; // out of retry loop
}
// execute translated update request

View File

@ -133,15 +133,10 @@ public final class IndexingSlowLog implements IndexingOperationListener {
this.reformat = reformat;
}
@Override
public void postIndex(Engine.Index index, boolean created) {
final long took = index.endTime() - index.startTime();
postIndexing(index.parsedDoc(), took);
}
private void postIndexing(ParsedDocument doc, long tookInNanos) {
public void postIndex(Engine.Index indexOperation, Engine.IndexResult result) {
final ParsedDocument doc = indexOperation.parsedDoc();
final long tookInNanos = result.getTook();
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {

View File

@ -422,7 +422,8 @@ public class InternalEngine extends Engine {
}
} catch (Exception e) {
Exception transientOperationFailure = handleOperationFailure(index, e);
result = new IndexResult(transientOperationFailure, index.version(), index.startTime() - System.nanoTime());
result = new IndexResult(transientOperationFailure, index.version(),
index.startTime() - System.nanoTime(), index.estimatedSizeInBytes());
}
return result;
}
@ -550,7 +551,7 @@ public class InternalEngine extends Engine {
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery
return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime());
return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes());
} else {
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
@ -561,7 +562,7 @@ public class InternalEngine extends Engine {
update(index.uid(), index.docs(), indexWriter);
}
location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime());
return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime(), index.estimatedSizeInBytes());
}
}
}
@ -591,7 +592,8 @@ public class InternalEngine extends Engine {
result = innerDelete(delete);
} catch (Exception e) {
Exception transientOperationFailure = handleOperationFailure(delete, e);
result = new DeleteResult(transientOperationFailure, delete.version(), delete.startTime() - System.nanoTime());
result = new DeleteResult(transientOperationFailure, delete.version(),
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes());
}
maybePruneDeletedTombstones();
return result;
@ -626,12 +628,14 @@ public class InternalEngine extends Engine {
final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
return new DeleteResult(null, expectedVersion, true, delete.startTime() - System.nanoTime());
return new DeleteResult(null, expectedVersion, true,
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes());
} else {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new);
return new DeleteResult(location, updatedVersion, found, delete.startTime() - System.nanoTime());
return new DeleteResult(location, updatedVersion, found,
delete.startTime() - System.nanoTime(), delete.estimatedSizeInBytes());
}
}
}

View File

@ -40,7 +40,7 @@ public interface IndexingOperationListener {
/**
* Called after the indexing operation occurred.
*/
default void postIndex(Engine.Index index, boolean created) {}
default void postIndex(Engine.Index index, Engine.IndexResult result) {}
/**
* Called after the indexing operation occurred with exception.
@ -58,7 +58,7 @@ public interface IndexingOperationListener {
/**
* Called after the delete operation occurred.
*/
default void postDelete(Engine.Delete delete) {}
default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {}
/**
* Called after the delete operation occurred with exception.
@ -91,11 +91,11 @@ public interface IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index index, boolean created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
assert index != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, created);
listener.postIndex(index, result);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e);
}
@ -129,11 +129,11 @@ public interface IndexingOperationListener {
}
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete);
listener.postDelete(delete, result);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e);
}

View File

@ -74,9 +74,9 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public void postIndex(Engine.Index index, boolean created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
if (!index.origin().isRecovery()) {
long took = index.endTime() - index.startTime();
long took = result.getTook();
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
StatsHolder typeStats = typeStats(index.type());
@ -106,9 +106,9 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
if (!delete.origin().isRecovery()) {
long took = delete.endTime() - delete.startTime();
long took = result.getTook();
totalStats.deleteMetric.inc(took);
totalStats.deleteCurrent.dec();
StatsHolder typeStats = typeStats(delete.type());

View File

@ -189,11 +189,6 @@ public class IndexingMemoryController extends AbstractComponent implements Index
statusChecker.run();
}
/** called by IndexShard to record that this many bytes were written to translog */
public void bytesWritten(int bytes) {
statusChecker.bytesWritten(bytes);
}
/** Asks this shard to throttle indexing to one thread */
protected void activateThrottling(IndexShard shard) {
shard.activateThrottling();
@ -205,17 +200,18 @@ public class IndexingMemoryController extends AbstractComponent implements Index
}
@Override
public void postIndex(Engine.Index index, boolean created) {
recordOperationBytes(index);
public void postIndex(Engine.Index index, Engine.IndexResult result) {
recordOperationBytes(result);
}
@Override
public void postDelete(Engine.Delete delete) {
recordOperationBytes(delete);
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
recordOperationBytes(result);
}
private void recordOperationBytes(Engine.Operation op) {
bytesWritten(op.sizeInBytes());
/** called by IndexShard to record that this many bytes were written to translog */
private void recordOperationBytes(Engine.Result result) {
statusChecker.bytesWritten(result.getSizeInBytes());
}
private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {

View File

@ -408,7 +408,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void postIndex(Engine.Index index, boolean created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
@ -422,7 +422,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh

View File

@ -564,8 +564,8 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void postIndex(Engine.Index index, boolean created) {
if (created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
if (result.isCreated()) {
postIndexCreate.incrementAndGet();
} else {
postIndexUpdate.incrementAndGet();
@ -584,7 +584,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
postDelete.incrementAndGet();
}
@ -1127,7 +1127,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void postIndex(Engine.Index index, boolean created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
postIndex.incrementAndGet();
}
@ -1138,7 +1138,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
postDelete.incrementAndGet();
}

View File

@ -46,7 +46,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
}
@Override
public void postIndex(Engine.Index index, boolean created) {
public void postIndex(Engine.Index index, Engine.IndexResult result) {
postIndex.incrementAndGet();
}
@ -62,7 +62,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
}
@Override
public void postDelete(Engine.Delete delete) {
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
postDelete.incrementAndGet();
}
@ -79,12 +79,14 @@ public class IndexingOperationListenerTests extends ESTestCase{
}
@Override
public void postIndex(Engine.Index index, boolean created) {
throw new RuntimeException(); }
public void postIndex(Engine.Index index, Engine.IndexResult result) {
throw new RuntimeException();
}
@Override
public void postIndex(Engine.Index index, Exception ex) {
throw new RuntimeException(); }
throw new RuntimeException();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
@ -92,8 +94,9 @@ public class IndexingOperationListenerTests extends ESTestCase{
}
@Override
public void postDelete(Engine.Delete delete) {
throw new RuntimeException(); }
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
throw new RuntimeException();
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
@ -111,7 +114,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
compositeListener.postDelete(delete);
compositeListener.postDelete(delete, new Engine.DeleteResult(null, 1, true, 0, 0));
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -135,7 +138,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postIndex(index, false);
compositeListener.postIndex(index, new Engine.IndexResult(null, 0, false, 0, 0));
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(0, postIndexException.get());

View File

@ -441,7 +441,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Engine.Index indexDoc(IndexShard shard, String type, String id, String source) {
final Engine.Operation index;
final Engine.Index index;
if (shard.routingEntry().primary()) {
index = shard.prepareIndexOnPrimary(
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
@ -452,7 +452,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.index(index);
return ((Engine.Index) index);
return index;
}
protected Engine.Delete deleteDoc(IndexShard shard, String type, String id) {