Revert "Generify index shard method to execute engine write operation"

This reverts commit 1bdeada8aa.
This commit is contained in:
Areek Zillur 2016-10-25 09:11:16 -04:00
parent 7c11a2b732
commit 1587a77ffd
22 changed files with 842 additions and 730 deletions

View File

@ -57,8 +57,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary;
import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica;
import static org.elasticsearch.action.delete.TransportDeleteAction.*;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary;
import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica;
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
@ -125,44 +124,44 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkShardRequest request,
long[] preVersions, VersionType[] preVersionTypes,
Translog.Location location, int requestIndex) throws Exception {
preVersions[requestIndex] = request.items()[requestIndex].request().version();
preVersionTypes[requestIndex] = request.items()[requestIndex].request().versionType();
DocWriteRequest.OpType opType = request.items()[requestIndex].request().opType();
final DocWriteRequest itemRequest = request.items()[requestIndex].request();
preVersions[requestIndex] = itemRequest.version();
preVersionTypes[requestIndex] = itemRequest.versionType();
DocWriteRequest.OpType opType = itemRequest.opType();
try {
// execute item request
DocWriteRequest itemRequest = request.items()[requestIndex].request();
final Engine.Operation operation;
final Engine.Result operationResult;
final DocWriteResponse response;
switch (itemRequest.opType()) {
case CREATE:
case INDEX:
final IndexRequest indexRequest = (IndexRequest) itemRequest;
operation = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
response = operation.hasFailure() ? null
operationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
response = operationResult.hasFailure() ? null
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
operation.version(), ((Engine.Index) operation).isCreated());
operationResult.getVersion(), ((Engine.IndexResult) operationResult).isCreated());
break;
case UPDATE:
UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest),
primary, metaData, request, requestIndex);
operation = updateResultHolder.operation;
operationResult = updateResultHolder.operationResult;
response = updateResultHolder.response;
break;
case DELETE:
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
operation = executeDeleteRequestOnPrimary(deleteRequest, primary);
response = operation.hasFailure() ? null :
operationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
response = operationResult.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
operation.version(), ((Engine.Delete) operation).found());
operationResult.getVersion(), ((Engine.DeleteResult) operationResult).isFound());
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}
// update the bulk item request because update request execution can mutate the bulk item request
BulkItemRequest item = request.items()[requestIndex];
if (operation == null // in case of a noop update operation
|| operation.hasFailure() == false) {
if (operation != null) {
location = locationToSync(location, operation.getTranslogLocation());
if (operationResult == null // in case of a noop update operation
|| operationResult.hasFailure() == false) {
if (operationResult != null) {
location = locationToSync(location, operationResult.getLocation());
} else {
assert response.getResult() == DocWriteResponse.Result.NOOP
: "only noop update can have null operation";
@ -171,7 +170,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
item.setPrimaryResponse(new BulkItemResponse(item.id(), opType, response));
} else {
DocWriteRequest docWriteRequest = item.request();
Exception failure = operation.getFailure();
Exception failure = operationResult.getFailure();
if (isConflictException(failure)) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
@ -214,11 +213,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
private static class UpdateResultHolder {
final Engine.Operation operation;
final Engine.Result operationResult;
final DocWriteResponse response;
private UpdateResultHolder(Engine.Operation operation, DocWriteResponse response) {
this.operation = operation;
private UpdateResultHolder(Engine.Result operationResult, DocWriteResponse response) {
this.operationResult = operationResult;
this.response = response;
}
}
@ -233,7 +232,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, BulkShardRequest request,
int requestIndex) throws Exception {
Engine.Operation updateOperation = null;
Engine.Result updateOperationResult = null;
UpdateResponse updateResponse = null;
int maxAttempts = updateRequest.retryOnConflict();
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
@ -243,8 +242,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
translate = updateHelper.prepare(updateRequest, primary, threadPool::estimatedTimeInMillis);
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
updateOperation = new Engine.Failure(updateRequest.type(), updateRequest.id(), updateRequest.version(),
updateRequest.versionType(), Engine.Operation.Origin.PRIMARY, System.nanoTime(), failure);
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0);
break; // out of retry loop
}
// execute translated update request
@ -254,29 +252,29 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
updateOperation = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
break;
case DELETED:
updateOperation = executeDeleteRequestOnPrimary(translate.action(), primary);
updateOperationResult = executeDeleteRequestOnPrimary(translate.action(), primary);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
break;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
if (updateOperation == null) {
if (updateOperationResult == null) {
// this is a noop operation
updateResponse = translate.action();
} else {
if (updateOperation.hasFailure() == false) {
if (updateOperationResult.hasFailure() == false) {
// enrich update response and
// set translated update (index/delete) request for replica execution in bulk items
switch (updateOperation.operationType()) {
switch (updateOperationResult.getOperationType()) {
case INDEX:
IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
updateIndexRequest.type(), updateIndexRequest.id(),
updateOperation.version(), ((Engine.Index) updateOperation).isCreated());
updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated());
BytesReference indexSourceAsBytes = updateIndexRequest.source();
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(),
@ -295,7 +293,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
DeleteRequest updateDeleteRequest = translate.action();
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
updateDeleteRequest.type(), updateDeleteRequest.id(),
updateOperation.version(), ((Engine.Delete) updateOperation).found());
updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound());
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(),
deleteResponse.getVersion(), deleteResponse.getResult());
@ -305,19 +303,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
// replace the update request to the translated delete request to execute on the replica.
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
case FAILURE:
break;
}
} else {
// version conflict exception, retry
if (updateOperation.getFailure() instanceof VersionConflictEngineException) {
if (updateOperationResult.getFailure() instanceof VersionConflictEngineException) {
continue;
}
}
}
break; // out of retry loop
}
return new UpdateResultHolder(updateOperation, updateResponse);
return new UpdateResultHolder(updateOperationResult, updateResponse);
}
@Override
@ -327,28 +323,28 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkItemRequest item = request.items()[i];
if (item.isIgnoreOnReplica() == false) {
DocWriteRequest docWriteRequest = item.request();
final Engine.Operation operation;
final Engine.Result operationResult;
try {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operation = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
break;
case DELETE:
operation = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
+ docWriteRequest.opType().getLowercase());
}
if (operation.hasFailure()) {
if (operationResult.hasFailure()) {
// check if any transient write operation failures should be bubbled up
Exception failure = operation.getFailure();
Exception failure = operationResult.getFailure();
if (!ignoreReplicaException(failure)) {
throw failure;
}
} else {
location = locationToSync(location, operation.getTranslogLocation());
location = locationToSync(location, operationResult.getLocation());
}
} catch (Exception e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure

View File

@ -123,33 +123,32 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
@Override
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.Delete operation = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = operation.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), operation.version(), operation.found());
return new WritePrimaryResult(request, response, operation.getTranslogLocation(), operation.getFailure(), primary);
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = result.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
return new WritePrimaryResult(request, response, result.getLocation(), result.getFailure(), primary);
}
@Override
protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
final Engine.Operation operation = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica);
return new WriteReplicaResult(request, result.getLocation(), result.getFailure(), replica);
}
public static Engine.Delete executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
primary.execute(delete);
if (delete.hasFailure() == false) {
Engine.DeleteResult result = primary.delete(delete);
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
}
return delete;
return result;
}
public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
replica.execute(delete);
return delete;
return replica.delete(delete);
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -140,84 +141,89 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
@Override
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = operation.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(),
((Engine.Index) operation).isCreated());
return new WritePrimaryResult(request, response, operation.getTranslogLocation(), operation.getFailure(), primary);
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = indexResult.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
return new WritePrimaryResult(request, response, indexResult.getLocation(), indexResult.getFailure(), primary);
}
@Override
protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
final Engine.Operation operation = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, operation.getTranslogLocation(), operation.getFailure(), replica);
final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica);
return new WriteReplicaResult(request, indexResult.getLocation(), indexResult.getFailure(), replica);
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.
*/
public static Engine.Operation executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest request, IndexShard replica) {
final ShardId shardId = replica.shardId();
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.Operation operation;
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
if (operation.hasFailure() == false) {
Mapping update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
replica.execute(operation);
final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), 0, 0);
}
return operation;
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
return replica.index(operation);
}
/** Utility method to prepare an index operation on primary shards */
static Engine.Operation prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
return primary.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
public static Engine.Operation executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Operation operation = prepareIndexOperationOnPrimary(request, primary);
if (operation.hasFailure() == false) {
Mapping update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.Failure(request.type(), request.id(), request.version(),
request.versionType(), Engine.Operation.Origin.PRIMARY, operation.startTime(), e);
}
Engine.Index operation;
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), 0, 0);
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
if (update != null) {
// can throw timeout exception when updating mappings or ISE for attempting to update default mappings
// which are bubbled up
try {
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(),
operation.startTime() - System.nanoTime(), 0);
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
if (operation.hasFailure() == false) {
update = ((Engine.Index) operation).parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(),
operation.startTime() - System.nanoTime(), 0);
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
if (operation.hasFailure() == false) {
primary.execute(operation);
}
if (operation.hasFailure() == false) {
Engine.IndexResult result = primary.index(operation);
if (result.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = operation.version();
final long version = result.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
}
return operation;
return result;
}
}

View File

@ -133,20 +133,23 @@ public final class IndexingSlowLog implements IndexingOperationListener {
this.reformat = reformat;
}
@Override
public void postOperation(Engine.Operation operation) {
if (operation.operationType() == Engine.Operation.TYPE.INDEX) {
final long tookInNanos = operation.endTime() - operation.startTime();
ParsedDocument doc = ((Engine.Index) operation).parsedDoc();
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
}
public void postIndex(Engine.Index index, boolean created) {
final long took = index.endTime() - index.startTime();
postIndexing(index.parsedDoc(), took);
}
private void postIndexing(ParsedDocument doc, long tookInNanos) {
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog));
}
}

View File

@ -278,9 +278,105 @@ public abstract class Engine implements Closeable {
}
}
public abstract void index(Index operation);
public abstract IndexResult index(Index operation);
public abstract void delete(Delete delete);
public abstract DeleteResult delete(Delete delete);
public abstract static class Result {
private final Operation.TYPE operationType;
private final Translog.Location location;
private final long version;
private final Exception failure;
private final long took;
private final int estimatedSizeInBytes;
private Result(Operation.TYPE operationType, Translog.Location location, Exception failure,
long version, long took, int estimatedSizeInBytes) {
this.operationType = operationType;
this.location = location;
this.failure = failure;
this.version = version;
this.took = took;
this.estimatedSizeInBytes = estimatedSizeInBytes;
}
protected Result(Operation.TYPE operationType, Translog.Location location,
long version, long took, int estimatedSizeInBytes) {
this(operationType, location, null, version, took, estimatedSizeInBytes);
}
protected Result(Operation.TYPE operationType, Exception failure,
long version, long took, int estimatedSizeInBytes) {
this(operationType, null, failure, version, took, estimatedSizeInBytes);
}
public boolean hasFailure() {
return failure != null;
}
public long getVersion() {
return version;
}
public Translog.Location getLocation() {
return location;
}
public Exception getFailure() {
return failure;
}
public long getTook() {
return took;
}
public Operation.TYPE getOperationType() {
return operationType;
}
public int getSizeInBytes() {
if (location != null) {
return location.size;
}
return estimatedSizeInBytes;
}
}
public static class IndexResult extends Result {
private final boolean created;
public IndexResult(Translog.Location location, long version, boolean created, long took, int estimatedSizeInBytes) {
super(Operation.TYPE.INDEX, location, version, took, estimatedSizeInBytes);
this.created = created;
}
public IndexResult(Exception failure, long version, long took, int estimatedSizeInBytes) {
super(Operation.TYPE.INDEX, failure, version, took, estimatedSizeInBytes);
this.created = false;
}
public boolean isCreated() {
return created;
}
}
public static class DeleteResult extends Result {
private final boolean found;
public DeleteResult(Translog.Location location, long version, boolean found, long took, int estimatedSizeInBytes) {
super(Operation.TYPE.DELETE, location, version, took, estimatedSizeInBytes);
this.found = found;
}
DeleteResult(Exception failure, long version, long took, int estimatedSizeInBytes) {
super(Operation.TYPE.DELETE, failure, version, took, estimatedSizeInBytes);
this.found = false;
}
public boolean isFound() {
return found;
}
}
/**
* Attempts to do a special commit where the given syncID is put into the commit data. The attempt
@ -771,7 +867,7 @@ public abstract class Engine implements Closeable {
/** type of operation (index, delete), subclasses use static types */
public enum TYPE {
INDEX, DELETE, FAILURE;
INDEX, DELETE;
private final String lowercase;
@ -785,13 +881,10 @@ public abstract class Engine implements Closeable {
}
private final Term uid;
private long version;
private final long version;
private final VersionType versionType;
private final Origin origin;
private Translog.Location location;
private Exception failure;
private final long startTime;
private long endTime;
public Operation(Term uid, long version, VersionType versionType, Origin origin, long startTime) {
this.uid = uid;
@ -824,39 +917,7 @@ public abstract class Engine implements Closeable {
return this.version;
}
public void updateVersion(long version) {
this.version = version;
}
public void setTranslogLocation(Translog.Location location) {
this.location = location;
}
public Translog.Location getTranslogLocation() {
return this.location;
}
public Exception getFailure() {
return failure;
}
public void setFailure(Exception failure) {
this.failure = failure;
}
public boolean hasFailure() {
return failure != null;
}
public int sizeInBytes() {
if (location != null) {
return location.size;
} else {
return estimatedSizeInBytes();
}
}
protected abstract int estimatedSizeInBytes();
public abstract int estimatedSizeInBytes();
public VersionType versionType() {
return this.versionType;
@ -869,24 +930,11 @@ public abstract class Engine implements Closeable {
return this.startTime;
}
public void endTime(long endTime) {
this.endTime = endTime;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
public abstract String type();
abstract String id();
public abstract TYPE operationType();
public abstract String toString();
abstract TYPE operationType();
}
public static class Index extends Operation {
@ -894,7 +942,6 @@ public abstract class Engine implements Closeable {
private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private boolean created;
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime,
long autoGeneratedIdTimestamp, boolean isRetry) {
@ -927,7 +974,7 @@ public abstract class Engine implements Closeable {
}
@Override
public TYPE operationType() {
TYPE operationType() {
return TYPE.INDEX;
}
@ -943,12 +990,6 @@ public abstract class Engine implements Closeable {
return this.doc.ttl();
}
@Override
public void updateVersion(long version) {
super.updateVersion(version);
this.doc.version().setLongValue(version);
}
public String parent() {
return this.doc.parent();
}
@ -961,16 +1002,8 @@ public abstract class Engine implements Closeable {
return this.doc.source();
}
public boolean isCreated() {
return created;
}
public void setCreated(boolean created) {
this.created = created;
}
@Override
protected int estimatedSizeInBytes() {
public int estimatedSizeInBytes() {
return (id().length() + type().length()) * 2 + source().length() + 12;
}
@ -991,31 +1024,25 @@ public abstract class Engine implements Closeable {
return isRetry;
}
@Override
public String toString() {
return "index [{" + type() + "}][{" + id()+ "}] [{" + docs() + "}]";
}
}
public static class Delete extends Operation {
private final String type;
private final String id;
private boolean found;
public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) {
public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, version, versionType, origin, startTime);
this.type = type;
this.id = id;
this.found = found;
}
public Delete(String type, String id, Term uid) {
this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false);
this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}
public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime());
}
@Override
@ -1029,74 +1056,16 @@ public abstract class Engine implements Closeable {
}
@Override
public TYPE operationType() {
TYPE operationType() {
return TYPE.DELETE;
}
@Override
public String toString() {
return "delete [{"+ uid().text() +"}]";
}
public void updateVersion(long version, boolean found) {
updateVersion(version);
this.found = found;
}
public boolean found() {
return this.found;
}
@Override
protected int estimatedSizeInBytes() {
public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}
}
public static class Failure extends Operation {
private final String type;
private final String id;
public Failure(String type, String id, long version, VersionType versionType, Origin origin,
long startTime, Exception failure) {
super(null, version, versionType, origin, startTime);
this.type = type;
this.id = id;
setFailure(failure);
}
@Override
public Term uid() {
throw new UnsupportedOperationException("failure operation doesn't have uid");
}
@Override
protected int estimatedSizeInBytes() {
return 0;
}
@Override
public String type() {
return type;
}
@Override
protected String id() {
return id;
}
@Override
public TYPE operationType() {
return TYPE.FAILURE;
}
@Override
public String toString() {
return "failure [{" + type() + "}][{" + id()+ "}]";
}
}
public static class Get {
private final boolean realtime;
private final Term uid;

View File

@ -57,7 +57,7 @@ import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine.Operation;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
@ -394,44 +394,47 @@ public class InternalEngine extends Engine {
VersionValue apply(long updatedVersion, long time);
}
private <T extends Engine.Operation> void maybeAddToTranslog(
private <T extends Engine.Operation> Translog.Location maybeAddToTranslog(
final T op,
final long updatedVersion,
final Function<T, Translog.Operation> toTranslogOp,
final VersionValueSupplier toVersionValue) throws IOException {
Translog.Location location = null;
if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op));
op.setTranslogLocation(translogLocation);
location = translog.add(toTranslogOp.apply(op));
}
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
return location;
}
@Override
public void index(Index index) {
public IndexResult index(Index index) {
IndexResult result;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (index.origin().isRecovery()) {
// Don't throttle recovery operations
innerIndex(index);
result = innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
innerIndex(index);
result = innerIndex(index);
}
}
} catch (Exception e) {
handleOperationFailure(index, e);
Exception transientOperationFailure = handleOperationFailure(index, e);
result = new IndexResult(transientOperationFailure, index.version(), index.startTime() - System.nanoTime());
}
return result;
}
/**
* Handle failures executing write operations, distinguish persistent engine (environment) failures
* from document (request) specific failures.
* Write failures that fail the engine as a side-effect, are thrown wrapped in {@link OperationFailedEngineException}
* and document specific failures are captured through {@link Operation#setFailure(Exception)} to be handled
* and document specific failures are returned to be set on the {@link Engine.Result} to be handled
* at the transport level.
*/
private void handleOperationFailure(final Operation operation, final Exception failure) {
private Exception handleOperationFailure(final Operation operation, final Exception failure) {
boolean isEnvironmentFailure;
try {
// When indexing a document into Lucene, Lucene distinguishes between environment related errors
@ -451,7 +454,7 @@ public class InternalEngine extends Engine {
throw new OperationFailedEngineException(shardId, operation.operationType().getLowercase(),
operation.type(), operation.id(), failure);
} else {
operation.setFailure(failure);
return failure;
}
}
@ -479,7 +482,9 @@ public class InternalEngine extends Engine {
return false;
}
private void innerIndex(Index index) throws IOException {
private IndexResult innerIndex(Index index) throws IOException {
final Translog.Location location;
final long updatedVersion;
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@ -544,54 +549,52 @@ public class InternalEngine extends Engine {
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
index.setCreated(false);
return;
}
final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
index.setCreated(deleted);
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create
index(index, indexWriter);
// skip index operation because of version conflict on recovery
return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime());
} else {
update(index, indexWriter);
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
}
location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime());
}
maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
}
}
private long updateVersion(Operation op, long currentVersion, long expectedVersion) {
final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion);
op.updateVersion(updatedVersion);
return updatedVersion;
}
private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
} else {
indexWriter.addDocument(index.docs().get(0));
indexWriter.addDocument(docs.get(0));
}
}
private static void update(final Index index, final IndexWriter indexWriter) throws IOException {
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
private static void update(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.updateDocuments(uid, docs);
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0));
indexWriter.updateDocument(uid, docs.get(0));
}
}
@Override
public void delete(Delete delete) {
public DeleteResult delete(Delete delete) {
DeleteResult result;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
innerDelete(delete);
result = innerDelete(delete);
} catch (Exception e) {
handleOperationFailure(delete, e);
Exception transientOperationFailure = handleOperationFailure(delete, e);
result = new DeleteResult(transientOperationFailure, delete.version(), delete.startTime() - System.nanoTime());
}
maybePruneDeletedTombstones();
return result;
}
private void maybePruneDeletedTombstones() {
@ -602,7 +605,10 @@ public class InternalEngine extends Engine {
}
}
private void innerDelete(Delete delete) throws IOException {
private DeleteResult innerDelete(Delete delete) throws IOException {
final Translog.Location location;
final long updatedVersion;
final boolean found;
try (Releasable ignored = acquireLock(delete.uid())) {
lastWriteNanos = delete.startTime();
final long currentVersion;
@ -618,19 +624,19 @@ public class InternalEngine extends Engine {
}
final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) return;
final long updatedVersion = updateVersion(delete, currentVersion, expectedVersion);
final boolean found = deleteIfFound(delete, currentVersion, deleted, versionValue);
delete.updateVersion(updatedVersion, found);
maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new);
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
return new DeleteResult(null, expectedVersion, true, delete.startTime() - System.nanoTime());
} else {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new);
return new DeleteResult(location, updatedVersion, found, delete.startTime() - System.nanoTime());
}
}
}
private boolean deleteIfFound(Delete delete, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
final boolean found;
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exist and no prior deletes
@ -640,7 +646,7 @@ public class InternalEngine extends Engine {
found = false;
} else {
// we deleted a currently existing document
indexWriter.deleteDocuments(delete.uid());
indexWriter.deleteDocuments(uid);
found = true;
}
return found;

View File

@ -106,12 +106,12 @@ public class ShadowEngine extends Engine {
@Override
public void index(Index index) {
public IndexResult index(Index index) {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}
@Override
public void delete(Delete delete) {
public DeleteResult delete(Delete delete) {
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
}

View File

@ -93,7 +93,6 @@ import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
@ -500,30 +499,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return previousState;
}
public Engine.Operation prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
try {
verifyPrimary();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY,
autoGeneratedIdTimestamp, isRetry);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY,
System.nanoTime(), e);
} catch (Exception e) {
verifyNotClosed(e);
throw e;
}
}
public Engine.Operation prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
try {
verifyReplicationTarget();
return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp,
isRetry);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY,
System.nanoTime(), e);
} catch (Exception e) {
verifyNotClosed(e);
throw e;
@ -543,37 +536,31 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}
public void execute(Engine.Operation operation) {
ensureWriteAllowed(operation);
public Engine.IndexResult index(Engine.Index index) {
ensureWriteAllowed(index);
Engine engine = getEngine();
execute(engine, operation);
return index(engine, index);
}
private void execute(Engine engine, Engine.Operation operation) {
private Engine.IndexResult index(Engine engine, Engine.Index index) {
active.set(true);
indexingOperationListeners.preOperation(operation);
final Engine.IndexResult result;
index = indexingOperationListeners.preIndex(index);
try {
if (logger.isTraceEnabled()) {
logger.trace(operation.toString());
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
}
switch (operation.operationType()) {
case INDEX:
engine.index(((Engine.Index) operation));
break;
case DELETE:
engine.delete(((Engine.Delete) operation));
break;
}
operation.endTime(System.nanoTime());
result = engine.index(index);
} catch (Exception e) {
indexingOperationListeners.postOperation(operation, e);
indexingOperationListeners.postIndex(index, e);
throw e;
}
if (operation.hasFailure()) {
indexingOperationListeners.postOperation(operation, operation.getFailure());
if (result.hasFailure()) {
indexingOperationListeners.postIndex(index, result.getFailure());
} else {
indexingOperationListeners.postOperation(operation);
indexingOperationListeners.postIndex(index, result);
}
return result;
}
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
@ -595,7 +582,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
static Engine.Delete prepareDelete(String type, String id, Term uid, long version, VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false);
return new Engine.Delete(type, id, uid, version, versionType, origin, startTime);
}
public Engine.DeleteResult delete(Engine.Delete delete) {
ensureWriteAllowed(delete);
Engine engine = getEngine();
return delete(engine, delete);
}
private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) {
active.set(true);
final Engine.DeleteResult result;
delete = indexingOperationListeners.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
result = engine.delete(delete);
} catch (Exception e) {
indexingOperationListeners.postDelete(delete, e);
throw e;
}
if (result.hasFailure()) {
indexingOperationListeners.postDelete(delete, result.getFailure());
} else {
indexingOperationListeners.postDelete(delete, result);
}
return result;
}
public Engine.GetResult get(Engine.Get get) {
@ -1829,12 +1843,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
@Override
protected void index(Engine engine, Engine.Index engineIndex) {
IndexShard.this.execute(engine, engineIndex);
IndexShard.this.index(engine, engineIndex);
}
@Override
protected void delete(Engine engine, Engine.Delete engineDelete) {
IndexShard.this.execute(engine, engineDelete);
IndexShard.this.delete(engine, engineDelete);
}
}
}

View File

@ -25,19 +25,49 @@ import org.elasticsearch.index.engine.Engine;
import java.util.List;
/** An engine operation listener for index and delete execution. */
/**
* An indexing listener for indexing, delete, events.
*/
public interface IndexingOperationListener {
/** Called before executing index or delete operation */
default void preOperation(Engine.Operation operation) {}
/**
* Called before the indexing occurs.
*/
default Engine.Index preIndex(Engine.Index operation) {
return operation;
}
/** Called after executing index or delete operation */
default void postOperation(Engine.Operation operation) {}
/**
* Called after the indexing operation occurred.
*/
default void postIndex(Engine.Index index, boolean created) {}
/** Called after index or delete operation failed with exception */
default void postOperation(Engine.Operation operation, Exception ex) {}
/**
* Called after the indexing operation occurred with exception.
*/
default void postIndex(Engine.Index index, Exception ex) {}
/** A Composite listener that multiplexes calls to each of the listeners methods. */
/**
* Called before the delete occurs.
*/
default Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
/**
* Called after the delete operation occurred.
*/
default void postDelete(Engine.Delete delete) {}
/**
* Called after the delete operation occurred with exception.
*/
default void postDelete(Engine.Delete delete, Exception ex) {}
/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeListener implements IndexingOperationListener{
private final List<IndexingOperationListener> listeners;
private final Logger logger;
@ -48,40 +78,79 @@ public interface IndexingOperationListener {
}
@Override
public void preOperation(Engine.Operation operation) {
public Engine.Index preIndex(Engine.Index operation) {
assert operation != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preOperation(operation);
listener.preIndex(operation);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preOperation listener [{}] failed", listener), e);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e);
}
}
return operation;
}
@Override
public void postIndex(Engine.Index index, boolean created) {
assert index != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postIndex(index, created);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e);
}
}
}
@Override
public void postOperation(Engine.Operation operation) {
assert operation != null;
public void postIndex(Engine.Index index, Exception ex) {
assert index != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postOperation(operation);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), e);
}
}
}
@Override
public void postOperation(Engine.Operation operation, Exception ex) {
assert operation != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postOperation(operation, ex);
listener.postIndex(index, ex);
} catch (Exception inner) {
inner.addSuppressed(ex);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), inner);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner);
}
}
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.preDelete(delete);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e);
}
}
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
assert delete != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e);
}
}
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
assert delete != null && ex != null;
for (IndexingOperationListener listener : listeners) {
try {
listener.postDelete(delete, ex);
} catch (Exception inner) {
inner.addSuppressed(ex);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner);
}
}
}
}
}
}

View File

@ -65,60 +65,63 @@ final class InternalIndexingStats implements IndexingOperationListener {
}
@Override
public void preOperation(Engine.Operation operation) {
public Engine.Index preIndex(Engine.Index operation) {
if (!operation.origin().isRecovery()) {
StatsHolder statsHolder = typeStats(operation.type());
switch (operation.operationType()) {
case INDEX:
totalStats.indexCurrent.inc();
statsHolder.indexCurrent.inc();
break;
case DELETE:
totalStats.deleteCurrent.inc();
statsHolder.deleteCurrent.inc();
break;
}
totalStats.indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc();
}
return operation;
}
@Override
public void postIndex(Engine.Index index, boolean created) {
if (!index.origin().isRecovery()) {
long took = index.endTime() - index.startTime();
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
StatsHolder typeStats = typeStats(index.type());
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
}
}
@Override
public void postOperation(Engine.Operation operation) {
if (!operation.origin().isRecovery()) {
long took = operation.endTime() - operation.startTime();
StatsHolder typeStats = typeStats(operation.type());
switch (operation.operationType()) {
case INDEX:
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
break;
case DELETE:
totalStats.deleteMetric.inc(took);
totalStats.deleteCurrent.dec();
typeStats.deleteMetric.inc(took);
typeStats.deleteCurrent.dec();
break;
}
public void postIndex(Engine.Index index, Exception ex) {
if (!index.origin().isRecovery()) {
totalStats.indexCurrent.dec();
typeStats(index.type()).indexCurrent.dec();
totalStats.indexFailed.inc();
typeStats(index.type()).indexFailed.inc();
}
}
@Override
public void postOperation(Engine.Operation operation, Exception ex) {
if (!operation.origin().isRecovery()) {
StatsHolder statsHolder = typeStats(operation.type());
switch (operation.operationType()) {
case INDEX:
totalStats.indexCurrent.dec();
statsHolder.indexCurrent.dec();
totalStats.indexFailed.inc();
statsHolder.indexFailed.inc();
break;
case DELETE:
totalStats.deleteCurrent.dec();
statsHolder.deleteCurrent.dec();
break;
}
public Engine.Delete preDelete(Engine.Delete delete) {
if (!delete.origin().isRecovery()) {
totalStats.deleteCurrent.inc();
typeStats(delete.type()).deleteCurrent.inc();
}
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
if (!delete.origin().isRecovery()) {
long took = delete.endTime() - delete.startTime();
totalStats.deleteMetric.inc(took);
totalStats.deleteCurrent.dec();
StatsHolder typeStats = typeStats(delete.type());
typeStats.deleteMetric.inc(took);
typeStats.deleteCurrent.dec();
}
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
if (!delete.origin().isRecovery()) {
totalStats.deleteCurrent.dec();
typeStats(delete.type()).deleteCurrent.dec();
}
}
@ -155,5 +158,10 @@ final class InternalIndexingStats implements IndexingOperationListener {
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis));
}
void clear() {
indexMetric.clear();
deleteMetric.clear();
}
}
}

View File

@ -170,7 +170,7 @@ public class TranslogRecoveryPerformer {
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
}
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime(), false);
delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime());
delete(engine, engineDelete);
break;
default:

View File

@ -189,6 +189,11 @@ public class IndexingMemoryController extends AbstractComponent implements Index
statusChecker.run();
}
/** called by IndexShard to record that this many bytes were written to translog */
public void bytesWritten(int bytes) {
statusChecker.bytesWritten(bytes);
}
/** Asks this shard to throttle indexing to one thread */
protected void activateThrottling(IndexShard shard) {
shard.activateThrottling();
@ -200,8 +205,17 @@ public class IndexingMemoryController extends AbstractComponent implements Index
}
@Override
public void postOperation(Engine.Operation operation) {
statusChecker.bytesWritten(operation.sizeInBytes());
public void postIndex(Engine.Index index, boolean created) {
recordOperationBytes(index);
}
@Override
public void postDelete(Engine.Delete delete) {
recordOperationBytes(delete);
}
private void recordOperationBytes(Engine.Operation op) {
bytesWritten(op.sizeInBytes());
}
private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {

View File

@ -233,10 +233,9 @@ public class IndexModuleTests extends ESTestCase {
AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void preOperation(Engine.Operation operation) {
if (operation.operationType() == Engine.Operation.TYPE.INDEX) {
executed.set(true);
}
public Engine.Index preIndex(Engine.Index operation) {
executed.set(true);
return operation;
}
};
module.addIndexOperationListener(listener);
@ -252,7 +251,7 @@ public class IndexModuleTests extends ESTestCase {
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
l.preOperation(index);
l.preIndex(index);
}
assertTrue(executed.get());
indexService.close("simon says", false);

View File

@ -334,11 +334,11 @@ public class InternalEngineTests extends ESTestCase {
// create two docs and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index first = new Engine.Index(newUid("1"), doc);
engine.index(first);
Engine.IndexResult firstResult = engine.index(first);
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
Engine.Index second = new Engine.Index(newUid("2"), doc2);
engine.index(second);
assertThat(second.getTranslogLocation(), greaterThan(first.getTranslogLocation()));
Engine.IndexResult secondResult = engine.index(second);
assertThat(secondResult.getLocation(), greaterThan(firstResult.getLocation()));
engine.refresh("test");
segments = engine.segments(false);
@ -628,7 +628,7 @@ public class InternalEngineTests extends ESTestCase {
operations.add(operation);
initialEngine.index(operation);
} else {
final Engine.Delete operation = new Engine.Delete("test", "1", newUid("test#1"), i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false);
final Engine.Delete operation = new Engine.Delete("test", "1", newUid("test#1"), i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime());
operations.add(operation);
initialEngine.delete(operation);
}
@ -1039,82 +1039,82 @@ public class InternalEngineTests extends ESTestCase {
public void testVersioningNewCreate() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED);
engine.index(create);
assertThat(create.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(create);
assertThat(create.version(), equalTo(1L));
create = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
}
public void testVersioningNewIndex() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertThat(index.version(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
}
public void testExternalVersioningNewIndex() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(12L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(12L));
index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertThat(index.version(), equalTo(12L));
index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(12L));
}
public void testVersioningIndexConflict() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// future versions should not work as well
index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testExternalVersioningIndexConflict() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(12L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(12L));
index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(14L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(14L));
index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testForceVersioningNotAllowedExceptForOlderIndices() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc, 42, VersionType.FORCE, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0"));
Engine.IndexResult indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(indexResult.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0"));
IndexSettings oldIndexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1)
@ -1122,58 +1122,58 @@ public class InternalEngineTests extends ESTestCase {
try (Store store = createStore();
Engine engine = createEngine(oldIndexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations"));
Engine.IndexResult result = engine.index(index);
assertTrue(result.hasFailure());
assertThat(result.getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(result.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations"));
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE,
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(84L));
result = engine.index(index);
assertThat(result.getVersion(), equalTo(84L));
}
}
public void testVersioningIndexConflictWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
engine.flush();
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// future versions should not work as well
index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testExternalVersioningIndexConflictWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(12L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(12L));
index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(14L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(14L));
engine.flush();
index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testForceMerge() throws IOException {
@ -1274,202 +1274,202 @@ public class InternalEngineTests extends ESTestCase {
public void testVersioningDeleteConflict() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertTrue(delete.hasFailure());
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0);
Engine.DeleteResult result = engine.delete(delete);
assertTrue(result.hasFailure());
assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class));
// future versions should not work as well
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertTrue(delete.hasFailure());
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0);
result = engine.delete(delete);
assertTrue(result.hasFailure());
assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class));
// now actually delete
delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertThat(delete.version(), equalTo(3L));
delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0);
result = engine.delete(delete);
assertThat(result.getVersion(), equalTo(3L));
// now check if we can index to a delete doc with version
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningDeleteConflictWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getFailure(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
engine.flush();
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertTrue(delete.hasFailure());
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0);
Engine.DeleteResult deleteResult = engine.delete(delete);
assertTrue(deleteResult.hasFailure());
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// future versions should not work as well
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertTrue(delete.hasFailure());
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0);
deleteResult = engine.delete(delete);
assertTrue(deleteResult.hasFailure());
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));
engine.flush();
// now actually delete
delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertThat(delete.version(), equalTo(3L));
delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0);
deleteResult = engine.delete(delete);
assertThat(deleteResult.getVersion(), equalTo(3L));
engine.flush();
// now check if we can index to a delete doc with version
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningCreateExistsException() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(create);
assertThat(create.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(create);
assertTrue(create.hasFailure());
assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(create);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(create);
assertThat(create.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));
engine.flush();
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
engine.index(create);
assertTrue(create.hasFailure());
assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(create);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningReplicaConflict1() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
// apply the second index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2L));
index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
// now, the old one should not work
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = replicaEngine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// second version on replica should fail as well
index = new Engine.Index(newUid("1"), doc, 2L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2L));
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testVersioningReplicaConflict2() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
// apply the first index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc, 1L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertThat(index.version(), equalTo(1L));
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
// index it again
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertThat(index.version(), equalTo(2L));
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
// now delete it
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
engine.delete(delete);
assertThat(delete.version(), equalTo(3L));
Engine.DeleteResult deleteResult = engine.delete(delete);
assertThat(deleteResult.getVersion(), equalTo(3L));
// apply the delete on the replica (skipping the second index)
delete = new Engine.Delete("test", "1", newUid("1"), 3L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3L));
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
deleteResult = replicaEngine.delete(delete);
assertThat(deleteResult.getVersion(), equalTo(3L));
// second time delete with same version should fail
delete = new Engine.Delete("test", "1", newUid("1"), 3L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
replicaEngine.delete(delete);
assertTrue(delete.hasFailure());
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
deleteResult = replicaEngine.delete(delete);
assertTrue(deleteResult.hasFailure());
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// now do the second index on the replica, it should fail
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
replicaEngine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = replicaEngine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
public void testBasicCreatedFlag() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertTrue(index.isCreated());
Engine.IndexResult indexResult = engine.index(index);
assertTrue(indexResult.isCreated());
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertFalse(index.isCreated());
indexResult = engine.index(index);
assertFalse(indexResult.isCreated());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertTrue(index.isCreated());
indexResult = engine.index(index);
assertTrue(indexResult.isCreated());
}
public void testCreatedFlagAfterFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertTrue(index.isCreated());
Engine.IndexResult indexResult = engine.index(index);
assertTrue(indexResult.isCreated());
engine.delete(new Engine.Delete(null, "1", newUid("1")));
engine.flush();
index = new Engine.Index(newUid("1"), doc);
engine.index(index);
assertTrue(index.isCreated());
indexResult = engine.index(index);
assertTrue(indexResult.isCreated());
}
private static class MockAppender extends AbstractAppender {
@ -1572,7 +1572,7 @@ public class InternalEngineTests extends ESTestCase {
engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
// Delete document we just added:
engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document
Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
@ -1586,7 +1586,7 @@ public class InternalEngineTests extends ESTestCase {
}
// Delete non-existent document
engine.delete(new Engine.Delete("test", "2", newUid("2"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
engine.delete(new Engine.Delete("test", "2", newUid("2"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
// Get should not find the document (we never indexed uid=2):
getResult = engine.get(new Engine.Get(true, newUid("2")));
@ -1594,9 +1594,9 @@ public class InternalEngineTests extends ESTestCase {
// Try to index uid=1 with a too-old version, should fail:
Engine.Index index = new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
engine.index(index);
assertTrue(index.hasFailure());
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
Engine.IndexResult indexResult = engine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should still not find the document
getResult = engine.get(new Engine.Get(true, newUid("1")));
@ -1604,9 +1604,9 @@ public class InternalEngineTests extends ESTestCase {
// Try to index uid=2 with a too-old version, should fail:
Engine.Index index1 = new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
engine.index(index1);
assertTrue(index1.hasFailure());
assertThat(index1.getFailure(), instanceOf(VersionConflictEngineException.class));
indexResult = engine.index(index1);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
// Get should not find the document
getResult = engine.get(new Engine.Get(true, newUid("2")));
@ -1702,8 +1702,8 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@ -1752,8 +1752,8 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@ -1842,8 +1842,8 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < numExtraDocs; i++) {
ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@ -1871,8 +1871,8 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1L));
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
@ -2165,25 +2165,25 @@ public class InternalEngineTests extends ESTestCase {
Engine.Index operation = randomAppendOnly(1, doc, false);
Engine.Index retry = randomAppendOnly(1, doc, true);
if (randomBoolean()) {
engine.index(operation);
Engine.IndexResult indexResult = engine.index(operation);
assertFalse(engine.indexWriterHasDeletions());
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(operation.getTranslogLocation());
engine.index(retry);
assertNotNull(indexResult.getLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertTrue(engine.indexWriterHasDeletions());
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retry.getTranslogLocation());
assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0);
assertNotNull(retryResult.getLocation());
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0);
} else {
engine.index(retry);
Engine.IndexResult retryResult = engine.index(retry);
assertTrue(engine.indexWriterHasDeletions());
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retry.getTranslogLocation());
engine.index(operation);
assertNotNull(retryResult.getLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertTrue(engine.indexWriterHasDeletions());
assertEquals(0, engine.getNumVersionLookups());
assertNotNull(retry.getTranslogLocation());
assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0);
assertNotNull(retryResult.getLocation());
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0);
}
engine.refresh("test");
@ -2194,17 +2194,17 @@ public class InternalEngineTests extends ESTestCase {
operation = randomAppendOnly(1, doc, false);
retry = randomAppendOnly(1, doc, true);
if (randomBoolean()) {
engine.index(operation);
assertNotNull(operation.getTranslogLocation());
engine.index(retry);
assertNotNull(retry.getTranslogLocation());
assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0);
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(indexResult.getLocation());
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getLocation());
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0);
} else {
engine.index(retry);
assertNotNull(retry.getTranslogLocation());
engine.index(operation);
assertNotNull(retry.getTranslogLocation());
assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0);
Engine.IndexResult retryResult = engine.index(retry);
assertNotNull(retryResult.getLocation());
Engine.IndexResult indexResult = engine.index(operation);
assertNotNull(retryResult.getLocation());
assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0);
}
engine.refresh("test");
@ -2265,8 +2265,8 @@ public class InternalEngineTests extends ESTestCase {
isRetry = false;
Engine.Index secondIndexRequest = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
engine.index(secondIndexRequest);
assertTrue(secondIndexRequest.isCreated());
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
assertTrue(indexResult.isCreated());
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);

View File

@ -219,7 +219,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals("b", fields[1].stringValue());
IndexShard shard = indexService.getShard(0);
shard.execute(new Engine.Index(new Term("_uid", "1"), doc));
shard.index(new Engine.Index(new Term("_uid", "1"), doc));
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@ -258,7 +258,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
assertEquals("b", fields[1].stringValue());
IndexShard shard = indexService.getShard(0);
shard.execute(new Engine.Index(new Term("_uid", "1"), doc));
shard.index(new Engine.Index(new Term("_uid", "1"), doc));
shard.refresh("test");
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();

View File

@ -366,19 +366,19 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception {
final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary,
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
null);
request.primaryTerm(primary.getPrimaryTerm());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, operation.getTranslogLocation(), logger);
IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(),
((Engine.Index) operation).isCreated());
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getLocation(), logger);
IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
return new PrimaryResult(request, response);
}
@Override
protected void performOnReplica(IndexRequest request, IndexShard replica) {
final Engine.Operation operation = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, operation.getTranslogLocation(), logger);
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getLocation(), logger);
}
}
}

View File

@ -321,7 +321,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(),
new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.execute(index);
shard.index(index);
assertTrue(shard.shouldFlush());
assertEquals(2, shard.getEngine().getTranslog().totalOperations());
client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
@ -406,8 +406,23 @@ public class IndexShardIT extends ESSingleNodeTestCase {
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
List<Exception> failures = new ArrayList<>();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void postOperation(Engine.Operation operation) {
public void postIndex(Engine.Index index, boolean created) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh
assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0);
shardRef.get().refresh("test");
} catch (Exception e) {
failures.add(e);
throw e;
}
}
@Override
public void postDelete(Engine.Delete delete) {
try {
assertNotNull(shardRef.get());
// this is all IMC needs to do - check current memory and refresh

View File

@ -558,43 +558,40 @@ public class IndexShardTests extends IndexShardTestCase {
shard.close("simon says", true);
shard = reinitShard(shard, new IndexingOperationListener() {
@Override
public void preOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
preIndex.incrementAndGet();
break;
case DELETE:
preDelete.incrementAndGet();
break;
public Engine.Index preIndex(Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postIndex(Engine.Index index, boolean created) {
if (created) {
postIndexCreate.incrementAndGet();
} else {
postIndexUpdate.incrementAndGet();
}
}
@Override
public void postOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
if (((Engine.Index) operation).isCreated()) {
postIndexCreate.incrementAndGet();
} else {
postIndexUpdate.incrementAndGet();
}
break;
case DELETE:
postDelete.incrementAndGet();
break;
}
public void postIndex(Engine.Index index, Exception ex) {
postIndexException.incrementAndGet();
}
@Override
public void postOperation(Engine.Operation operation, Exception ex) {
switch (operation.operationType()) {
case INDEX:
postIndexException.incrementAndGet();
break;
case DELETE:
postDeleteException.incrementAndGet();
break;
}
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
postDeleteException.incrementAndGet();
}
});
recoveryShardFromStore(shard);
@ -602,7 +599,7 @@ public class IndexShardTests extends IndexShardTestCase {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(),
new BytesArray(new byte[]{1}), null);
Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc);
shard.execute(index);
shard.index(index);
assertEquals(1, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(0, postIndexUpdate.get());
@ -611,7 +608,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, postDelete.get());
assertEquals(0, postDeleteException.get());
shard.execute(index);
shard.index(index);
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
assertEquals(1, postIndexUpdate.get());
@ -621,7 +618,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, postDeleteException.get());
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
shard.execute(delete);
shard.delete(delete);
assertEquals(2, preIndex.get());
assertEquals(1, postIndexCreate.get());
@ -635,7 +632,7 @@ public class IndexShardTests extends IndexShardTestCase {
shard.state = IndexShardState.STARTED; // It will generate exception
try {
shard.execute(index);
shard.index(index);
fail();
} catch (IllegalIndexShardStateException e) {
@ -649,7 +646,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(1, postDelete.get());
assertEquals(0, postDeleteException.get());
try {
shard.execute(delete);
shard.delete(delete);
fail();
} catch (IllegalIndexShardStateException e) {
@ -1124,27 +1121,26 @@ public class IndexShardTests extends IndexShardTestCase {
final AtomicInteger postDelete = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void preOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
preIndex.incrementAndGet();
break;
case DELETE:
preDelete.incrementAndGet();
break;
}
public Engine.Index preIndex(Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
postIndex.incrementAndGet();
break;
case DELETE:
postDelete.incrementAndGet();
break;
}
public void postIndex(Engine.Index index, boolean created) {
postIndex.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
};
final IndexShard newShard = reinitShard(shard, listener);

View File

@ -40,55 +40,63 @@ public class IndexingOperationListenerTests extends ESTestCase{
AtomicInteger postDeleteException = new AtomicInteger();
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public void preOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
preIndex.incrementAndGet();
break;
case DELETE:
preDelete.incrementAndGet();
break;
}
public Engine.Index preIndex(Engine.Index operation) {
preIndex.incrementAndGet();
return operation;
}
@Override
public void postOperation(Engine.Operation operation) {
switch (operation.operationType()) {
case INDEX:
postIndex.incrementAndGet();
break;
case DELETE:
postDelete.incrementAndGet();
break;
}
public void postIndex(Engine.Index index, boolean created) {
postIndex.incrementAndGet();
}
@Override
public void postOperation(Engine.Operation operation, Exception ex) {
switch (operation.operationType()) {
case INDEX:
postIndexException.incrementAndGet();
break;
case DELETE:
postDeleteException.incrementAndGet();
break;
}
public void postIndex(Engine.Index index, Exception ex) {
postIndexException.incrementAndGet();
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
preDelete.incrementAndGet();
return delete;
}
@Override
public void postDelete(Engine.Delete delete) {
postDelete.incrementAndGet();
}
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
postDeleteException.incrementAndGet();
}
};
IndexingOperationListener throwingListener = new IndexingOperationListener() {
@Override
public void preOperation(Engine.Operation operation) {
public Engine.Index preIndex(Engine.Index operation) {
throw new RuntimeException();
}
@Override
public void postOperation(Engine.Operation operation) {
public void postIndex(Engine.Index index, boolean created) {
throw new RuntimeException(); }
@Override
public void postIndex(Engine.Index index, Exception ex) {
throw new RuntimeException(); }
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
throw new RuntimeException();
}
@Override
public void postOperation(Engine.Operation operation, Exception ex) {
public void postDelete(Engine.Delete delete) {
throw new RuntimeException(); }
@Override
public void postDelete(Engine.Delete delete, Exception ex) {
throw new RuntimeException();
}
};
@ -103,7 +111,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
compositeListener.postOperation(delete);
compositeListener.postDelete(delete);
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -111,7 +119,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(0, postDeleteException.get());
compositeListener.postOperation(delete, new RuntimeException());
compositeListener.postDelete(delete, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -119,7 +127,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preOperation(delete);
compositeListener.preDelete(delete);
assertEquals(0, preIndex.get());
assertEquals(0, postIndex.get());
assertEquals(0, postIndexException.get());
@ -127,7 +135,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postOperation(index);
compositeListener.postIndex(index, false);
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(0, postIndexException.get());
@ -135,7 +143,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.postOperation(index, new RuntimeException());
compositeListener.postIndex(index, new RuntimeException());
assertEquals(0, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());
@ -143,7 +151,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
assertEquals(2, postDelete.get());
assertEquals(2, postDeleteException.get());
compositeListener.preOperation(index);
compositeListener.preIndex(index);
assertEquals(2, preIndex.get());
assertEquals(2, postIndex.get());
assertEquals(2, postIndexException.get());

View File

@ -137,14 +137,14 @@ public class RefreshListenersTests extends ESTestCase {
public void testTooMany() throws Exception {
assertFalse(listeners.refreshNeeded());
Engine.Index index = index("1");
Engine.IndexResult index = index("1");
// Fill the listener slots
List<DummyRefreshListener> nonForcedListeners = new ArrayList<>(maxListeners);
for (int i = 0; i < maxListeners; i++) {
DummyRefreshListener listener = new DummyRefreshListener();
nonForcedListeners.add(listener);
listeners.addOrNotify(index.getTranslogLocation(), listener);
listeners.addOrNotify(index.getLocation(), listener);
assertTrue(listeners.refreshNeeded());
}
@ -155,7 +155,7 @@ public class RefreshListenersTests extends ESTestCase {
// Add one more listener which should cause a refresh.
DummyRefreshListener forcingListener = new DummyRefreshListener();
listeners.addOrNotify(index.getTranslogLocation(), forcingListener);
listeners.addOrNotify(index.getLocation(), forcingListener);
assertTrue("Forced listener wasn't forced?", forcingListener.forcedRefresh.get());
forcingListener.assertNoError();
@ -168,7 +168,7 @@ public class RefreshListenersTests extends ESTestCase {
}
public void testAfterRefresh() throws Exception {
Engine.Index index = index("1");
Engine.IndexResult index = index("1");
engine.refresh("I said so");
if (randomBoolean()) {
index(randomFrom("1" /* same document */, "2" /* different document */));
@ -178,7 +178,7 @@ public class RefreshListenersTests extends ESTestCase {
}
DummyRefreshListener listener = new DummyRefreshListener();
assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
assertTrue(listeners.addOrNotify(index.getLocation(), listener));
assertFalse(listener.forcedRefresh.get());
listener.assertNoError();
}
@ -198,9 +198,9 @@ public class RefreshListenersTests extends ESTestCase {
refresher.start();
try {
for (int i = 0; i < 1000; i++) {
Engine.Index index = index("1");
Engine.IndexResult index = index("1");
DummyRefreshListener listener = new DummyRefreshListener();
boolean immediate = listeners.addOrNotify(index.getTranslogLocation(), listener);
boolean immediate = listeners.addOrNotify(index.getLocation(), listener);
if (immediate) {
assertNotNull(listener.forcedRefresh.get());
} else {
@ -234,18 +234,18 @@ public class RefreshListenersTests extends ESTestCase {
for (int iteration = 1; iteration <= 50; iteration++) {
try {
String testFieldValue = String.format(Locale.ROOT, "%s%04d", threadId, iteration);
Engine.Index index = index(threadId, testFieldValue);
assertEquals(iteration, index.version());
Engine.IndexResult index = index(threadId, testFieldValue);
assertEquals(iteration, index.getVersion());
DummyRefreshListener listener = new DummyRefreshListener();
listeners.addOrNotify(index.getTranslogLocation(), listener);
listeners.addOrNotify(index.getLocation(), listener);
assertBusy(() -> assertNotNull("listener never called", listener.forcedRefresh.get()));
if (threadCount < maxListeners) {
assertFalse(listener.forcedRefresh.get());
}
listener.assertNoError();
Engine.Get get = new Engine.Get(false, index.uid());
Engine.Get get = new Engine.Get(false, new Term("_uid", "test:"+threadId));
try (Engine.GetResult getResult = engine.get(get)) {
assertTrue("document not found", getResult.exists());
assertEquals(iteration, getResult.version());
@ -267,11 +267,11 @@ public class RefreshListenersTests extends ESTestCase {
refresher.cancel();
}
private Engine.Index index(String id) {
private Engine.IndexResult index(String id) {
return index(id, "test");
}
private Engine.Index index(String id, String testFieldValue) {
private Engine.IndexResult index(String id, String testFieldValue) {
String type = "test";
String uid = type + ":" + id;
Document document = new Document();
@ -283,8 +283,7 @@ public class RefreshListenersTests extends ESTestCase {
BytesReference source = new BytesArray(new byte[] { 1 });
ParsedDocument doc = new ParsedDocument(versionField, id, type, null, -1, -1, Arrays.asList(document), source, null);
Engine.Index index = new Engine.Index(new Term("_uid", uid), doc);
engine.index(index);
return index;
return engine.index(index);
}
private static class DummyRefreshListener implements Consumer<Boolean> {

View File

@ -197,14 +197,25 @@ public class CancelTests extends ReindexTestCase {
}
public static class BlockingOperationListener implements IndexingOperationListener {
@Override
public void preOperation(Engine.Operation operation) {
if ((TYPE.equals(operation.type()) == false) || (operation.origin() != Origin.PRIMARY)) {
return;
public Engine.Index preIndex(Engine.Index index) {
return preCheck(index, index.type());
}
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
return preCheck(delete, delete.type());
}
private <T extends Engine.Operation> T preCheck(T operation, String type) {
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
return operation;
}
try {
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
return;
return operation;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);

View File

@ -451,7 +451,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.execute(index);
shard.index(index);
return ((Engine.Index) index);
}
@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
} else {
delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL);
}
shard.execute(delete);
shard.delete(delete);
return delete;
}