Incorporate feedback
This commit is contained in:
parent
a3fcfe8196
commit
fa3ee6b996
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.OperationFailedEngineException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
@ -580,8 +579,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.action.TimestampParsingException::new, 78),
|
||||
ROUTING_MISSING_EXCEPTION(org.elasticsearch.action.RoutingMissingException.class,
|
||||
org.elasticsearch.action.RoutingMissingException::new, 79),
|
||||
OPERATION_FAILED_ENGINE_EXCEPTION(OperationFailedEngineException.class,
|
||||
OperationFailedEngineException::new, 80),
|
||||
// 80 used to be for IndexFailedEngineException, removed in 6.0
|
||||
INDEX_SHARD_RESTORE_FAILED_EXCEPTION(org.elasticsearch.index.snapshots.IndexShardRestoreFailedException.class,
|
||||
org.elasticsearch.index.snapshots.IndexShardRestoreFailedException::new, 81),
|
||||
REPOSITORY_EXCEPTION(org.elasticsearch.repositories.RepositoryException.class,
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -133,44 +134,49 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
// execute item request
|
||||
final Engine.Result operationResult;
|
||||
final DocWriteResponse response;
|
||||
BulkItemRequest replicaRequest = request.items()[requestIndex];
|
||||
switch (itemRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
final IndexRequest indexRequest = (IndexRequest) itemRequest;
|
||||
operationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
response = operationResult.hasFailure() ? null
|
||||
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
|
||||
operationResult = indexResult;
|
||||
response = indexResult.hasFailure() ? null
|
||||
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
|
||||
operationResult.getVersion(), ((Engine.IndexResult) operationResult).isCreated());
|
||||
indexResult.getVersion(), indexResult.isCreated());
|
||||
break;
|
||||
case UPDATE:
|
||||
UpdateResultHolder updateResultHolder = executeUpdateRequest(((UpdateRequest) itemRequest),
|
||||
primary, metaData, request, requestIndex);
|
||||
operationResult = updateResultHolder.operationResult;
|
||||
response = updateResultHolder.response;
|
||||
replicaRequest = updateResultHolder.replicaRequest;
|
||||
break;
|
||||
case DELETE:
|
||||
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
|
||||
operationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
|
||||
response = operationResult.hasFailure() ? null :
|
||||
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
|
||||
operationResult = deleteResult;
|
||||
response = deleteResult.hasFailure() ? null :
|
||||
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
|
||||
operationResult.getVersion(), ((Engine.DeleteResult) operationResult).isFound());
|
||||
deleteResult.getVersion(), deleteResult.isFound());
|
||||
break;
|
||||
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
|
||||
}
|
||||
// update the bulk item request because update request execution can mutate the bulk item request
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
if (operationResult == null // in case of a noop update operation
|
||||
|| operationResult.hasFailure() == false) {
|
||||
if (operationResult != null) {
|
||||
location = locationToSync(location, operationResult.getLocation());
|
||||
} else {
|
||||
request.items()[requestIndex] = replicaRequest;
|
||||
if (operationResult == null) { // in case of noop update operation
|
||||
assert response.getResult() == DocWriteResponse.Result.NOOP
|
||||
: "only noop update can have null operation";
|
||||
}
|
||||
// set update response
|
||||
item.setPrimaryResponse(new BulkItemResponse(item.id(), opType, response));
|
||||
replicaRequest.setIgnoreOnReplica();
|
||||
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
|
||||
} else if (operationResult.hasFailure() == false) {
|
||||
location = locationToSync(location, operationResult.getTranslogLocation());
|
||||
BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
|
||||
replicaRequest.setPrimaryResponse(primaryResponse);
|
||||
// set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though.
|
||||
primaryResponse.getResponse().setShardInfo(new ShardInfo());
|
||||
} else {
|
||||
DocWriteRequest docWriteRequest = item.request();
|
||||
DocWriteRequest docWriteRequest = replicaRequest.request();
|
||||
Exception failure = operationResult.getFailure();
|
||||
if (isConflictException(failure)) {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
|
||||
|
@ -182,20 +188,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
// if its a conflict failure, and we already executed the request on a primary (and we execute it
|
||||
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
|
||||
// then just use the response we got from the successful execution
|
||||
if (item.getPrimaryResponse() == null || isConflictException(failure) == false) {
|
||||
item.setPrimaryResponse(new BulkItemResponse(item.id(), docWriteRequest.opType(),
|
||||
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
|
||||
replicaRequest.setIgnoreOnReplica();
|
||||
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
|
||||
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
|
||||
}
|
||||
}
|
||||
assert item.getPrimaryResponse() != null;
|
||||
assert replicaRequest.getPrimaryResponse() != null;
|
||||
assert preVersionTypes[requestIndex] != null;
|
||||
if (item.getPrimaryResponse().isFailed()
|
||||
|| item.getPrimaryResponse().getResponse().getResult() == DocWriteResponse.Result.NOOP) {
|
||||
item.setIgnoreOnReplica();
|
||||
} else {
|
||||
// set the ShardInfo to 0 so we can safely send it to the replicas. We won't use it in the real response though.
|
||||
item.getPrimaryResponse().getResponse().setShardInfo(new ShardInfo());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
|
||||
if (retryPrimaryException(e)) {
|
||||
|
@ -205,19 +205,20 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
docWriteRequest.version(preVersions[j]);
|
||||
docWriteRequest.versionType(preVersionTypes[j]);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
// TODO: maybe this assert is too strict, we can still get environment failures while executing write operations
|
||||
assert false : "unexpected exception: " + e.getMessage() + " class:" + e.getClass().getSimpleName();
|
||||
throw e;
|
||||
}
|
||||
return location;
|
||||
}
|
||||
|
||||
private static class UpdateResultHolder {
|
||||
final BulkItemRequest replicaRequest;
|
||||
final Engine.Result operationResult;
|
||||
final DocWriteResponse response;
|
||||
|
||||
private UpdateResultHolder(Engine.Result operationResult, DocWriteResponse response) {
|
||||
private UpdateResultHolder(BulkItemRequest replicaRequest, Engine.Result operationResult,
|
||||
DocWriteResponse response) {
|
||||
this.replicaRequest = replicaRequest;
|
||||
this.operationResult = operationResult;
|
||||
this.response = response;
|
||||
}
|
||||
|
@ -235,6 +236,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
int requestIndex) throws Exception {
|
||||
Engine.Result updateOperationResult = null;
|
||||
UpdateResponse updateResponse = null;
|
||||
BulkItemRequest replicaRequest = request.items()[requestIndex];
|
||||
int maxAttempts = updateRequest.retryOnConflict();
|
||||
for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) {
|
||||
final UpdateHelper.Result translate;
|
||||
|
@ -244,7 +246,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
} catch (Exception failure) {
|
||||
// we may fail translating a update to index or delete operation
|
||||
// we use index result to communicate failure while translating update request
|
||||
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), 0);
|
||||
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version());
|
||||
break; // out of retry loop
|
||||
}
|
||||
// execute translated update request
|
||||
|
@ -267,8 +269,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (updateOperationResult == null) {
|
||||
// this is a noop operation
|
||||
updateResponse = translate.action();
|
||||
} else {
|
||||
if (updateOperationResult.hasFailure() == false) {
|
||||
break; // out of retry loop
|
||||
} else if (updateOperationResult.hasFailure() == false) {
|
||||
// enrich update response and
|
||||
// set translated update (index/delete) request for replica execution in bulk items
|
||||
switch (updateOperationResult.getOperationType()) {
|
||||
|
@ -288,8 +290,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(),
|
||||
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
}
|
||||
// replace the update request to the translated index request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
|
||||
// set translated request as replica request
|
||||
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
|
||||
break;
|
||||
case DELETE:
|
||||
DeleteRequest updateDeleteRequest = translate.action();
|
||||
|
@ -302,20 +304,18 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
|
||||
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
|
||||
translate.updateSourceContentType(), null));
|
||||
// replace the update request to the translated delete request to execute on the replica.
|
||||
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
|
||||
// set translated request as replica request
|
||||
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// version conflict exception, retry
|
||||
if (updateOperationResult.getFailure() instanceof VersionConflictEngineException) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
// successful operation
|
||||
break; // out of retry loop
|
||||
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
|
||||
// not a version conflict exception
|
||||
break; // out of retry loop
|
||||
}
|
||||
return new UpdateResultHolder(updateOperationResult, updateResponse);
|
||||
}
|
||||
return new UpdateResultHolder(replicaRequest, updateOperationResult, updateResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -342,11 +342,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (operationResult.hasFailure()) {
|
||||
// check if any transient write operation failures should be bubbled up
|
||||
Exception failure = operationResult.getFailure();
|
||||
assert failure instanceof VersionConflictEngineException
|
||||
|| failure instanceof MapperParsingException
|
||||
: "expected version conflict or mapper parsing failures";
|
||||
if (!ignoreReplicaException(failure)) {
|
||||
throw failure;
|
||||
}
|
||||
} else {
|
||||
location = locationToSync(location, operationResult.getLocation());
|
||||
location = locationToSync(location, operationResult.getTranslogLocation());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// if its not an ignore replica failure, we need to make sure to bubble up the failure
|
||||
|
|
|
@ -126,13 +126,13 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
|
|||
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
|
||||
final DeleteResponse response = result.hasFailure() ? null :
|
||||
new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
|
||||
return new WritePrimaryResult(request, response, result.getLocation(), result.getFailure(), primary);
|
||||
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult shardOperationOnReplica(DeleteRequest request, IndexShard replica) throws Exception {
|
||||
final Engine.DeleteResult result = executeDeleteRequestOnReplica(request, replica);
|
||||
return new WriteReplicaResult(request, result.getLocation(), result.getFailure(), replica);
|
||||
return new WriteReplicaResult(request, result.getTranslogLocation(), result.getFailure(), replica);
|
||||
}
|
||||
|
||||
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
|
||||
|
|
|
@ -145,13 +145,13 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
final IndexResponse response = indexResult.hasFailure() ? null :
|
||||
new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
|
||||
indexResult.isCreated());
|
||||
return new WritePrimaryResult(request, response, indexResult.getLocation(), indexResult.getFailure(), primary);
|
||||
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult shardOperationOnReplica(IndexRequest request, IndexShard replica) throws Exception {
|
||||
final Engine.IndexResult indexResult = executeIndexRequestOnReplica(request, replica);
|
||||
return new WriteReplicaResult(request, indexResult.getLocation(), indexResult.getFailure(), replica);
|
||||
return new WriteReplicaResult(request, indexResult.getTranslogLocation(), indexResult.getFailure(), replica);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,7 +167,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
try {
|
||||
operation = replica.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new Engine.IndexResult(e, request.version(), 0);
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
if (update != null) {
|
||||
|
@ -189,7 +189,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
try {
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new Engine.IndexResult(e, request.version(), 0);
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
final ShardId shardId = primary.shardId();
|
||||
|
@ -200,14 +200,12 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
|
|||
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// throws IAE on conflicts merging dynamic mappings
|
||||
return new Engine.IndexResult(e, request.version(),
|
||||
0);
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
try {
|
||||
operation = prepareIndexOperationOnPrimary(request, primary);
|
||||
} catch (MapperParsingException | IllegalArgumentException e) {
|
||||
return new Engine.IndexResult(e, request.version(),
|
||||
0);
|
||||
return new Engine.IndexResult(e, request.version());
|
||||
}
|
||||
update = operation.parsedDoc().dynamicMappingsUpdate();
|
||||
if (update != null) {
|
||||
|
|
|
@ -207,13 +207,8 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
|
||||
protected boolean retryPrimaryException(final Throwable e) {
|
||||
boolean retry = e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|
||||
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|
||||
|| TransportActions.isShardNotAvailableException(e);
|
||||
if (retry) {
|
||||
assert e instanceof ElasticsearchException
|
||||
: "expected all retry on primary exception to be ElasticsearchException instances, found: " + e.getClass();
|
||||
}
|
||||
return retry;
|
||||
}
|
||||
|
||||
class OperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
|
@ -378,7 +373,12 @@ public abstract class TransportReplicationAction<
|
|||
final Response finalResponseIfSuccessful;
|
||||
final Exception finalFailure;
|
||||
|
||||
/**
|
||||
* Result of executing a primary operation
|
||||
* expects <code>finalResponseIfSuccessful</code> or <code>finalFailure</code> to be not-null
|
||||
*/
|
||||
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
|
||||
assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null";
|
||||
this.replicaRequest = replicaRequest;
|
||||
this.finalResponseIfSuccessful = finalResponseIfSuccessful;
|
||||
this.finalFailure = finalFailure;
|
||||
|
|
|
@ -89,7 +89,6 @@ public abstract class TransportWriteAction<
|
|||
@Nullable Location location, @Nullable Exception operationFailure,
|
||||
IndexShard primary) {
|
||||
super(request, finalResponse, operationFailure);
|
||||
assert operationFailure != null ^ finalResponse != null;
|
||||
if (operationFailure != null) {
|
||||
this.finishedAsyncActions = true;
|
||||
} else {
|
||||
|
@ -127,9 +126,7 @@ public abstract class TransportWriteAction<
|
|||
|
||||
@Override
|
||||
public synchronized void onSuccess(boolean forcedRefresh) {
|
||||
if (finalResponseIfSuccessful != null) {
|
||||
finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
|
||||
}
|
||||
finishedAsyncActions = true;
|
||||
respondIfPossible(null);
|
||||
}
|
||||
|
|
|
@ -282,43 +282,50 @@ public abstract class Engine implements Closeable {
|
|||
|
||||
public abstract DeleteResult delete(Delete delete);
|
||||
|
||||
/**
|
||||
* Base class for index and delete operation results
|
||||
* Holds result meta data (e.g. translog location, updated version)
|
||||
* for an executed write {@link Operation}
|
||||
**/
|
||||
public abstract static class Result {
|
||||
private final Operation.TYPE operationType;
|
||||
private final long version;
|
||||
private final Exception failure;
|
||||
private final int estimatedSizeInBytes;
|
||||
private Translog.Location location;
|
||||
private Translog.Location translogLocation;
|
||||
private long took;
|
||||
private boolean freeze;
|
||||
|
||||
protected Result(Operation.TYPE operationType, Exception failure,
|
||||
long version, int estimatedSizeInBytes) {
|
||||
protected Result(Operation.TYPE operationType, Exception failure, long version) {
|
||||
this.operationType = operationType;
|
||||
this.failure = failure;
|
||||
this.version = version;
|
||||
this.estimatedSizeInBytes = estimatedSizeInBytes;
|
||||
}
|
||||
|
||||
protected Result(Operation.TYPE operationType, long version, int estimatedSizeInBytes) {
|
||||
this(operationType, null, version, estimatedSizeInBytes);
|
||||
protected Result(Operation.TYPE operationType, long version) {
|
||||
this(operationType, null, version);
|
||||
}
|
||||
|
||||
/** whether the operation had failure */
|
||||
public boolean hasFailure() {
|
||||
return failure != null;
|
||||
}
|
||||
|
||||
/** get the updated document version */
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public Translog.Location getLocation() {
|
||||
return location;
|
||||
/** get the translog location after executing the operation */
|
||||
public Translog.Location getTranslogLocation() {
|
||||
return translogLocation;
|
||||
}
|
||||
|
||||
/** get document failure while executing the operation {@code null} in case of no failure */
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
/** get total time in nanoseconds */
|
||||
public long getTook() {
|
||||
return took;
|
||||
}
|
||||
|
@ -327,22 +334,24 @@ public abstract class Engine implements Closeable {
|
|||
return operationType;
|
||||
}
|
||||
|
||||
/** get size of the translog operation if translog location has been set */
|
||||
public int getSizeInBytes() {
|
||||
if (location != null) {
|
||||
return location.size;
|
||||
if (translogLocation != null) {
|
||||
return translogLocation.size;
|
||||
} else {
|
||||
throw new IllegalStateException("result has null location, use Operation#estimatedSizeInBytes instead");
|
||||
}
|
||||
return estimatedSizeInBytes;
|
||||
}
|
||||
|
||||
public void setLocation(Translog.Location location) {
|
||||
void setTranslogLocation(Translog.Location translogLocation) {
|
||||
if (freeze == false) {
|
||||
this.location = location;
|
||||
this.translogLocation = translogLocation;
|
||||
} else {
|
||||
throw new IllegalStateException("result is already frozen");
|
||||
}
|
||||
}
|
||||
|
||||
public void setTook(long took) {
|
||||
void setTook(long took) {
|
||||
if (freeze == false) {
|
||||
this.took = took;
|
||||
} else {
|
||||
|
@ -350,7 +359,7 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public void freeze() {
|
||||
void freeze() {
|
||||
this.freeze = true;
|
||||
}
|
||||
}
|
||||
|
@ -358,13 +367,13 @@ public abstract class Engine implements Closeable {
|
|||
public static class IndexResult extends Result {
|
||||
private final boolean created;
|
||||
|
||||
public IndexResult(long version, boolean created, int estimatedSizeInBytes) {
|
||||
super(Operation.TYPE.INDEX, version, estimatedSizeInBytes);
|
||||
public IndexResult(long version, boolean created) {
|
||||
super(Operation.TYPE.INDEX, version);
|
||||
this.created = created;
|
||||
}
|
||||
|
||||
public IndexResult(Exception failure, long version, int estimatedSizeInBytes) {
|
||||
super(Operation.TYPE.INDEX, failure, version, estimatedSizeInBytes);
|
||||
public IndexResult(Exception failure, long version) {
|
||||
super(Operation.TYPE.INDEX, failure, version);
|
||||
this.created = false;
|
||||
}
|
||||
|
||||
|
@ -376,13 +385,13 @@ public abstract class Engine implements Closeable {
|
|||
public static class DeleteResult extends Result {
|
||||
private final boolean found;
|
||||
|
||||
public DeleteResult(long version, boolean found, int estimatedSizeInBytes) {
|
||||
super(Operation.TYPE.DELETE, version, estimatedSizeInBytes);
|
||||
public DeleteResult(long version, boolean found) {
|
||||
super(Operation.TYPE.DELETE, version);
|
||||
this.found = found;
|
||||
}
|
||||
|
||||
public DeleteResult(Exception failure, long version, int estimatedSizeInBytes) {
|
||||
super(Operation.TYPE.DELETE, failure, version, estimatedSizeInBytes);
|
||||
public DeleteResult(Exception failure, long version) {
|
||||
super(Operation.TYPE.DELETE, failure, version);
|
||||
this.found = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.lucene.store.LockObtainFailedException;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
|
@ -401,9 +402,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Exception documentFailure = extractDocumentFailure(index, e);
|
||||
result = new IndexResult(documentFailure, index.version(),
|
||||
index.estimatedSizeInBytes());
|
||||
result = new IndexResult(checkIfDocumentFailureOrThrow(index, e), index.version());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -412,10 +411,12 @@ public class InternalEngine extends Engine {
|
|||
* Inspects exception thrown when executing index or delete operations
|
||||
*
|
||||
* @return failure if the failure is a document specific failure (e.g. analysis chain failure)
|
||||
* @throws OperationFailedEngineException if the failure caused the engine to fail
|
||||
* @throws ElasticsearchException if the failure caused the engine to fail
|
||||
* (e.g. out of disk, lucene tragic event)
|
||||
*
|
||||
* Note: pkg-private for testing
|
||||
*/
|
||||
private Exception extractDocumentFailure(final Operation operation, final Exception failure) {
|
||||
final Exception checkIfDocumentFailureOrThrow(final Operation operation, final Exception failure) {
|
||||
boolean isDocumentFailure;
|
||||
try {
|
||||
// When indexing a document into Lucene, Lucene distinguishes between environment related errors
|
||||
|
@ -434,8 +435,9 @@ public class InternalEngine extends Engine {
|
|||
if (isDocumentFailure) {
|
||||
return failure;
|
||||
} else {
|
||||
throw new OperationFailedEngineException(shardId, operation.operationType().getLowercase(),
|
||||
operation.type(), operation.id(), failure);
|
||||
ElasticsearchException exception = new ElasticsearchException(failure);
|
||||
exception.setShard(shardId);
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -529,9 +531,10 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
final long expectedVersion = index.version();
|
||||
final IndexResult indexResult;
|
||||
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
|
||||
// skip index operation because of version conflict on recovery
|
||||
return new IndexResult(expectedVersion, false, index.estimatedSizeInBytes());
|
||||
indexResult = new IndexResult(expectedVersion, false);
|
||||
} else {
|
||||
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
|
||||
index.parsedDoc().version().setLongValue(updatedVersion);
|
||||
|
@ -541,20 +544,18 @@ public class InternalEngine extends Engine {
|
|||
} else {
|
||||
update(index.uid(), index.docs(), indexWriter);
|
||||
}
|
||||
IndexResult indexResult = new IndexResult(updatedVersion, deleted, index.estimatedSizeInBytes());
|
||||
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
location = translog.add(new Translog.Index(index, indexResult));
|
||||
} else {
|
||||
location = null;
|
||||
}
|
||||
indexResult = new IndexResult(updatedVersion, deleted);
|
||||
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Index(index, indexResult))
|
||||
: null;
|
||||
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
|
||||
indexResult.setLocation(location);
|
||||
indexResult.setTranslogLocation(location);
|
||||
}
|
||||
indexResult.setTook(System.nanoTime() - index.startTime());
|
||||
indexResult.freeze();
|
||||
return indexResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
|
||||
if (docs.size() > 1) {
|
||||
|
@ -580,9 +581,7 @@ public class InternalEngine extends Engine {
|
|||
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
|
||||
result = innerDelete(delete);
|
||||
} catch (Exception e) {
|
||||
Exception documentFailure = extractDocumentFailure(delete, e);
|
||||
result = new DeleteResult(documentFailure, delete.version(),
|
||||
delete.estimatedSizeInBytes());
|
||||
result = new DeleteResult(checkIfDocumentFailureOrThrow(delete, e), delete.version());
|
||||
}
|
||||
maybePruneDeletedTombstones();
|
||||
return result;
|
||||
|
@ -615,29 +614,26 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
final long expectedVersion = delete.version();
|
||||
final DeleteResult deleteResult;
|
||||
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
|
||||
// skip executing delete because of version conflict on recovery
|
||||
return new DeleteResult(expectedVersion, true,
|
||||
delete.estimatedSizeInBytes());
|
||||
deleteResult = new DeleteResult(expectedVersion, true);
|
||||
} else {
|
||||
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
|
||||
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
|
||||
DeleteResult deleteResult = new DeleteResult(updatedVersion, found,
|
||||
delete.estimatedSizeInBytes());
|
||||
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
||||
location = translog.add(new Translog.Delete(delete, deleteResult));
|
||||
} else {
|
||||
location = null;
|
||||
}
|
||||
deleteResult = new DeleteResult(updatedVersion, found);
|
||||
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
|
||||
? translog.add(new Translog.Delete(delete, deleteResult))
|
||||
: null;
|
||||
versionMap.putUnderLock(delete.uid().bytes(),
|
||||
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
|
||||
deleteResult.setLocation(location);
|
||||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
deleteResult.setTook(System.nanoTime() - delete.startTime());
|
||||
deleteResult.freeze();
|
||||
return deleteResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
|
||||
final boolean found;
|
||||
|
@ -1117,7 +1113,8 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
private IndexWriter createWriter(boolean create) throws IOException {
|
||||
// pkg-private for testing
|
||||
IndexWriter createWriter(boolean create) throws IOException {
|
||||
try {
|
||||
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
|
||||
iwc.setCommitOnClose(false); // we by default don't commit on close
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class OperationFailedEngineException extends EngineException {
|
||||
|
||||
private final String type;
|
||||
|
||||
private final String id;
|
||||
|
||||
public OperationFailedEngineException(ShardId shardId, String operationType, String type, String id, Throwable cause) {
|
||||
super(shardId, operationType + " failed for [" + type + "#" + id + "]", cause);
|
||||
Objects.requireNonNull(type, "type must not be null");
|
||||
Objects.requireNonNull(id, "id must not be null");
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public OperationFailedEngineException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
type = in.readString();
|
||||
id = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(type);
|
||||
out.writeString(id);
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return this.id;
|
||||
}
|
||||
}
|
|
@ -1126,8 +1126,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
private void verifyPrimary() {
|
||||
if (shardRouting.primary() == false) {
|
||||
// TODO throw a more appropriate exception
|
||||
throw new ShardNotFoundException(shardRouting.shardId(), "shard is not a primary anymore");
|
||||
throw new IllegalStateException("shard is not a primary " + shardRouting);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -201,17 +201,23 @@ public class IndexingMemoryController extends AbstractComponent implements Index
|
|||
|
||||
@Override
|
||||
public void postIndex(Engine.Index index, Engine.IndexResult result) {
|
||||
recordOperationBytes(result);
|
||||
recordOperationBytes(index, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postDelete(Engine.Delete delete, Engine.DeleteResult result) {
|
||||
recordOperationBytes(result);
|
||||
recordOperationBytes(delete, result);
|
||||
}
|
||||
|
||||
/** called by IndexShard to record that this many bytes were written to translog */
|
||||
private void recordOperationBytes(Engine.Result result) {
|
||||
statusChecker.bytesWritten(result.getSizeInBytes());
|
||||
private void recordOperationBytes(Engine.Operation operation, Engine.Result result) {
|
||||
final int sizeInBytes;
|
||||
if (result.getTranslogLocation() != null) {
|
||||
sizeInBytes = result.getSizeInBytes();
|
||||
} else {
|
||||
sizeInBytes = operation.estimatedSizeInBytes();
|
||||
}
|
||||
statusChecker.bytesWritten(sizeInBytes);
|
||||
}
|
||||
|
||||
private static final class ShardAndBytesUsed implements Comparable<ShardAndBytesUsed> {
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.elasticsearch.common.xcontent.XContentLocation;
|
|||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.index.AlreadyExpiredException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.OperationFailedEngineException;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.query.QueryShardException;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
|
@ -400,21 +399,6 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
assertEquals("TIMESTAMP", ex.timestamp());
|
||||
}
|
||||
|
||||
public void testIndexFailedEngineException() throws IOException {
|
||||
ShardId id = new ShardId("foo", "_na_", 1);
|
||||
OperationFailedEngineException ex = serialize(new OperationFailedEngineException(id, "index", "type", "id", null));
|
||||
assertEquals(ex.getShardId(), new ShardId("foo", "_na_", 1));
|
||||
assertEquals("type", ex.type());
|
||||
assertEquals("id", ex.id());
|
||||
assertNull(ex.getCause());
|
||||
|
||||
ex = serialize(new OperationFailedEngineException(null, "index", "type", "id", new NullPointerException()));
|
||||
assertNull(ex.getShardId());
|
||||
assertEquals("type", ex.type());
|
||||
assertEquals("id", ex.id());
|
||||
assertTrue(ex.getCause() instanceof NullPointerException);
|
||||
}
|
||||
|
||||
public void testAliasesMissingException() throws IOException {
|
||||
AliasesNotFoundException ex = serialize(new AliasesNotFoundException("one", "two", "three"));
|
||||
assertEquals("aliases [one, two, three] missing", ex.getMessage());
|
||||
|
@ -732,7 +716,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(77, org.elasticsearch.common.util.concurrent.UncategorizedExecutionException.class);
|
||||
ids.put(78, org.elasticsearch.action.TimestampParsingException.class);
|
||||
ids.put(79, org.elasticsearch.action.RoutingMissingException.class);
|
||||
ids.put(80, OperationFailedEngineException.class);
|
||||
ids.put(80, null); // was IndexFailedEngineException, removed in 6.0
|
||||
ids.put(81, org.elasticsearch.index.snapshots.IndexShardRestoreFailedException.class);
|
||||
ids.put(82, org.elasticsearch.repositories.RepositoryException.class);
|
||||
ids.put(83, org.elasticsearch.transport.ReceiveTimeoutTransportException.class);
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
|||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -369,7 +368,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary,
|
||||
null);
|
||||
request.primaryTerm(primary.getPrimaryTerm());
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getLocation(), logger);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger);
|
||||
IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
|
||||
indexResult.isCreated());
|
||||
return new PrimaryResult(request, response);
|
||||
|
@ -378,7 +377,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
@Override
|
||||
protected void performOnReplica(IndexRequest request, IndexShard replica) {
|
||||
final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getLocation(), logger);
|
||||
TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -359,7 +359,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
try {
|
||||
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
|
||||
fail("shard shouldn't accept primary ops");
|
||||
} catch (ShardNotFoundException ignored) {
|
||||
} catch (IllegalStateException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,7 +114,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger);
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1"));
|
||||
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
|
||||
compositeListener.postDelete(delete, new Engine.DeleteResult(1, true, 0));
|
||||
compositeListener.postDelete(delete, new Engine.DeleteResult(1, true));
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(0, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
@ -138,7 +138,7 @@ public class IndexingOperationListenerTests extends ESTestCase{
|
|||
assertEquals(2, postDelete.get());
|
||||
assertEquals(2, postDeleteException.get());
|
||||
|
||||
compositeListener.postIndex(index, new Engine.IndexResult(0, false, 0));
|
||||
compositeListener.postIndex(index, new Engine.IndexResult(0, false));
|
||||
assertEquals(0, preIndex.get());
|
||||
assertEquals(2, postIndex.get());
|
||||
assertEquals(0, postIndexException.get());
|
||||
|
|
Loading…
Reference in New Issue