diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java deleted file mode 100644 index 3e7ee41b914..00000000000 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResultHolder.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.bulk; - -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.VersionConflictEngineException; - -/** - * A struct-like holder for a bulk items reponse, result, and the resulting - * replica operation to be executed. - */ -class BulkItemResultHolder { - public final @Nullable DocWriteResponse response; - public final @Nullable Engine.Result operationResult; - public final BulkItemRequest replicaRequest; - - BulkItemResultHolder(@Nullable DocWriteResponse response, - @Nullable Engine.Result operationResult, - BulkItemRequest replicaRequest) { - this.response = response; - this.operationResult = operationResult; - this.replicaRequest = replicaRequest; - } - - public boolean isVersionConflict() { - return operationResult == null ? false : - operationResult.getFailure() instanceof VersionConflictEngineException; - } -} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java new file mode 100644 index 00000000000..85ce28d2d52 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -0,0 +1,345 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.translog.Translog; + +import java.util.Arrays; + +/** + * This is a utility class that holds the per-request state needed to perform bulk operations on the primary. + * More specifically, it maintains an index to the currently executing bulk item, which allows execution + * to stop and wait for external events such as mapping updates. + */ +class BulkPrimaryExecutionContext { + + enum ItemProcessingState { + /** Item execution is ready to start, no operations have been performed yet */ + INITIAL, + /** + * The incoming request has been translated to a request that can be executed on the shard. + * This is used to convert update requests into fully specified index or delete requests. + */ + TRANSLATED, + /** + * The request cannot execute with the current mapping and should wait for a new mapping + * to arrive from the master. A mapping request for the needed changes has already been + * submitted. + */ + WAIT_FOR_MAPPING_UPDATE, + /** + * The request should be executed again, but there is no need to wait for an external event. + * This is needed to support retry on conflicts during updates. + */ + IMMEDIATE_RETRY, + /** The request has been executed on the primary shard (successfully or not) */ + EXECUTED, + /** + * No further handling of the current request is needed. The result has been converted to a user response + * and execution can continue to the next item (if available). 
+ */ + COMPLETED + } + + private final BulkShardRequest request; + private final IndexShard primary; + private Translog.Location locationToSync = null; + private int currentIndex = -1; + + private ItemProcessingState currentItemState; + private DocWriteRequest requestToExecute; + private BulkItemResponse executionResult; + private int retryCounter; + + + BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { + this.request = request; + this.primary = primary; + advance(); + } + + + private int findNextNonAborted(int startIndex) { + final int length = request.items().length; + while (startIndex < length && isAborted(request.items()[startIndex].getPrimaryResponse())) { + startIndex++; + } + return startIndex; + } + + private static boolean isAborted(BulkItemResponse response) { + return response != null && response.isFailed() && response.getFailure().isAborted(); + } + + /** move to the next item to execute */ + private void advance() { + assert currentItemState == ItemProcessingState.COMPLETED || currentIndex == -1 : + "moving to next but current item wasn't completed (state: " + currentItemState + ")"; + currentItemState = ItemProcessingState.INITIAL; + currentIndex = findNextNonAborted(currentIndex + 1); + retryCounter = 0; + requestToExecute = null; + executionResult = null; + assert assertInvariants(ItemProcessingState.INITIAL); + } + + /** gets the current, untranslated item request */ + public DocWriteRequest getCurrent() { + return getCurrentItem().request(); + } + + public BulkShardRequest getBulkShardRequest() { + return request; + } + + /** returns the result of the request that has been executed on the shard */ + public BulkItemResponse getExecutionResult() { + assert assertInvariants(ItemProcessingState.EXECUTED); + return executionResult; + } + + /** returns the number of times the current operation has been retried */ + public int getRetryCounter() { + return retryCounter; + } + + /** returns true if the current request has been executed on the primary */ + public boolean isOperationExecuted() { + return currentItemState == ItemProcessingState.EXECUTED; + } + + /** returns true if the request needs to wait for a mapping update to arrive from the master */ + public boolean requiresWaitingForMappingUpdate() { + return currentItemState == ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; + } + + /** returns true if the current request should be retried without waiting for an external event */ + public boolean requiresImmediateRetry() { + return currentItemState == ItemProcessingState.IMMEDIATE_RETRY; + } + + /** + * returns true if the current request has been completed and its result translated to a + * user-facing response + */ + public boolean isCompleted() { + return currentItemState == ItemProcessingState.COMPLETED; + } + + /** + * returns true if the current request is in INITIAL state + */ + public boolean isInitial() { + return currentItemState == ItemProcessingState.INITIAL; + } + + /** + * returns true if {@link #advance()} has moved the current item beyond the + * end of the {@link BulkShardRequest#items()} array. 
+ */ + public boolean hasMoreOperationsToExecute() { + return currentIndex < request.items().length; + } + + + /** returns the name of the index used by the current request */ + public String getConcreteIndex() { + return getCurrentItem().index(); + } + + /** returns any primary response that was set by a previous primary */ + public BulkItemResponse getPreviousPrimaryResponse() { + return getCurrentItem().getPrimaryResponse(); + } + + /** returns the translog location that needs to be synced in order to persist all operations executed so far */ + public Translog.Location getLocationToSync() { + assert hasMoreOperationsToExecute() == false; + // we always get to the end of the list by using advance, which in turn sets the state to INITIAL + assert assertInvariants(ItemProcessingState.INITIAL); + return locationToSync; + } + + private BulkItemRequest getCurrentItem() { + return request.items()[currentIndex]; + } + + /** returns the primary shard */ + public IndexShard getPrimary() { + return primary; + } + + /** + * sets the request that should actually be executed on the primary. This can be different from the request + * received from the user (specifically, an update request is translated to an indexing or delete request). + */ + public void setRequestToExecute(DocWriteRequest writeRequest) { + assert assertInvariants(ItemProcessingState.INITIAL); + requestToExecute = writeRequest; + currentItemState = ItemProcessingState.TRANSLATED; + assert assertInvariants(ItemProcessingState.TRANSLATED); + } + + /** returns the request that should be executed on the shard. */ + public > T getRequestToExecute() { + assert assertInvariants(ItemProcessingState.TRANSLATED); + return (T) requestToExecute; + } + + /** indicates that the current operation cannot be completed and needs to wait for a new mapping from the master */ + public void markAsRequiringMappingUpdate() { + assert assertInvariants(ItemProcessingState.TRANSLATED); + currentItemState = ItemProcessingState.WAIT_FOR_MAPPING_UPDATE; + requestToExecute = null; + assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); + } + + /** resets the current item state, preparing for a new execution */ + public void resetForExecutionForRetry() { + assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED); + currentItemState = ItemProcessingState.INITIAL; + requestToExecute = null; + executionResult = null; + assertInvariants(ItemProcessingState.INITIAL); + } + + /** completes the operation without doing anything on the primary */ + public void markOperationAsNoOp(DocWriteResponse response) { + assertInvariants(ItemProcessingState.INITIAL); + executionResult = new BulkItemResponse(getCurrentItem().id(), getCurrentItem().request().opType(), response); + currentItemState = ItemProcessingState.EXECUTED; + assertInvariants(ItemProcessingState.EXECUTED); + } + + /** indicates that the operation needs to be failed as the required mapping didn't arrive in time */ + public void failOnMappingUpdate(Exception cause) { + assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); + currentItemState = ItemProcessingState.EXECUTED; + final DocWriteRequest docWriteRequest = getCurrentItem().request(); + executionResult = new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(), + // Make sure to use getCurrentItem().index() here, if you use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! 
+ new BulkItemResponse.Failure(getCurrentItem().index(), docWriteRequest.type(), docWriteRequest.id(), cause)); + markAsCompleted(executionResult); + } + + /** the current operation has been executed on the primary with the specified result */ + public void markOperationAsExecuted(Engine.Result result) { + assertInvariants(ItemProcessingState.TRANSLATED); + final BulkItemRequest current = getCurrentItem(); + DocWriteRequest docWriteRequest = getRequestToExecute(); + switch (result.getResultType()) { + case SUCCESS: + final DocWriteResponse response; + if (result.getOperationType() == Engine.Operation.TYPE.INDEX) { + Engine.IndexResult indexResult = (Engine.IndexResult) result; + response = new IndexResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(), + result.getSeqNo(), result.getTerm(), indexResult.getVersion(), indexResult.isCreated()); + } else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) { + Engine.DeleteResult deleteResult = (Engine.DeleteResult) result; + response = new DeleteResponse(primary.shardId(), requestToExecute.type(), requestToExecute.id(), + deleteResult.getSeqNo(), result.getTerm(), deleteResult.getVersion(), deleteResult.isFound()); + + } else { + throw new AssertionError("unknown result type :" + result.getResultType()); + } + executionResult = new BulkItemResponse(current.id(), current.request().opType(), response); + // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. + executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo()); + locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation()); + break; + case FAILURE: + executionResult = new BulkItemResponse(current.id(), docWriteRequest.opType(), + // Make sure to use request.index() here, if you + // use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! + new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), + result.getFailure(), result.getSeqNo())); + break; + default: + throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType()); + } + currentItemState = ItemProcessingState.EXECUTED; + } + + /** finishes the execution of the current request, with the response that should be returned to the user */ + public void markAsCompleted(BulkItemResponse translatedResponse) { + assertInvariants(ItemProcessingState.EXECUTED); + assert executionResult == null || translatedResponse.getItemId() == executionResult.getItemId(); + assert translatedResponse.getItemId() == getCurrentItem().id(); + + if (translatedResponse.isFailed() == false && requestToExecute != getCurrent()) { + request.items()[currentIndex] = new BulkItemRequest(request.items()[currentIndex].id(), requestToExecute); + } + getCurrentItem().setPrimaryResponse(translatedResponse); + currentItemState = ItemProcessingState.COMPLETED; + advance(); + } + + /** builds the bulk shard response to return to the user */ + public BulkShardResponse buildShardResponse() { + assert hasMoreOperationsToExecute() == false; + return new BulkShardResponse(request.shardId(), + Arrays.stream(request.items()).map(BulkItemRequest::getPrimaryResponse).toArray(BulkItemResponse[]::new)); + } + + private boolean assertInvariants(ItemProcessingState... 
expectedCurrentState) { + assert Arrays.asList(expectedCurrentState).contains(currentItemState): + "expected current state [" + currentItemState + "] to be one of " + Arrays.toString(expectedCurrentState); + assert currentIndex >= 0 : currentIndex; + assert retryCounter >= 0 : retryCounter; + switch (currentItemState) { + case INITIAL: + assert requestToExecute == null : requestToExecute; + assert executionResult == null : executionResult; + break; + case TRANSLATED: + assert requestToExecute != null; + assert executionResult == null : executionResult; + break; + case WAIT_FOR_MAPPING_UPDATE: + assert requestToExecute == null; + assert executionResult == null : executionResult; + break; + case IMMEDIATE_RETRY: + assert requestToExecute != null; + assert executionResult == null : executionResult; + break; + case EXECUTED: + // requestToExecute can be null if the update ended up as NOOP + assert executionResult != null; + break; + case COMPLETED: + assert requestToExecute != null; + assert executionResult != null; + assert getCurrentItem().getPrimaryResponse() != null; + break; + } + return true; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ed99c739afb..9c134ba4012 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -29,30 +29,34 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.Mapping; import 
org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -60,12 +64,14 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -108,174 +114,167 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary( - BulkShardRequest request, IndexShard primary) throws Exception { - return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); + protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) + throws Exception { + ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); + CheckedRunnable waitForMappingUpdate = () -> { + PlainActionFuture waitingFuture = new PlainActionFuture<>(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + waitingFuture.onResponse(null); + } + + @Override + public void onClusterServiceClose() { + waitingFuture.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + waitingFuture.onFailure( + new MapperException("timed out while waiting for a dynamic mapping update")); + } + }); + waitingFuture.get(); + }; + return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, + new ConcreteMappingUpdatePerformer(), waitForMappingUpdate); } public static WritePrimaryResult performOnPrimary( - BulkShardRequest request, - IndexShard primary, - UpdateHelper updateHelper, - LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater) throws Exception { - final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); - Translog.Location location = null; - for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { - if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) { - location = executeBulkItemRequest(metaData, primary, request, location, requestIndex, - updateHelper, nowInMillisSupplier, mappingUpdater); - } - } - BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; - BulkItemRequest[] items = request.items(); - for (int i = 0; i < items.length; i++) { - responses[i] = items[i].getPrimaryResponse(); - } - BulkShardResponse response = new BulkShardResponse(request.shardId(), responses); - return new WritePrimaryResult<>(request, response, location, null, primary, logger); + BulkShardRequest request, + IndexShard primary, + UpdateHelper updateHelper, + LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, + CheckedRunnable waitForMappingUpdate) throws Exception { + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); } - private static BulkItemResultHolder 
executeIndexRequest(final IndexRequest indexRequest, - final BulkItemRequest bulkItemRequest, - final IndexShard primary, - final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); - switch (indexResult.getResultType()) { - case SUCCESS: - IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), - indexResult.getSeqNo(), indexResult.getTerm(), indexResult.getVersion(), indexResult.isCreated()); - return new BulkItemResultHolder(response, indexResult, bulkItemRequest); - case FAILURE: - return new BulkItemResultHolder(null, indexResult, bulkItemRequest); - default: - throw new AssertionError("unknown result type for " + indexRequest + ": " + indexResult.getResultType()); - } - } - - private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest, - final BulkItemRequest bulkItemRequest, - final IndexShard primary, - final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); - switch (deleteResult.getResultType()) { - case SUCCESS: - DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(), - deleteResult.getSeqNo(), deleteResult.getTerm(), deleteResult.getVersion(), deleteResult.isFound()); - return new BulkItemResultHolder(response, deleteResult, bulkItemRequest); - case FAILURE: - return new BulkItemResultHolder(null, deleteResult, bulkItemRequest); - case MAPPING_UPDATE_REQUIRED: - throw new AssertionError("delete operation leaked a mapping update " + deleteRequest); - default: - throw new AssertionError("unknown result type for " + deleteRequest + ": " + deleteResult.getResultType()); - } - } - - static Translog.Location calculateTranslogLocation(final Translog.Location originalLocation, - final BulkItemResultHolder bulkItemResult) { - final Engine.Result operationResult = bulkItemResult.operationResult; - if (operationResult != null && operationResult.getResultType() == Engine.Result.Type.SUCCESS) { - return locationToSync(originalLocation, operationResult.getTranslogLocation()); - } else { - return originalLocation; - } - } - - // Visible for unit testing - /** - * Creates a BulkItemResponse for the primary operation and returns it. If no bulk response is - * needed (because one already exists and the operation failed), then return null. - */ - static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResult, - final DocWriteRequest.OpType opType, - BulkShardRequest request) { - final Engine.Result operationResult = bulkItemResult.operationResult; - final DocWriteResponse response = bulkItemResult.response; - final BulkItemRequest replicaRequest = bulkItemResult.replicaRequest; - - if (operationResult == null) { // in case of noop update operation - assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop updates can have a null operation"; - return new BulkItemResponse(replicaRequest.id(), opType, response); - - } else if (operationResult.getResultType() == Engine.Result.Type.SUCCESS) { - BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response); - // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. 
- primaryResponse.getResponse().setShardInfo(new ShardInfo()); - return primaryResponse; - - } else if (operationResult.getResultType() == Engine.Result.Type.FAILURE) { - DocWriteRequest docWriteRequest = replicaRequest.request(); - Exception failure = operationResult.getFailure(); - if (isConflictException(failure)) { - logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); - } else { - logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); - } - - // if it's a conflict failure, and we already executed the request on a primary (and we execute it - // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) - // then just use the response we got from the failed execution - if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) { - return new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(), - // Make sure to use request.index() here, if you - // use docWriteRequest.index() it will use the - // concrete index instead of an alias if used! - new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), - failure, operationResult.getSeqNo())); - } else { - assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response"; - return null; - } - } else { - throw new AssertionError("unknown result type for " + request + ": " + operationResult.getResultType()); + private static WritePrimaryResult performOnPrimary( + BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { + + while (context.hasMoreOperationsToExecute()) { + executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); + assert context.isInitial(); // either completed and moved to next or reset } + return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), + null, context.getPrimary(), logger); } /** Executes bulk item requests and handles request execution exceptions */ - static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexShard primary, - BulkShardRequest request, Translog.Location location, - int requestIndex, UpdateHelper updateHelper, - LongSupplier nowInMillisSupplier, - final MappingUpdatePerformer mappingUpdater) throws Exception { - final DocWriteRequest itemRequest = request.items()[requestIndex].request(); - final DocWriteRequest.OpType opType = itemRequest.opType(); - final BulkItemResultHolder responseHolder; - switch (itemRequest.opType()) { - case CREATE: - case INDEX: - responseHolder = executeIndexRequest((IndexRequest) itemRequest, - request.items()[requestIndex], primary, mappingUpdater); - break; - case UPDATE: - responseHolder = executeUpdateRequest((UpdateRequest) itemRequest, primary, metaData, request, - requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater); - break; - case DELETE: - responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary, mappingUpdater); - break; - default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); + static void 
executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) + throws Exception { + final DocWriteRequest.OpType opType = context.getCurrent().opType(); + + final UpdateHelper.Result updateResult; + if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + try { + updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); + } catch (Exception failure) { + // we may fail translating a update to index or delete operation + // we use index result to communicate failure while translating update request + final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); + context.setRequestToExecute(updateRequest); + context.markOperationAsExecuted(result); + context.markAsCompleted(context.getExecutionResult()); + return; + } + // execute translated update request + switch (updateResult.getResponseResult()) { + case CREATED: + case UPDATED: + IndexRequest indexRequest = updateResult.action(); + IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData(); + MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); + indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex()); + context.setRequestToExecute(indexRequest); + break; + case DELETED: + context.setRequestToExecute(updateResult.action()); + break; + case NOOP: + context.markOperationAsNoOp(updateResult.action()); + context.markAsCompleted(context.getExecutionResult()); + return; + default: + throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); + } + } else { + context.setRequestToExecute(context.getCurrent()); + updateResult = null; } - final BulkItemRequest replicaRequest = responseHolder.replicaRequest; + assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - // update the bulk item request because update request execution can mutate the bulk item request - request.items()[requestIndex] = replicaRequest; - - // Retrieve the primary response, and update the replica request with the primary's response - BulkItemResponse primaryResponse = createPrimaryResponse(responseHolder, opType, request); - if (primaryResponse != null) { - replicaRequest.setPrimaryResponse(primaryResponse); + if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { + executeDeleteRequestOnPrimary(context, mappingUpdater); + } else { + executeIndexRequestOnPrimary(context, mappingUpdater); } - // Update the translog with the new location, if needed - return calculateTranslogLocation(location, responseHolder); + if (context.requiresWaitingForMappingUpdate()) { + try { + waitForMappingUpdate.run(); + context.resetForExecutionForRetry(); + } catch (Exception e) { + context.failOnMappingUpdate(e); + } + return; + } + + assert context.isOperationExecuted(); + + if (opType == DocWriteRequest.OpType.UPDATE && + context.getExecutionResult().isFailed() && + isConflictException(context.getExecutionResult().getFailure().getCause())) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + if (context.getRetryCounter() < updateRequest.retryOnConflict()) { + context.resetForExecutionForRetry(); + return; + } + } + + finalizePrimaryOperationOnCompletion(context, opType, updateResult); } - 
private static boolean isAborted(BulkItemResponse response) { - return response != null && response.isFailed() && response.getFailure().isAborted(); + private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType, + UpdateHelper.Result updateResult) { + final BulkItemResponse executionResult = context.getExecutionResult(); + if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + context.markAsCompleted( + processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult)); + } else if (executionResult.isFailed()) { + final Exception failure = executionResult.getFailure().getCause(); + final DocWriteRequest docWriteRequest = context.getCurrent(); + if (TransportShardBulkAction.isConflictException(failure)) { + logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); + } else { + logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); + } + + final BulkItemResponse primaryResponse; + // if it's a conflict failure, and we already executed the request on a primary (and we execute it + // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) + // then just use the response we got from the failed execution + if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) { + primaryResponse = context.getPreviousPrimaryResponse(); + } else { + primaryResponse = executionResult; + } + context.markAsCompleted(primaryResponse); + } else { + context.markAsCompleted(executionResult); + } + assert context.isInitial(); } private static boolean isConflictException(final Exception e) { @@ -285,150 +284,50 @@ public class TransportShardBulkAction extends TransportWriteAction> sourceAndContent = + if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) { + final BytesReference indexSourceAsBytes = updateIndexRequest.source(); + final Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType()); - updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex, - indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - // set translated request as replica request - replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest); - - } else if (opType == Engine.Operation.TYPE.DELETE) { - assert result instanceof Engine.DeleteResult : result.getClass(); - final DeleteRequest updateDeleteRequest = translate.action(); - - final DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), updateDeleteRequest.type(), updateDeleteRequest.id(), - result.getSeqNo(), result.getTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound()); - - updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), + updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex, + indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + } + } else if (translatedResult == DocWriteResponse.Result.DELETED) { + final DeleteResponse deleteResponse = 
operationResponse.getResponse(); + updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(), deleteResponse.getVersion(), deleteResponse.getResult()); - final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(), + final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null); - updateResponse.setGetResult(getResult); - // set translated request as replica request - replicaRequest = new BulkItemRequest(bulkReqId, updateDeleteRequest); - - } else { - throw new IllegalArgumentException("unknown operation type: " + opType); - } - - return new BulkItemResultHolder(updateResponse, result, replicaRequest); - } - - /** - * Executes update request once, delegating to a index or delete operation after translation. - * NOOP updates are indicated by returning a null operation in {@link BulkItemResultHolder} - */ - static BulkItemResultHolder executeUpdateRequestOnce(UpdateRequest updateRequest, IndexShard primary, - IndexMetaData metaData, String concreteIndex, - UpdateHelper updateHelper, LongSupplier nowInMillis, - BulkItemRequest primaryItemRequest, int bulkReqId, - final MappingUpdatePerformer mappingUpdater) throws Exception { - final UpdateHelper.Result translate; - // translate update request - try { - translate = updateHelper.prepare(updateRequest, primary, nowInMillis); - } catch (Exception failure) { - // we may fail translating a update to index or delete operation - // we use index result to communicate failure while translating update request - final Engine.Result result = primary.getFailedIndexResult(failure, updateRequest.version()); - return new BulkItemResultHolder(null, result, primaryItemRequest); - } - - final Engine.Result result; - // execute translated update request - switch (translate.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = translate.action(); - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - indexRequest.process(metaData.getCreationVersion(), mappingMd, concreteIndex); - result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); - break; - case DELETED: - DeleteRequest deleteRequest = translate.action(); - result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater); - break; - case NOOP: - primary.noopUpdate(updateRequest.type()); - result = null; - break; - default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } - - if (result == null) { - // this is a noop operation - final UpdateResponse updateResponse = translate.action(); - return new BulkItemResultHolder(updateResponse, result, primaryItemRequest); - } else if (result.getResultType() == Engine.Result.Type.FAILURE) { - // There was a result, and the result was a failure - return new BulkItemResultHolder(null, result, primaryItemRequest); - } else if (result.getResultType() == Engine.Result.Type.SUCCESS) { - // It was successful, we need to construct the response and return it - return processUpdateResponse(updateRequest, concreteIndex, result, translate, primary, bulkReqId); - } else { - throw new AssertionError("unknown result type for " + updateRequest + ": " + result.getResultType()); - } - } - - /** - * Executes update 
request, delegating to a index or delete operation after translation, - * handles retries on version conflict and constructs update response - * NOOP updates are indicated by returning a null operation - * in {@link BulkItemResultHolder} - */ - private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary, - IndexMetaData metaData, BulkShardRequest request, - int requestIndex, UpdateHelper updateHelper, - LongSupplier nowInMillis, - final MappingUpdatePerformer mappingUpdater) throws Exception { - BulkItemRequest primaryItemRequest = request.items()[requestIndex]; - assert primaryItemRequest.request() == updateRequest - : "expected bulk item request to contain the original update request, got: " + - primaryItemRequest.request() + " and " + updateRequest; - - BulkItemResultHolder holder = null; - // There must be at least one attempt - int maxAttempts = Math.max(1, updateRequest.retryOnConflict()); - for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) { - - holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper, - nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater); - - // It was either a successful request, or it was a non-conflict failure - if (holder.isVersionConflict() == false) { - return holder; + updateResponse.setGetResult(getResult); + } else { + throw new IllegalArgumentException("unknown operation type: " + translatedResult); } + response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse); } - // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated - return holder; + return response; } + /** Modes for executing item request on replica depending on corresponding primary execution result */ public enum ReplicaItemExecutionMode { @@ -451,6 +350,7 @@ public class TransportShardBulkAction extends TransportWriteAction primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, request.getAutoGeneratedTimestamp(), request.isRetry()), e -> primary.getFailedIndexResult(e, request.version()), - mappingUpdater); + context::markOperationAsExecuted, + mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); } - private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { - return executeOnPrimaryWhileHandlingMappingUpdates(primary.shardId(), request.type(), + private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context, + MappingUpdatePerformer mappingUpdater) throws Exception { + final DeleteRequest request = context.getRequestToExecute(); + final IndexShard primary = context.getPrimary(); + executeOnPrimaryWhileHandlingMappingUpdates(context, () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()), e -> primary.getFailedDeleteResult(e, request.version()), - mappingUpdater); + context::markOperationAsExecuted, + mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); } - private static T executeOnPrimaryWhileHandlingMappingUpdates(ShardId shardId, String type, - CheckedSupplier toExecute, - Function onError, - MappingUpdatePerformer mappingUpdater) + private static void executeOnPrimaryWhileHandlingMappingUpdates( + BulkPrimaryExecutionContext context, CheckedSupplier 
toExecute, + Function exceptionToResult, Consumer onComplete, Consumer mappingUpdater) throws IOException { T result = toExecute.get(); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { // try to update the mappings and try again. try { - mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), shardId, type); + mappingUpdater.accept(result.getRequiredMappingUpdate()); } catch (Exception e) { // failure to update the mapping should translate to a failure of specific requests. Other requests // still need to be executed and replicated. - return onError.apply(e); + onComplete.accept(exceptionToResult.apply(e)); + return; } + // TODO - we can fall back to a wait for cluster state update but I'm keeping the logic the same for now result = toExecute.get(); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { // double mapping update. We assume that the successful mapping update wasn't yet processed on the node // and retry the entire request again. - throw new ReplicationOperation.RetryOnPrimaryException(shardId, - "Dynamic mappings are not available on the node that holds the primary yet"); + context.markAsRequiringMappingUpdate(); + } else { + onComplete.accept(result); } + } else { + onComplete.accept(result); } - assert result.getFailure() instanceof ReplicationOperation.RetryOnPrimaryException == false : - "IndexShard shouldn't use RetryOnPrimaryException. got " + result.getFailure(); - return result; - } class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ca91a32a17a..ae029ce3f93 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -83,7 +83,7 @@ public abstract class TransportWriteAction< return location; } - protected static Location locationToSync(Location current, Location next) { + public static Location locationToSync(Location current, Location next) { /* here we are moving forward in the translog with each operation. Under the hood this might * cross translog files which is ok since from the user perspective the translog is like a * tape where only the highest location needs to be fsynced in order to sync all previous diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java new file mode 100644 index 00000000000..de7444fac09 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeDeleteResult; +import org.elasticsearch.action.bulk.TransportShardBulkActionTests.FakeIndexResult; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BulkPrimaryExecutionContextTests extends ESTestCase { + + public void testAbortedSkipped() { + BulkShardRequest shardRequest = generateRandomRequest(); + + ArrayList> nonAbortedRequests = new ArrayList<>(); + for (BulkItemRequest request : shardRequest.items()) { + if (randomBoolean()) { + request.abort("index", new ElasticsearchException("bla")); + } else { + nonAbortedRequests.add(request.request()); + } + } + + ArrayList> visitedRequests = new ArrayList<>(); + for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, null); + context.hasMoreOperationsToExecute(); + ) { + visitedRequests.add(context.getCurrent()); + context.setRequestToExecute(context.getCurrent()); + // using failures prevents caring about types + context.markOperationAsExecuted(new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1)); + context.markAsCompleted(context.getExecutionResult()); + } + + assertThat(visitedRequests, equalTo(nonAbortedRequests)); + } + + private BulkShardRequest generateRandomRequest() { + BulkItemRequest[] items = new BulkItemRequest[randomInt(20)]; + for (int i = 0; i < items.length; i++) { + final DocWriteRequest request; + switch (randomFrom(DocWriteRequest.OpType.values())) { + case INDEX: + request = new IndexRequest("index", "_doc", "id_" + i); + break; + case CREATE: + request = new IndexRequest("index", "_doc", "id_" + i).create(true); + break; + case UPDATE: + request = new UpdateRequest("index", "_doc", "id_" + i); + break; + case DELETE: + request = new DeleteRequest("index", "_doc", "id_" + i); + break; + default: + throw new AssertionError("unknown type"); + } + items[i] = new BulkItemRequest(i, request); + } + return new BulkShardRequest(new ShardId("index", "_na_", 0), + randomFrom(WriteRequest.RefreshPolicy.values()), items); + } + + public void testTranslogLocation() { + + BulkShardRequest shardRequest = generateRandomRequest(); + + Translog.Location expectedLocation = null; + final IndexShard primary = mock(IndexShard.class); + when(primary.shardId()).thenReturn(shardRequest.shardId()); + + long translogGen = 0; + long translogOffset = 0; + + BulkPrimaryExecutionContext context = 
new BulkPrimaryExecutionContext(shardRequest, primary); + while (context.hasMoreOperationsToExecute()) { + final Engine.Result result; + final DocWriteRequest current = context.getCurrent(); + final boolean failure = rarely(); + if (frequently()) { + translogGen += randomIntBetween(1, 4); + translogOffset = 0; + } else { + translogOffset += randomIntBetween(200, 400); + } + + Translog.Location location = new Translog.Location(translogGen, translogOffset, randomInt(200)); + switch (current.opType()) { + case INDEX: + case CREATE: + context.setRequestToExecute(current); + if (failure) { + result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1); + } else { + result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + case UPDATE: + context.setRequestToExecute(new IndexRequest(current.index(), current.type(), current.id())); + if (failure) { + result = new Engine.IndexResult(new ElasticsearchException("bla"), 1, 1, 1); + } else { + result = new FakeIndexResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + case DELETE: + context.setRequestToExecute(current); + if (failure) { + result = new Engine.DeleteResult(new ElasticsearchException("bla"), 1, 1); + } else { + result = new FakeDeleteResult(1, 1, randomLongBetween(0, 200), randomBoolean(), location); + } + break; + default: + throw new AssertionError("unknown type:" + current.opType()); + } + if (failure == false) { + expectedLocation = location; + } + context.markOperationAsExecuted(result); + context.markAsCompleted(context.getExecutionResult()); + } + + assertThat(context.getLocationToSync(), equalTo(expectedLocation)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 0cdc9db916c..8a7c46ebcf6 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -405,16 +405,20 @@ public class BulkWithUpdatesIT extends ESIntegTestCase { assertThat("expected no failures but got: " + response.buildFailureMessage(), response.hasFailures(), equalTo(false)); assertThat(response.getItems().length, equalTo(numDocs)); for (int i = 0; i < numDocs; i++) { - assertThat(response.getItems()[i].getItemId(), equalTo(i)); - assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); - assertThat(response.getItems()[i].getIndex(), equalTo("test")); - assertThat(response.getItems()[i].getType(), equalTo("type1")); - assertThat(response.getItems()[i].getOpType(), equalTo(OpType.UPDATE)); + final BulkItemResponse itemResponse = response.getItems()[i]; + assertThat(itemResponse.getFailure(), nullValue()); + assertThat(itemResponse.isFailed(), equalTo(false)); + assertThat(itemResponse.getItemId(), equalTo(i)); + assertThat(itemResponse.getId(), equalTo(Integer.toString(i))); + assertThat(itemResponse.getIndex(), equalTo("test")); + assertThat(itemResponse.getType(), equalTo("type1")); + assertThat(itemResponse.getOpType(), equalTo(OpType.UPDATE)); for (int j = 0; j < 5; j++) { GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).get(); assertThat(getResponse.isExists(), equalTo(false)); } } + assertThat(response.hasFailures(), equalTo(false)); } public void testBulkIndexingWhileInitializing() throws Exception { diff --git 
a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index bbe25ea02d6..e60ee1395a8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateRequest; @@ -39,12 +38,13 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -52,10 +52,9 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.LongSupplier; import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,103 +62,103 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportShardBulkActionTests extends IndexShardTestCase { private final ShardId shardId = new ShardId("index", "_na_", 0); private final Settings idxSettings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0) - .put("index.version.created", Version.CURRENT.id) - .build(); + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT.id) + .build(); private IndexMetaData indexMetaData() throws IOException { return IndexMetaData.builder("index") - 
.putMapping("_doc", - "{\"properties\":{\"foo\":{\"type\":\"text\",\"fields\":" + - "{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}") - .settings(idxSettings) - .primaryTerm(0, 1).build(); + .putMapping("_doc", + "{\"properties\":{\"foo\":{\"type\":\"text\",\"fields\":" + + "{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}") + .settings(idxSettings) + .primaryTerm(0, 1).build(); } public void testShouldExecuteReplicaItem() throws Exception { // Successful index request should be replicated DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NORMAL)); + equalTo(ReplicaItemExecutionMode.NORMAL)); // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException("i died")))); + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException("i died")))); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); + equalTo(ReplicaItemExecutionMode.NOOP)); // Failed index requests with sequence no should be replicated request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( - new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, - new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException( - "i died after sequence no was generated"), - 1))); + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException( + "i died after sequence no was generated"), + 1))); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.FAILURE)); + equalTo(ReplicaItemExecutionMode.FAILURE)); // NOOP requests should not be replicated DocWriteRequest updateRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, updateRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, - response)); + response)); assertThat(replicaItemExecutionMode(request, 0), - equalTo(ReplicaItemExecutionMode.NOOP)); + equalTo(ReplicaItemExecutionMode.NOOP)); } - public void testExecuteBulkIndexRequest() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = new BulkItemRequest[1]; boolean create = randomBoolean(); DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE) - .create(create); + .create(create); BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); items[0] = primaryRequest; BulkShardRequest bulkShardRequest = - new 
BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems - assertThat(newLocation, not(location)); + assertNotNull(context.getLocationToSync()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), - equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX)); + equalTo(create ? DocWriteRequest.OpType.CREATE : DocWriteRequest.OpType.INDEX)); assertFalse(primaryResponse.isFailed()); // Assert that the document actually made it there @@ -170,13 +169,12 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { items[0] = primaryRequest; bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location secondLocation = - TransportShardBulkAction.executeBulkItemRequest( metaData, - shard, bulkShardRequest, newLocation, 0, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail"))); + BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, + threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + assertFalse(secondContext.hasMoreOperationsToExecute()); - // Translog should not change, since the document was not indexed due to a version conflict - assertThat(secondLocation, equalTo(newLocation)); + assertNull(secondContext.getLocationToSync()); BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; @@ -194,7 +192,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { assertThat(failure.getId(), equalTo("id")); assertThat(failure.getCause().getClass(), equalTo(VersionConflictEngineException.class)); assertThat(failure.getCause().getMessage(), - containsString("version conflict, document already exists (current version [1])")); + containsString("version conflict, document already exists (current version [1])")); assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); assertThat(replicaRequest, equalTo(primaryRequest)); @@ -224,7 +222,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { UpdateHelper updateHelper = null; WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + () -> {}); // since at least 1 item passed, the translog location should exist, assertThat(result.location, notNullValue()); @@ -254,52 +253,85 @@
public class TransportShardBulkActionTests extends IndexShardTestCase { closeShards(shard); } - public void testExecuteBulkIndexRequestWithRejection() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); + public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkItemRequest[] items = new BulkItemRequest[1]; DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); - UpdateHelper updateHelper = null; + Engine.IndexResult mappingUpdate = + new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); - // Pretend the mappings haven't made it to the node yet, and throw a rejection - expectThrows(ReplicationOperation.RetryOnPrimaryException.class, - () -> TransportShardBulkAction.executeBulkItemRequest(metaData, shard, bulkShardRequest, - location, 0, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer())); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(mappingUpdate); + + // Pretend the mappings haven't made it to the node yet + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + AtomicInteger updateCalled = new AtomicInteger(); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + (update, shardId, type) -> { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + }, () -> {}); + assertTrue(context.isInitial()); + assertTrue(context.hasMoreOperationsToExecute()); + + assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + + // Verify that the shard "executed" the operation twice + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(success); + + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); + + + // Verify that the shard "executed" the operation only once (2 for previous invocations plus + // 1 for this execution) + verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + + + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(writeRequest.opType())); + assertFalse(primaryResponse.isFailed()); closeShards(shard); } public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = 
new BulkItemRequest[1]; DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); items[0] = new BulkItemRequest(0, writeRequest); - BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - // Return an exception when trying to update the mapping + // Return an exception when trying to update the mapping, or when waiting for it to come RuntimeException err = new RuntimeException("some kind of exception"); - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(err)); + boolean errorOnWait = randomBoolean(); - // Translog shouldn't change, as there were conflicting mappings - assertThat(newLocation, equalTo(location)); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), + errorOnWait ? () -> { throw err; } : () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); + + // Translog shouldn't be synced, as there were conflicting mappings + assertThat(context.getLocationToSync(), nullValue()); BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); @@ -320,24 +352,24 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { } public void testExecuteBulkDeleteRequest() throws Exception { - IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(true); BulkItemRequest[] items = new BulkItemRequest[1]; DocWriteRequest writeRequest = new DeleteRequest("index", "_doc", "id"); items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Translog.Location location = new Translog.Location(0, 0, 0); UpdateHelper updateHelper = null; - Translog.Location newLocation = TransportShardBulkAction.executeBulkItemRequest(metaData, - shard, bulkShardRequest, location, 0, updateHelper, - threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer()); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, even though the document didn't exist - assertThat(newLocation, not(location)); + assertThat(context.getLocationToSync(), not(location)); BulkItemRequest replicaRequest = bulkShardRequest.items()[0]; DocWriteRequest replicaDeleteRequest = replicaRequest.request(); @@ -369,14 +401,15 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { items[0] = new BulkItemRequest(0, writeRequest); bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - location = newLocation; + location = context.getLocationToSync(); - newLocation = 
TransportShardBulkAction.executeBulkItemRequest(metaData, shard, - bulkShardRequest, location, 0, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer()); + context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, because the document was deleted - assertThat(newLocation, not(location)); + assertThat(context.getLocationToSync(), not(location)); replicaRequest = bulkShardRequest.items()[0]; replicaDeleteRequest = replicaRequest.request(); @@ -405,63 +438,79 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { closeShards(shard); } - public void testNoopUpdateReplicaRequest() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + public void testNoopUpdateRequest() throws Exception { + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); DocWriteResponse noopUpdateResponse = new UpdateResponse(shardId, "_doc", "id", 0, - DocWriteResponse.Result.NOOP); - BulkItemResultHolder noopResults = new BulkItemResultHolder(noopUpdateResponse, null, - replicaRequest); + DocWriteResponse.Result.NOOP); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + IndexShard shard = mock(IndexShard.class); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(noopUpdateResponse, DocWriteResponse.Result.NOOP, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = TransportShardBulkAction.createPrimaryResponse( - noopResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, noopResults); + assertFalse(context.hasMoreOperationsToExecute()); // Basically nothing changes in the request since it's a noop - assertThat(newLocation, equalTo(location)); + assertThat(context.getLocationToSync(), nullValue()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertThat(primaryResponse.getResponse(), equalTo(noopUpdateResponse)); assertThat(primaryResponse.getResponse().getResult(), - equalTo(DocWriteResponse.Result.NOOP)); + equalTo(DocWriteResponse.Result.NOOP)); + assertThat(primaryResponse.getResponse().getSeqNo(), 
equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); } - public void testUpdateReplicaRequestWithFailure() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + public void testUpdateRequestWithFailure() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); Exception err = new ElasticsearchException("I'm dead <(x.x)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, - replicaRequest); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, failedResults); + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Since this was not a conflict failure, the primary response // should be filled out with the failure information - assertThat(newLocation, equalTo(location)); + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertTrue(primaryResponse.isFailed()); assertThat(primaryResponse.getFailureMessage(), containsString("I'm dead <(x.x)>")); BulkItemResponse.Failure failure = primaryResponse.getFailure(); @@ -472,33 +521,42 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); } - public void testUpdateReplicaRequestWithConflictFailure() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", 
"id").source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + + public void testUpdateRequestWithConflictFailure() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); Exception err = new VersionConflictEngineException(shardId, "_doc", "id", - "I'm conflicted <(;_;)>"); + "I'm conflicted <(;_;)>"); Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - BulkItemResultHolder failedResults = new BulkItemResultHolder(null, indexResult, - replicaRequest); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); - Translog.Location location = new Translog.Location(0, 0, 0); - BulkItemRequest[] items = new BulkItemRequest[0]; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - failedResults, DocWriteRequest.OpType.UPDATE, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(location, failedResults); - // Since this was not a conflict failure, the primary response - // should be filled out with the failure information - assertThat(newLocation, equalTo(location)); + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); + + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); assertTrue(primaryResponse.isFailed()); assertThat(primaryResponse.getFailureMessage(), containsString("I'm conflicted <(;_;)>")); BulkItemResponse.Failure failure = primaryResponse.getFailure(); @@ -509,61 +567,162 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT)); } - public void testUpdateReplicaRequestWithSuccess() throws Exception { - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); + 
public void testUpdateRequestWithSuccess() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); boolean created = randomBoolean(); Translog.Location resultLocation = new Translog.Location(42, 42, 42); - Engine.IndexResult indexResult = new FakeResult(1, 1, 1, created, resultLocation); - DocWriteResponse indexResponse = new IndexResponse(shardId, "_doc", "id", 1, 17, 1, created); - BulkItemResultHolder goodResults = - new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); + Engine.IndexResult indexResult = new FakeIndexResult(1, 1, 13, created, resultLocation); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult); + when(shard.indexSettings()).thenReturn(indexSettings); - Translog.Location originalLocation = new Translog.Location(21, 21, 21); - BulkItemRequest[] items = new BulkItemRequest[0]; + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, created ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; BulkShardRequest bulkShardRequest = - new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - BulkItemResponse primaryResponse = - TransportShardBulkAction.createPrimaryResponse( - goodResults, DocWriteRequest.OpType.INDEX, bulkShardRequest); + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - Translog.Location newLocation = - TransportShardBulkAction.calculateTranslogLocation(originalLocation, goodResults); + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced - assertThat(newLocation, equalTo(resultLocation)); + assertThat(context.getLocationToSync(), equalTo(resultLocation)); + assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); // Since this was not a conflict failure, the primary response // should be filled out with the failure information + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); assertThat(primaryResponse.getItemId(), equalTo(0)); assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); DocWriteResponse response = primaryResponse.getResponse(); assertThat(response.status(), equalTo(created ? 
RestStatus.CREATED : RestStatus.OK)); + assertThat(response.getSeqNo(), equalTo(13L)); } - public void testCalculateTranslogLocation() throws Exception { - final Translog.Location original = new Translog.Location(0, 0, 0); + public void testUpdateWithDelete() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + DocWriteRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE); - BulkItemRequest replicaRequest = new BulkItemRequest(0, writeRequest); - BulkItemResultHolder results = new BulkItemResultHolder(null, null, replicaRequest); + DeleteRequest updateResponse = new DeleteRequest("index", "_doc", "id"); - assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results), - equalTo(original)); + boolean found = randomBoolean(); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + final long resultSeqNo = 13; + Engine.DeleteResult deleteResult = new FakeDeleteResult(1, 1, resultSeqNo, found, resultLocation); + IndexShard shard = mock(IndexShard.class); + when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any())).thenReturn(deleteResult); + when(shard.indexSettings()).thenReturn(indexSettings); - boolean created = randomBoolean(); - DocWriteResponse indexResponse = new IndexResponse(shardId, "_doc", "id", 1, 17, 1, created); - Translog.Location newLocation = new Translog.Location(1, 1, 1); - final long version = randomNonNegativeLong(); - final long seqNo = randomNonNegativeLong(); - Engine.IndexResult indexResult = new IndexResultWithLocation(version, 0L, seqNo, created, newLocation); - results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); - assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results), - equalTo(newLocation)); + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, DocWriteResponse.Result.DELETED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); + + // Check that the translog is successfully advanced + assertThat(context.getLocationToSync(), equalTo(resultLocation)); + assertThat(bulkShardRequest.items()[0].request(), equalTo(updateResponse)); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.OK)); + assertThat(response.getSeqNo(), equalTo(resultSeqNo)); + } + + + public void testFailureDuringUpdateProcessing() throws Exception { + DocWriteRequest 
writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexShard shard = mock(IndexShard.class); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + final ElasticsearchException err = new ElasticsearchException("oops"); + when(updateHelper.prepare(any(), eq(shard), any())).thenThrow(err); + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + assertFalse(context.hasMoreOperationsToExecute()); + + assertNull(context.getLocationToSync()); + BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + assertTrue(primaryResponse.isFailed()); + assertThat(primaryResponse.getFailureMessage(), containsString("oops")); + BulkItemResponse.Failure failure = primaryResponse.getFailure(); + assertThat(failure.getIndex(), equalTo("index")); + assertThat(failure.getType(), equalTo("_doc")); + assertThat(failure.getId(), equalTo("id")); + assertThat(failure.getCause(), equalTo(err)); + assertThat(failure.getStatus(), equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testTranslogPositionToSync() throws Exception { + IndexShard shard = newStartedShard(true); + + BulkItemRequest[] items = new BulkItemRequest[randomIntBetween(2, 5)]; + for (int i = 0; i < items.length; i++) { + DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id_" + i) + .source(Requests.INDEX_CONTENT_TYPE) + .opType(DocWriteRequest.OpType.INDEX); + items[i] = new BulkItemRequest(i, writeRequest); + } + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); + while (context.hasMoreOperationsToExecute()) { + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), () -> {}); + } + + assertTrue(shard.isSyncNeeded()); + + // if we sync the location, nothing else is unsynced + CountDownLatch latch = new CountDownLatch(1); + shard.sync(context.getLocationToSync(), e -> { + if (e != null) { + throw new AssertionError(e); + } + latch.countDown(); + }); + + latch.await(); + assertFalse(shard.isSyncNeeded()); + + closeShards(shard); } public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { @@ -572,66 +731,87 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { final String failureMessage = "simulated primary failure"; final IOException exception = new IOException(failureMessage); itemRequest.setPrimaryResponse(new BulkItemResponse(0, - randomFrom( - DocWriteRequest.OpType.CREATE, - DocWriteRequest.OpType.DELETE, - DocWriteRequest.OpType.INDEX - ), - new BulkItemResponse.Failure("index", "_doc", "1", - exception, 1L) + randomFrom( + DocWriteRequest.OpType.CREATE, + DocWriteRequest.OpType.DELETE, + DocWriteRequest.OpType.INDEX + ), + new 
BulkItemResponse.Failure("index", "_doc", "1", + exception, 1L) )); BulkItemRequest[] itemRequests = new BulkItemRequest[1]; itemRequests[0] = itemRequest; BulkShardRequest bulkShardRequest = new BulkShardRequest( - shard.shardId(), RefreshPolicy.NONE, itemRequests); + shard.shardId(), RefreshPolicy.NONE, itemRequests); TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); verify(shard, times(1)).markSeqNoAsNoop(1, exception.toString()); closeShards(shard); } - public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception { - IndexMetaData metaData = indexMetaData(); - logger.info("--> metadata.getIndex(): {}", metaData.getIndex()); - final IndexShard shard = spy(newStartedShard(true)); + public void testRetries() throws Exception { + IndexSettings indexSettings = new IndexSettings(indexMetaData(), Settings.EMPTY); + UpdateRequest writeRequest = new UpdateRequest("index", "_doc", "id") + .doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + // the beating will continue until success has come. + writeRequest.retryOnConflict(Integer.MAX_VALUE); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - IndexRequest request = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + IndexRequest updateResponse = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - final AtomicInteger updateCalled = new AtomicInteger(0); - expectThrows(ReplicationOperation.RetryOnPrimaryException.class, - () -> TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard, - (update, shardId, type) -> { - // There should indeed be a mapping update - assertNotNull(update); - updateCalled.incrementAndGet(); - })); + Exception err = new VersionConflictEngineException(shardId, "_doc", "id", + "I'm conflicted <(;_;)>"); + Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, 0); + Engine.IndexResult mappingUpdate = + new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); - assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenAnswer(ir -> { + if (randomBoolean()) { + return conflictedResult; + } + if (randomBoolean()) { + return mappingUpdate; + } else { + return success; + } + }); + when(shard.indexSettings()).thenReturn(indexSettings); + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result(updateResponse, randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), Requests.INDEX_CONTENT_TYPE)); - // Verify that the shard "executed" the operation twice - verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); + BulkItemRequest[] items = new BulkItemRequest[]{primaryRequest}; + BulkShardRequest bulkShardRequest = + new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - // Update the mapping, so the next mapping updater doesn't do anything - final MapperService mapperService = shard.mapperService(); - logger.info("--> mapperService.index(): {}", mapperService.index()); - mapperService.updateMapping(metaData); + WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + () -> {}); - TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard, - (update, shardId, type) -> fail("should not have had to update the mappings")); - - // Verify that the shard "executed" the operation only once (2 for previous invocations plus - // 1 for this execution) - verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean()); - - closeShards(shard); + assertThat(result.location, equalTo(resultLocation)); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getSeqNo(), equalTo(13L)); } - public class IndexResultWithLocation extends Engine.IndexResult { + /** + * Fake IndexResult that has a settable translog location + */ + static class FakeIndexResult extends Engine.IndexResult { + private final Translog.Location location; - public IndexResultWithLocation(long version, long term, long seqNo, boolean created, Translog.Location newLocation) { + + protected FakeIndexResult(long version, long term, long seqNo, boolean created, Translog.Location location) { super(version, term, seqNo, created); - this.location = newLocation; + this.location = location; } @Override @@ -640,198 +820,15 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { } } - public void testProcessUpdateResponse() throws Exception { - IndexShard shard = newStartedShard(false); - - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - BulkItemRequest request = new BulkItemRequest(0, updateRequest); - Exception err = new VersionConflictEngineException(shardId, "_doc", "id", - "I'm conflicted <(;_;)>"); - Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0); - Engine.DeleteResult deleteResult = new Engine.DeleteResult(1, 1, 1, true); - DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; - DocWriteResponse.Result deleteWriteResult = DocWriteResponse.Result.DELETED; - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - DeleteRequest deleteRequest = new DeleteRequest("index", "_doc", "id"); - UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult, - new HashMap(), XContentType.JSON); - UpdateHelper.Result translateDelete = new UpdateHelper.Result(deleteRequest, deleteWriteResult, - new 
HashMap(), XContentType.JSON); - - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - itemRequests[0] = request; - - BulkItemResultHolder holder = TransportShardBulkAction.processUpdateResponse(updateRequest, - "index", indexResult, translate, shard, 7); - - assertTrue(holder.isVersionConflict()); - assertThat(holder.response, instanceOf(UpdateResponse.class)); - UpdateResponse updateResp = (UpdateResponse) holder.response; - assertThat(updateResp.getGetResult(), equalTo(null)); - assertThat(holder.operationResult, equalTo(indexResult)); - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(7)); - DocWriteRequest replicaRequest = replicaBulkRequest.request(); - assertThat(replicaRequest, instanceOf(IndexRequest.class)); - assertThat(replicaRequest, equalTo(indexRequest)); - - BulkItemResultHolder deleteHolder = TransportShardBulkAction.processUpdateResponse(updateRequest, - "index", deleteResult, translateDelete, shard, 8); - - assertFalse(deleteHolder.isVersionConflict()); - assertThat(deleteHolder.response, instanceOf(UpdateResponse.class)); - UpdateResponse delUpdateResp = (UpdateResponse) deleteHolder.response; - assertThat(delUpdateResp.getGetResult(), equalTo(null)); - assertThat(deleteHolder.operationResult, equalTo(deleteResult)); - BulkItemRequest delReplicaBulkRequest = deleteHolder.replicaRequest; - assertThat(delReplicaBulkRequest.id(), equalTo(8)); - DocWriteRequest delReplicaRequest = delReplicaBulkRequest.request(); - assertThat(delReplicaRequest, instanceOf(DeleteRequest.class)); - assertThat(delReplicaRequest, equalTo(deleteRequest)); - - closeShards(shard); - } - - public void testExecuteUpdateRequestOnce() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); - - Map source = new HashMap<>(); - BulkItemRequest[] items = new BulkItemRequest[1]; - boolean create = randomBoolean(); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id").source(Requests.INDEX_CONTENT_TYPE) - .create(create); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - items[0] = primaryRequest; - - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - indexRequest.source(source); - - DocWriteResponse.Result docWriteResult = DocWriteResponse.Result.CREATED; - UpdateHelper.Result translate = new UpdateHelper.Result(indexRequest, docWriteResult, - new HashMap(), XContentType.JSON); - UpdateHelper updateHelper = new MockUpdateHelper(translate); - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - updateRequest.upsert(source); - - BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, - "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, - new ThrowingMappingUpdatePerformer(new RuntimeException())); - - assertFalse(holder.isVersionConflict()); - assertNotNull(holder.response); - assertNotNull(holder.operationResult); - assertNotNull(holder.replicaRequest); - - assertThat(holder.response, instanceOf(UpdateResponse.class)); - UpdateResponse updateResp = (UpdateResponse) holder.response; - assertThat(updateResp.getGetResult(), equalTo(null)); - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(0)); - DocWriteRequest replicaRequest = replicaBulkRequest.request(); - assertThat(replicaRequest, instanceOf(IndexRequest.class)); - assertThat(replicaRequest, equalTo(indexRequest)); - - // 
Assert that the document actually made it there - assertDocCount(shard, 1); - closeShards(shard); - } - - public void testExecuteUpdateRequestOnceWithFailure() throws Exception { - IndexMetaData metaData = indexMetaData(); - IndexShard shard = newStartedShard(true); - - Map source = new HashMap<>(); - source.put("foo", "bar"); - BulkItemRequest[] items = new BulkItemRequest[1]; - boolean create = randomBoolean(); - DocWriteRequest writeRequest = new IndexRequest("index", "_doc", "id") - .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") - .create(create); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - items[0] = primaryRequest; - - IndexRequest indexRequest = new IndexRequest("index", "_doc", "id"); - indexRequest.source(source); - - Exception prepareFailure = new IllegalArgumentException("I failed to do something!"); - UpdateHelper updateHelper = new FailingUpdateHelper(prepareFailure); - UpdateRequest updateRequest = new UpdateRequest("index", "_doc", "id"); - updateRequest.upsert(source); - - BulkItemResultHolder holder = TransportShardBulkAction.executeUpdateRequestOnce(updateRequest, shard, metaData, - "index", updateHelper, threadPool::absoluteTimeInMillis, primaryRequest, 0, new NoopMappingUpdatePerformer()); - - assertFalse(holder.isVersionConflict()); - assertNull(holder.response); - assertNotNull(holder.operationResult); - assertNotNull(holder.replicaRequest); - - Engine.IndexResult opResult = (Engine.IndexResult) holder.operationResult; - assertThat(opResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); - assertFalse(opResult.isCreated()); - Exception e = opResult.getFailure(); - assertThat(e.getMessage(), containsString("I failed to do something!")); - - BulkItemRequest replicaBulkRequest = holder.replicaRequest; - assertThat(replicaBulkRequest.id(), equalTo(0)); - assertThat(replicaBulkRequest.request(), instanceOf(IndexRequest.class)); - IndexRequest replicaRequest = (IndexRequest) replicaBulkRequest.request(); - assertThat(replicaRequest.index(), equalTo("index")); - assertThat(replicaRequest.type(), equalTo("_doc")); - assertThat(replicaRequest.id(), equalTo("id")); - assertThat(replicaRequest.sourceAsMap(), equalTo(source)); - - // Assert that the document did not make it there, since it should have failed - assertDocCount(shard, 0); - closeShards(shard); - } - /** - * Fake UpdateHelper that always returns whatever result you give it + * Fake DeleteResult that has a settable translog location */ - private static class MockUpdateHelper extends UpdateHelper { - private final UpdateHelper.Result result; - - MockUpdateHelper(UpdateHelper.Result result) { - super(Settings.EMPTY, null); - this.result = result; - } - - @Override - public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { - logger.info("--> preparing update for {} - {}", s, u); - return result; - } - } - - /** - * An update helper that always fails to prepare the update - */ - private static class FailingUpdateHelper extends UpdateHelper { - private final Exception e; - - FailingUpdateHelper(Exception failure) { - super(Settings.EMPTY, null); - this.e = failure; - } - - @Override - public UpdateHelper.Result prepare(UpdateRequest u, IndexShard s, LongSupplier n) { - logger.info("--> preparing failing update for {} - {}", s, u); - throw new ElasticsearchException(e); - } - } - - /** - * Fake IndexResult that has a settable translog location - */ - private static class FakeResult extends Engine.IndexResult { + static class FakeDeleteResult extends 
Engine.DeleteResult { private final Translog.Location location; - protected FakeResult(long version, long term, long seqNo, boolean created, Translog.Location location) { - super(version, term, seqNo, created); + protected FakeDeleteResult(long version, long term, long seqNo, boolean found, Translog.Location location) { + super(version, term, seqNo, found); this.location = location; } @@ -851,6 +848,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { /** Always throw the given exception */ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { private final RuntimeException e; + ThrowingMappingUpdatePerformer(RuntimeException e) { this.e = e; } diff --git a/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java index 5a5f279985f..682b1deb146 100644 --- a/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/server/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -91,6 +91,7 @@ public class ShardInfoIT extends ESIntegTestCase { BulkResponse bulkResponse = bulkRequestBuilder.get(); for (BulkItemResponse item : bulkResponse) { + assertThat(item.getFailure(), nullValue()); assertThat(item.isFailed(), equalTo(false)); assertShardInfo(item.getResponse()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a40f950b02e..77bc644909a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -629,7 +629,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase final TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater); + result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + null); } TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); return result;