Generify index shard method to execute engine write operation
Now index and delete methods in index shard share code for indexing stats. This commit collapses seperate methods for index and delete operations into a generic execute method for performing engine write operations. As an added benefit, this commit cleans up the interface for indexing operation listener making it more simple and concise to use.
This commit is contained in:
parent
71dc4178b9
commit
1bdeada8aa
|
@ -57,7 +57,8 @@ import org.elasticsearch.transport.TransportService;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.action.delete.TransportDeleteAction.*;
|
||||
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary;
|
||||
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica;
|
||||
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
|
||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
|
||||
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
|
||||
|
|
|
@ -129,7 +129,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
|
||||
public static PrimaryOperationResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
|
||||
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
|
||||
primary.delete(delete);
|
||||
primary.execute(delete);
|
||||
if (delete.hasFailure()) {
|
||||
return new PrimaryOperationResult<>(delete.getFailure());
|
||||
} else {
|
||||
|
@ -145,7 +145,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
|
||||
public static ReplicaOperationResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
|
||||
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
|
||||
replica.delete(delete);
|
||||
replica.execute(delete);
|
||||
return delete.hasFailure()
|
||||
? new ReplicaOperationResult(delete.getFailure())
|
||||
: new ReplicaOperationResult(delete.getTranslogLocation());
|
||||
|
|
|
@ -153,14 +153,14 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
* Execute the given {@link IndexRequest} on a replica shard, throwing a
|
||||
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
|
||||
*/
|
||||
public static ReplicaOperationResult executeIndexRequestOnReplica(IndexRequest request, IndexShard indexShard) {
|
||||
final ShardId shardId = indexShard.shardId();
|
||||
public static ReplicaOperationResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
|
||||
final ShardId shardId = replica.shardId();
|
||||
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
|
||||
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
|
||||
|
||||
final Engine.Index operation;
|
||||
try {
|
||||
operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new ReplicaOperationResult(e);
|
||||
}
|
||||
|
@ -168,7 +168,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
if (update != null) {
|
||||
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
|
||||
}
|
||||
indexShard.index(operation);
|
||||
replica.execute(operation);
|
||||
if (operation.hasFailure()) {
|
||||
return new ReplicaOperationResult(operation.getFailure());
|
||||
} else {
|
||||
|
@ -177,28 +177,28 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
}
|
||||
|
||||
/** Utility method to prepare an index operation on primary shards */
|
||||
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
|
||||
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
|
||||
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
|
||||
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
|
||||
return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
}
|
||||
|
||||
public static PrimaryOperationResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,
|
||||
public static PrimaryOperationResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
|
||||
MappingUpdatedAction mappingUpdatedAction) throws Exception {
|
||||
Engine.Index operation;
|
||||
try {
|
||||
operation = prepareIndexOperationOnPrimary(request, indexShard);
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new PrimaryOperationResult<>(e);
|
||||
}
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
final ShardId shardId = indexShard.shardId();
|
||||
final ShardId shardId = primary.shardId();
|
||||
if (update != null) {
|
||||
try {
|
||||
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
|
||||
// which are bubbled up
|
||||
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
|
||||
operation = prepareIndexOperationOnPrimary(request, indexShard);
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new PrimaryOperationResult<>(e);
|
||||
}
|
||||
|
@ -208,7 +208,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
"Dynamic mappings are not available on the node that holds the primary yet");
|
||||
}
|
||||
}
|
||||
indexShard.index(operation);
|
||||
primary.execute(operation);
|
||||
if (operation.hasFailure()) {
|
||||
return new PrimaryOperationResult<>(operation.getFailure());
|
||||
} else {
|
||||
|
|
|
@ -133,23 +133,20 @@ public final class IndexingSlowLog implements IndexingOperationListener {
|
|||
this.reformat = reformat;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
final long took = index.endTime() - index.startTime();
|
||||
postIndexing(index.parsedDoc(), took);
|
||||
}
|
||||
|
||||
|
||||
private void postIndexing(ParsedDocument doc, long tookInNanos) {
|
||||
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
|
||||
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
|
||||
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
|
||||
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
|
||||
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
if (operation.operationType() == Engine.Operation.TYPE.INDEX) {
|
||||
final long tookInNanos = operation.endTime() - operation.startTime();
|
||||
ParsedDocument doc = ((Engine.Index) operation).parsedDoc();
|
||||
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
|
||||
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
|
||||
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
|
||||
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
|
||||
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -880,11 +880,13 @@ public abstract class Engine implements Closeable {
|
|||
return this.endTime;
|
||||
}
|
||||
|
||||
abstract String type();
|
||||
public abstract String type();
|
||||
|
||||
abstract String id();
|
||||
|
||||
abstract TYPE operationType();
|
||||
public abstract TYPE operationType();
|
||||
|
||||
public abstract String toString();
|
||||
}
|
||||
|
||||
public static class Index extends Operation {
|
||||
|
@ -925,7 +927,7 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
|
||||
@Override
|
||||
TYPE operationType() {
|
||||
public TYPE operationType() {
|
||||
return TYPE.INDEX;
|
||||
}
|
||||
|
||||
|
@ -989,6 +991,10 @@ public abstract class Engine implements Closeable {
|
|||
return isRetry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "index [{" + type() + "}][{" + id()+ "}] [{" + docs() + "}]";
|
||||
}
|
||||
}
|
||||
|
||||
public static class Delete extends Operation {
|
||||
|
@ -1023,10 +1029,15 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
|
||||
@Override
|
||||
TYPE operationType() {
|
||||
public TYPE operationType() {
|
||||
return TYPE.DELETE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "delete [{"+ uid().text() +"}]";
|
||||
}
|
||||
|
||||
public void updateVersion(long version, boolean found) {
|
||||
updateVersion(version);
|
||||
this.found = found;
|
||||
|
|
|
@ -536,29 +536,36 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
|
||||
}
|
||||
|
||||
public void index(Engine.Index index) {
|
||||
ensureWriteAllowed(index);
|
||||
public void execute(Engine.Operation operation) {
|
||||
ensureWriteAllowed(operation);
|
||||
Engine engine = getEngine();
|
||||
index(engine, index);
|
||||
execute(engine, operation);
|
||||
}
|
||||
|
||||
private void index(Engine engine, Engine.Index index) {
|
||||
private void execute(Engine engine, Engine.Operation operation) {
|
||||
active.set(true);
|
||||
index = indexingOperationListeners.preIndex(index);
|
||||
indexingOperationListeners.preOperation(operation);
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
|
||||
logger.trace(operation.toString());
|
||||
}
|
||||
engine.index(index);
|
||||
index.endTime(System.nanoTime());
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
engine.index(((Engine.Index) operation));
|
||||
break;
|
||||
case DELETE:
|
||||
engine.delete(((Engine.Delete) operation));
|
||||
break;
|
||||
}
|
||||
operation.endTime(System.nanoTime());
|
||||
} catch (Exception e) {
|
||||
indexingOperationListeners.postIndex(index, e);
|
||||
indexingOperationListeners.postOperation(operation, e);
|
||||
throw e;
|
||||
}
|
||||
if (index.hasFailure()) {
|
||||
indexingOperationListeners.postIndex(index, index.getFailure());
|
||||
if (operation.hasFailure()) {
|
||||
indexingOperationListeners.postOperation(operation, operation.getFailure());
|
||||
} else {
|
||||
indexingOperationListeners.postIndex(index, index.isCreated());
|
||||
indexingOperationListeners.postOperation(operation);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -584,32 +591,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false);
|
||||
}
|
||||
|
||||
public void delete(Engine.Delete delete) {
|
||||
ensureWriteAllowed(delete);
|
||||
Engine engine = getEngine();
|
||||
delete(engine, delete);
|
||||
}
|
||||
|
||||
private void delete(Engine engine, Engine.Delete delete) {
|
||||
active.set(true);
|
||||
delete = indexingOperationListeners.preDelete(delete);
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("delete [{}]", delete.uid().text());
|
||||
}
|
||||
engine.delete(delete);
|
||||
delete.endTime(System.nanoTime());
|
||||
} catch (Exception e) {
|
||||
indexingOperationListeners.postDelete(delete, e);
|
||||
throw e;
|
||||
}
|
||||
if (delete.hasFailure()) {
|
||||
indexingOperationListeners.postDelete(delete, delete.getFailure());
|
||||
} else {
|
||||
indexingOperationListeners.postDelete(delete);
|
||||
}
|
||||
}
|
||||
|
||||
public Engine.GetResult get(Engine.Get get) {
|
||||
readAllowed();
|
||||
return getEngine().get(get, this::acquireSearcher);
|
||||
|
@ -1841,12 +1822,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
@Override
|
||||
protected void index(Engine engine, Engine.Index engineIndex) {
|
||||
IndexShard.this.index(engine, engineIndex);
|
||||
IndexShard.this.execute(engine, engineIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void delete(Engine engine, Engine.Delete engineDelete) {
|
||||
IndexShard.this.delete(engine, engineDelete);
|
||||
IndexShard.this.execute(engine, engineDelete);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,49 +25,19 @@ import org.elasticsearch.index.engine.Engine;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An indexing listener for indexing, delete, events.
|
||||
*/
|
||||
/** An engine operation listener for index and delete execution. */
|
||||
public interface IndexingOperationListener {
|
||||
|
||||
/**
|
||||
* Called before the indexing occurs.
|
||||
*/
|
||||
default Engine.Index preIndex(Engine.Index operation) {
|
||||
return operation;
|
||||
}
|
||||
/** Called before executing index or delete operation */
|
||||
default void preOperation(Engine.Operation operation) {}
|
||||
|
||||
/**
|
||||
* Called after the indexing operation occurred.
|
||||
*/
|
||||
default void postIndex(Engine.Index index, boolean created) {}
|
||||
/** Called after executing index or delete operation */
|
||||
default void postOperation(Engine.Operation operation) {}
|
||||
|
||||
/**
|
||||
* Called after the indexing operation occurred with exception.
|
||||
*/
|
||||
default void postIndex(Engine.Index index, Exception ex) {}
|
||||
/** Called after index or delete operation failed with exception */
|
||||
default void postOperation(Engine.Operation operation, Exception ex) {}
|
||||
|
||||
/**
|
||||
* Called before the delete occurs.
|
||||
*/
|
||||
default Engine.Delete preDelete(Engine.Delete delete) {
|
||||
return delete;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Called after the delete operation occurred.
|
||||
*/
|
||||
default void postDelete(Engine.Delete delete) {}
|
||||
|
||||
/**
|
||||
* Called after the delete operation occurred with exception.
|
||||
*/
|
||||
default void postDelete(Engine.Delete delete, Exception ex) {}
|
||||
|
||||
/**
|
||||
* A Composite listener that multiplexes calls to each of the listeners methods.
|
||||
*/
|
||||
/** A Composite listener that multiplexes calls to each of the listeners methods. */
|
||||
final class CompositeListener implements IndexingOperationListener{
|
||||
private final List<IndexingOperationListener> listeners;
|
||||
private final Logger logger;
|
||||
|
@ -78,79 +48,40 @@ public interface IndexingOperationListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
assert operation != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.preIndex(operation);
|
||||
listener.preOperation(operation);
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
|
||||
}
|
||||
}
|
||||
return operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
assert index != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.postIndex(index, created);
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preOperation listener [{}] failed", listener), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Exception ex) {
|
||||
assert index != null && ex != null;
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
assert operation != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.postIndex(index, ex);
|
||||
listener.postOperation(operation);
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOperation(Engine.Operation operation, Exception ex) {
|
||||
assert operation != null && ex != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.postOperation(operation, ex);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(ex);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
assert delete != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.preDelete(delete);
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
|
||||
}
|
||||
}
|
||||
return delete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
assert delete != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.postDelete(delete);
|
||||
} catch (Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Exception ex) {
|
||||
assert delete != null && ex != null;
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
listener.postDelete(delete, ex);
|
||||
} catch (Exception inner) {
|
||||
inner.addSuppressed(ex);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), inner);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -65,63 +65,60 @@ final class InternalIndexingStats implements IndexingOperationListener {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
if (!operation.origin().isRecovery()) {
|
||||
totalStats.indexCurrent.inc();
|
||||
typeStats(operation.type()).indexCurrent.inc();
|
||||
}
|
||||
return operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
if (!index.origin().isRecovery()) {
|
||||
long took = index.endTime() - index.startTime();
|
||||
totalStats.indexMetric.inc(took);
|
||||
totalStats.indexCurrent.dec();
|
||||
StatsHolder typeStats = typeStats(index.type());
|
||||
typeStats.indexMetric.inc(took);
|
||||
typeStats.indexCurrent.dec();
|
||||
StatsHolder statsHolder = typeStats(operation.type());
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
totalStats.indexCurrent.inc();
|
||||
statsHolder.indexCurrent.inc();
|
||||
break;
|
||||
case DELETE:
|
||||
totalStats.deleteCurrent.inc();
|
||||
statsHolder.deleteCurrent.inc();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Exception ex) {
|
||||
if (!index.origin().isRecovery()) {
|
||||
totalStats.indexCurrent.dec();
|
||||
typeStats(index.type()).indexCurrent.dec();
|
||||
totalStats.indexFailed.inc();
|
||||
typeStats(index.type()).indexFailed.inc();
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
if (!operation.origin().isRecovery()) {
|
||||
long took = operation.endTime() - operation.startTime();
|
||||
StatsHolder typeStats = typeStats(operation.type());
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
totalStats.indexMetric.inc(took);
|
||||
totalStats.indexCurrent.dec();
|
||||
typeStats.indexMetric.inc(took);
|
||||
typeStats.indexCurrent.dec();
|
||||
break;
|
||||
case DELETE:
|
||||
totalStats.deleteMetric.inc(took);
|
||||
totalStats.deleteCurrent.dec();
|
||||
typeStats.deleteMetric.inc(took);
|
||||
typeStats.deleteCurrent.dec();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
if (!delete.origin().isRecovery()) {
|
||||
totalStats.deleteCurrent.inc();
|
||||
typeStats(delete.type()).deleteCurrent.inc();
|
||||
}
|
||||
return delete;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
if (!delete.origin().isRecovery()) {
|
||||
long took = delete.endTime() - delete.startTime();
|
||||
totalStats.deleteMetric.inc(took);
|
||||
totalStats.deleteCurrent.dec();
|
||||
StatsHolder typeStats = typeStats(delete.type());
|
||||
typeStats.deleteMetric.inc(took);
|
||||
typeStats.deleteCurrent.dec();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Exception ex) {
|
||||
if (!delete.origin().isRecovery()) {
|
||||
totalStats.deleteCurrent.dec();
|
||||
typeStats(delete.type()).deleteCurrent.dec();
|
||||
public void postOperation(Engine.Operation operation, Exception ex) {
|
||||
if (!operation.origin().isRecovery()) {
|
||||
StatsHolder statsHolder = typeStats(operation.type());
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
totalStats.indexCurrent.dec();
|
||||
statsHolder.indexCurrent.dec();
|
||||
totalStats.indexFailed.inc();
|
||||
statsHolder.indexFailed.inc();
|
||||
break;
|
||||
case DELETE:
|
||||
totalStats.deleteCurrent.dec();
|
||||
statsHolder.deleteCurrent.dec();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,10 +155,5 @@ final class InternalIndexingStats implements IndexingOperationListener {
|
|||
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
|
||||
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
|
||||
}
|
||||
|
||||
void clear() {
|
||||
indexMetric.clear();
|
||||
deleteMetric.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,11 +189,6 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
statusChecker.run();
|
||||
}
|
||||
|
||||
/** called by IndexShard to record that this many bytes were written to translog */
|
||||
public void bytesWritten(int bytes) {
|
||||
statusChecker.bytesWritten(bytes);
|
||||
}
|
||||
|
||||
/** Asks this shard to throttle indexing to one thread */
|
||||
protected void activateThrottling(IndexShard shard) {
|
||||
shard.activateThrottling();
|
||||
|
@ -205,17 +200,8 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
recordOperationBytes(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
recordOperationBytes(delete);
|
||||
}
|
||||
|
||||
private void recordOperationBytes(Engine.Operation op) {
|
||||
bytesWritten(op.sizeInBytes());
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
statusChecker.bytesWritten(operation.sizeInBytes());
|
||||
}
|
||||
|
||||
private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
|
||||
|
|
|
@ -233,9 +233,10 @@ public class IndexModuleTests extends ESTestCase {
|
|||
AtomicBoolean executed = new AtomicBoolean(false);
|
||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
executed.set(true);
|
||||
return operation;
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
if (operation.operationType() == Engine.Operation.TYPE.INDEX) {
|
||||
executed.set(true);
|
||||
}
|
||||
}
|
||||
};
|
||||
module.addIndexOperationListener(listener);
|
||||
|
@ -251,7 +252,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
|
||||
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
|
||||
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
|
||||
l.preIndex(index);
|
||||
l.preOperation(index);
|
||||
}
|
||||
assertTrue(executed.get());
|
||||
indexService.close("simon says", false);
|
||||
|
|
|
@ -219,7 +219,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
|
|||
assertEquals("b", fields[1].stringValue());
|
||||
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
shard.index(new Engine.Index(new Term("_uid", "1"), doc));
|
||||
shard.execute(new Engine.Index(new Term("_uid", "1"), doc));
|
||||
shard.refresh("test");
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
|
||||
|
@ -258,7 +258,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
|
|||
assertEquals("b", fields[1].stringValue());
|
||||
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
shard.index(new Engine.Index(new Term("_uid", "1"), doc));
|
||||
shard.execute(new Engine.Index(new Term("_uid", "1"), doc));
|
||||
shard.refresh("test");
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
|
||||
|
|
|
@ -321,7 +321,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(),
|
||||
new BytesArray(new byte[]{1}), null);
|
||||
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
|
||||
shard.index(index);
|
||||
shard.execute(index);
|
||||
assertTrue(shard.shouldFlush());
|
||||
assertEquals(2, shard.getEngine().getTranslog().totalOperations());
|
||||
client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
|
||||
|
@ -406,23 +406,8 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
|
||||
List<Exception> failures = new ArrayList<>();
|
||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
try {
|
||||
assertNotNull(shardRef.get());
|
||||
// this is all IMC needs to do - check current memory and refresh
|
||||
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
|
||||
shardRef.get().refresh("test");
|
||||
} catch (Exception e) {
|
||||
failures.add(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
try {
|
||||
assertNotNull(shardRef.get());
|
||||
// this is all IMC needs to do - check current memory and refresh
|
||||
|
|
|
@ -560,40 +560,43 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
shard.close("simon says", true);
|
||||
shard = reinitShard(shard, new IndexingOperationListener() {
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
preIndex.incrementAndGet();
|
||||
return operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
if (created) {
|
||||
postIndexCreate.incrementAndGet();
|
||||
} else {
|
||||
postIndexUpdate.incrementAndGet();
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
preIndex.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
preDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Exception ex) {
|
||||
postIndexException.incrementAndGet();
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
if (((Engine.Index) operation).isCreated()) {
|
||||
postIndexCreate.incrementAndGet();
|
||||
} else {
|
||||
postIndexUpdate.incrementAndGet();
|
||||
}
|
||||
break;
|
||||
case DELETE:
|
||||
postDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
preDelete.incrementAndGet();
|
||||
return delete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
postDelete.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Exception ex) {
|
||||
postDeleteException.incrementAndGet();
|
||||
|
||||
public void postOperation(Engine.Operation operation, Exception ex) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
postIndexException.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
postDeleteException.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
recoveryShardFromStore(shard);
|
||||
|
@ -601,7 +604,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(),
|
||||
new BytesArray(new byte[]{1}), null);
|
||||
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
|
||||
shard.index(index);
|
||||
shard.execute(index);
|
||||
assertEquals(1, preIndex.get());
|
||||
assertEquals(1, postIndexCreate.get());
|
||||
assertEquals(0, postIndexUpdate.get());
|
||||
|
@ -610,7 +613,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertEquals(0, postDelete.get());
|
||||
assertEquals(0, postDeleteException.get());
|
||||
|
||||
shard.index(index);
|
||||
shard.execute(index);
|
||||
assertEquals(2, preIndex.get());
|
||||
assertEquals(1, postIndexCreate.get());
|
||||
assertEquals(1, postIndexUpdate.get());
|
||||
|
@ -620,7 +623,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertEquals(0, postDeleteException.get());
|
||||
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
|
||||
shard.delete(delete);
|
||||
shard.execute(delete);
|
||||
|
||||
assertEquals(2, preIndex.get());
|
||||
assertEquals(1, postIndexCreate.get());
|
||||
|
@ -634,7 +637,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
shard.state = IndexShardState.STARTED; // It will generate exception
|
||||
|
||||
try {
|
||||
shard.index(index);
|
||||
shard.execute(index);
|
||||
fail();
|
||||
} catch (IllegalIndexShardStateException e) {
|
||||
|
||||
|
@ -648,7 +651,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertEquals(1, postDelete.get());
|
||||
assertEquals(0, postDeleteException.get());
|
||||
try {
|
||||
shard.delete(delete);
|
||||
shard.execute(delete);
|
||||
fail();
|
||||
} catch (IllegalIndexShardStateException e) {
|
||||
|
||||
|
@ -1123,26 +1126,27 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
final AtomicInteger postDelete = new AtomicInteger();
|
||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
preIndex.incrementAndGet();
|
||||
return operation;
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
preIndex.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
preDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
postIndex.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
preDelete.incrementAndGet();
|
||||
return delete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
postDelete.incrementAndGet();
|
||||
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
postIndex.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
postDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
final IndexShard newShard = reinitShard(shard, listener);
|
||||
|
|
|
@ -40,63 +40,55 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
AtomicInteger postDeleteException = new AtomicInteger();
|
||||
IndexingOperationListener listener = new IndexingOperationListener() {
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
preIndex.incrementAndGet();
|
||||
return operation;
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
preIndex.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
preDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
postIndex.incrementAndGet();
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
postIndex.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
postDelete.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Exception ex) {
|
||||
postIndexException.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
preDelete.incrementAndGet();
|
||||
return delete;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
postDelete.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Exception ex) {
|
||||
postDeleteException.incrementAndGet();
|
||||
public void postOperation(Engine.Operation operation, Exception ex) {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
postIndexException.incrementAndGet();
|
||||
break;
|
||||
case DELETE:
|
||||
postDeleteException.incrementAndGet();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
IndexingOperationListener throwingListener = new IndexingOperationListener() {
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index operation) {
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, boolean created) {
|
||||
throw new RuntimeException(); }
|
||||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Exception ex) {
|
||||
throw new RuntimeException(); }
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
public void postOperation(Engine.Operation operation) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete) {
|
||||
throw new RuntimeException(); }
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Exception ex) {
|
||||
public void postOperation(Engine.Operation operation, Exception ex) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
};
|
||||
|
@ -111,7 +103,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
|
||||
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
|
||||
compositeListener.postDelete(delete);
|
||||
compositeListener.postOperation(delete);
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(0, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
@ -119,7 +111,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(0, postDeleteException.get());
|
||||
|
||||
compositeListener.postDelete(delete, new RuntimeException());
|
||||
compositeListener.postOperation(delete, new RuntimeException());
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(0, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
@ -127,7 +119,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(2, postDeleteException.get());
|
||||
|
||||
compositeListener.preDelete(delete);
|
||||
compositeListener.preOperation(delete);
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(0, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
@ -135,7 +127,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(2, postDeleteException.get());
|
||||
|
||||
compositeListener.postIndex(index, false);
|
||||
compositeListener.postOperation(index);
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(2, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
@ -143,7 +135,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(2, postDeleteException.get());
|
||||
|
||||
compositeListener.postIndex(index, new RuntimeException());
|
||||
compositeListener.postOperation(index, new RuntimeException());
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(2, postIndex.get());
|
||||
assertEquals(2, postIndexException.get());
|
||||
|
@ -151,7 +143,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(2, postDeleteException.get());
|
||||
|
||||
compositeListener.preIndex(index);
|
||||
compositeListener.preOperation(index);
|
||||
assertEquals(2, preIndex.get());
|
||||
assertEquals(2, postIndex.get());
|
||||
assertEquals(2, postIndexException.get());
|
||||
|
|
|
@ -197,25 +197,14 @@ public class CancelTests extends ReindexTestCase {
|
|||
}
|
||||
|
||||
public static class BlockingOperationListener implements IndexingOperationListener {
|
||||
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index index) {
|
||||
return preCheck(index, index.type());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
return preCheck(delete, delete.type());
|
||||
}
|
||||
|
||||
private <T extends Engine.Operation> T preCheck(T operation, String type) {
|
||||
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
|
||||
return operation;
|
||||
public void preOperation(Engine.Operation operation) {
|
||||
if ((TYPE.equals(operation.type()) == false) || (operation.origin() != Origin.PRIMARY)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
||||
return operation;
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -451,7 +451,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
|
||||
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
|
||||
}
|
||||
shard.index(index);
|
||||
shard.execute(index);
|
||||
return index;
|
||||
}
|
||||
|
||||
|
@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
|
|||
} else {
|
||||
delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL);
|
||||
}
|
||||
shard.delete(delete);
|
||||
shard.execute(delete);
|
||||
return delete;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue