From eee0d18f94108b5ece7b18a450bbcc5c729d9311 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Oct 2016 04:26:32 -0400 Subject: [PATCH] Make update a replication action Currently, update action delegates to index and delete actions for replication using a dedicated transport action. This change makes update a replication operation, removing the dedicated transport action. This simplifies bulk execution and removes duplicate logic for update retries and translation. This consolidates the interface for single document write requests. Now on the primary, the update request is translated to an index or delete request before execution and the translated request is sent to copies for replication. --- .../elasticsearch/action/DocumentRequest.java | 61 ++-- .../action/bulk/BulkItemRequest.java | 3 - .../action/bulk/BulkRequest.java | 4 +- .../action/bulk/TransportBulkAction.java | 10 +- .../action/bulk/TransportShardBulkAction.java | 173 +++++---- .../action/delete/DeleteRequest.java | 2 +- .../action/delete/TransportDeleteAction.java | 26 +- .../action/index/IndexRequest.java | 2 +- .../action/index/TransportIndexAction.java | 12 +- .../replication/ReplicationOperation.java | 36 +- .../TransportReplicationAction.java | 16 +- .../replication/TransportWriteAction.java | 36 +- .../InstanceShardOperationRequest.java | 138 -------- .../InstanceShardOperationRequestBuilder.java | 60 ---- ...ransportInstanceSingleOperationAction.java | 270 --------------- .../action/update/TransportUpdateAction.java | 297 +++++++--------- .../action/update/UpdateHelper.java | 14 +- .../action/update/UpdateReplicaRequest.java | 113 ++++++ .../action/update/UpdateRequest.java | 43 +-- .../action/update/UpdateRequestBuilder.java | 4 +- .../elasticsearch/indices/IndicesModule.java | 2 - .../action/IndicesRequestIT.java | 9 +- .../TransportWriteActionTests.java | 11 +- ...ortInstanceSingleOperationActionTests.java | 327 ------------------ .../action/update/UpdateRequestTests.java | 3 +- .../ESIndexLevelReplicationTestCase.java | 4 +- docs/reference/docs/update.asciidoc | 4 +- 27 files changed, 460 insertions(+), 1220 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java delete mode 100644 core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java create mode 100644 core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java delete mode 100644 core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java index f4c88e159c7..ef2aa815a6b 100644 --- a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java +++ b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java @@ -20,10 +20,11 @@ package org.elasticsearch.action; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.update.UpdateReplicaRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.index.VersionType; import java.io.IOException; @@ -33,84 +34,72 @@ import java.util.Locale; * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ -public interface DocumentRequest extends IndicesRequest { - - /** - * Get the index that this request operates on - * @return the index - */ - String index(); +public abstract class DocumentRequest> extends ReplicatedWriteRequest { /** * Get the type that this request operates on * @return the type */ - String type(); + public abstract String type(); /** * Get the id of the document for this request * @return the id */ - String id(); - - /** - * Get the options for this request - * @return the indices options - */ - IndicesOptions indicesOptions(); + public abstract String id(); /** * Set the routing for this request * @return the Request */ - T routing(String routing); + public abstract T routing(String routing); /** * Get the routing for this request * @return the Routing */ - String routing(); + public abstract String routing(); /** * Get the parent for this request * @return the Parent */ - String parent(); + public abstract String parent(); /** * Get the document version for this request * @return the document version */ - long version(); + public abstract long version(); /** * Sets the version, which will perform the operation only if a matching * version exists and no changes happened on the doc since then. */ - T version(long version); + public abstract T version(long version); /** * Get the document version type for this request * @return the document version type */ - VersionType versionType(); + public abstract VersionType versionType(); /** * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. */ - T versionType(VersionType versionType); + public abstract T versionType(VersionType versionType); /** * Get the requested document operation type of the request * @return the operation type {@link OpType} */ - OpType opType(); + public abstract OpType opType(); /** * Requested operation type to perform on the document */ - enum OpType { + public enum OpType { /** * Index the source. If there an existing document with the id, it will * be replaced. @@ -164,40 +153,42 @@ public interface DocumentRequest extends IndicesRequest { } /** read a document write (index/delete/update) request */ - static DocumentRequest readDocumentRequest(StreamInput in) throws IOException { + public static DocumentRequest readDocumentRequest(StreamInput in) throws IOException { byte type = in.readByte(); - final DocumentRequest documentRequest; if (type == 0) { IndexRequest indexRequest = new IndexRequest(); indexRequest.readFrom(in); - documentRequest = indexRequest; + return indexRequest; } else if (type == 1) { DeleteRequest deleteRequest = new DeleteRequest(); deleteRequest.readFrom(in); - documentRequest = deleteRequest; + return deleteRequest; } else if (type == 2) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.readFrom(in); - documentRequest = updateRequest; + return updateRequest; + } else if (type == 3) { + UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(); + updateReplicaRequest.readFrom(in); + return updateReplicaRequest; } else { throw new IllegalStateException("invalid request type [" + type+ " ]"); } - return documentRequest; } /** write a document write (index/delete/update) request*/ - static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException { + public static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException { if (request instanceof IndexRequest) { out.writeByte((byte) 0); - ((IndexRequest) request).writeTo(out); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); - ((DeleteRequest) request).writeTo(out); } else if (request instanceof UpdateRequest) { out.writeByte((byte) 2); - ((UpdateRequest) request).writeTo(out); + } else if (request instanceof UpdateReplicaRequest) { + out.writeByte((byte) 3); } else { throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); } + request.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 079d4efe9bf..df9fd13b034 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -20,9 +20,6 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.DocumentRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index dc72407cf42..7729c737439 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -528,11 +528,11 @@ public class BulkRequest extends ActionRequest implements Composite } for (DocumentRequest request : requests) { // We first check if refresh has been set - if (((WriteRequest) request).getRefreshPolicy() != RefreshPolicy.NONE) { + if (request.getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( "RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException); } - ActionRequestValidationException ex = ((WriteRequest) request).validate(); + ActionRequestValidationException ex = request.validate(); if (ex != null) { if (validationException == null) { validationException = new ActionRequestValidationException(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index f7861d1e093..37c1b7c2290 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,14 +27,12 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.update.TransportUpdateAction; -import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -211,7 +209,7 @@ public class TransportBulkAction extends HandledTransportAction documentRequest = bulkRequest.requests.get(i); + DocumentRequest documentRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored if (documentRequest == null) { continue; @@ -234,10 +232,8 @@ public class TransportBulkAction extends HandledTransportAction { +public class TransportShardBulkAction extends TransportWriteAction { public static final String ACTION_NAME = BulkAction.NAME + "[s]"; - private final UpdateHelper updateHelper; private final boolean allowIdGeneration; private final MappingUpdatedAction mappingUpdatedAction; + private final UpdateHelper updateHelper; + private final AutoCreateIndex autoCreateIndex; + private final TransportCreateIndexAction createIndexAction; @Inject public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, ScriptService scriptService, + AutoCreateIndex autoCreateIndex, TransportCreateIndexAction createIndexAction) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BulkShardRequest::new, ThreadPool.Names.BULK); - this.updateHelper = updateHelper; + indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); this.mappingUpdatedAction = mappingUpdatedAction; + this.updateHelper = new UpdateHelper(scriptService, logger); + this.autoCreateIndex = autoCreateIndex; + this.createIndexAction = createIndexAction; } @Override @@ -105,7 +122,39 @@ public class TransportShardBulkAction extends TransportWriteAction onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception { + protected void doExecute(Task task, BulkShardRequest request, ActionListener listener) { + ClusterState state = clusterService.state(); + if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(bulk api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(task, createIndexRequest, new ActionListener() { + @Override + public void onResponse(CreateIndexResponse result) { + innerExecute(task, request, listener); + } + + @Override + public void onFailure(Exception e) { + if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { + // we have the index, do it + innerExecute(task, request, listener); + } else { + listener.onFailure(e); + } + } + }); + } else { + innerExecute(task, request, listener); + } + } + + private void innerExecute(Task task, final BulkShardRequest request, final ActionListener listener) { + super.doExecute(task, request, listener); + } + @Override + protected WriteResult onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception { ShardId shardId = request.shardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData(); @@ -123,7 +172,7 @@ public class TransportShardBulkAction extends TransportWriteAction(response, location); + return new WriteResult<>(request, response, location); } /** Executes bulk item requests and handles request execution exceptions */ @@ -131,22 +180,39 @@ public class TransportShardBulkAction extends TransportWriteAction itemRequest = request.items()[requestIndex].request(); + preVersions[requestIndex] = itemRequest.version(); + preVersionTypes[requestIndex] = itemRequest.versionType(); + DocumentRequest.OpType opType = itemRequest.opType(); try { - WriteResult writeResult = innerExecuteBulkItemRequest(metaData, indexShard, - request, requestIndex); + final WriteResult writeResult; + switch (itemRequest.opType()) { + case CREATE: + case INDEX: + writeResult = TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, + mappingUpdatedAction); + break; + case UPDATE: + writeResult = TransportUpdateAction.executeUpdateRequestOnPrimary(((UpdateRequest) itemRequest), indexShard, + metaData, updateHelper, mappingUpdatedAction, allowIdGeneration); + break; + case DELETE: + writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard); + break; + default: + throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); + } if (writeResult.getLocation() != null) { location = locationToSync(location, writeResult.getLocation()); } else { assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP : "only noop operation can have null next operation"; } - // update the bulk item request because update request execution can mutate the bulk item request - BulkItemRequest item = request.items()[requestIndex]; + // update the bulk item request with replica request (update request are changed to index or delete requests for replication) + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), + (DocumentRequest) writeResult.getReplicaRequest()); // add the response - setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse())); + setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse())); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -182,33 +248,6 @@ public class TransportShardBulkAction extends TransportWriteAction innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard, - BulkShardRequest request, int requestIndex) throws Exception { - DocumentRequest itemRequest = request.items()[requestIndex].request(); - switch (itemRequest.opType()) { - case CREATE: - case INDEX: - return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction); - case UPDATE: - int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict(); - for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { - try { - return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest)); - } catch (Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (attemptCount == maxAttempts // bubble up exception when we run out of attempts - || (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict - throw e; - } - } - } - throw new IllegalStateException("version conflict exception should bubble up on last attempt"); - case DELETE: - return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard); - default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); - } - } - private void setResponse(BulkItemRequest request, BulkItemResponse response) { request.setPrimaryResponse(response); if (response.isFailed()) { @@ -219,51 +258,6 @@ public class TransportShardBulkAction extends TransportWriteAction shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard, - BulkShardRequest request, - int requestIndex, UpdateRequest updateRequest) - throws Exception { - // Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request - UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard); - switch (translate.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = translate.action(); - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - indexRequest.process(mappingMd, allowIdGeneration, request.index()); - WriteResult writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); - BytesReference indexSourceAsBytes = indexRequest.source(); - IndexResponse indexResponse = writeResult.getResponse(); - UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); - if (updateRequest.fields() != null && updateRequest.fields().length > 0) { - Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); - writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - // Replace the update request to the translated index request to execute on the replica. - request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); - return new WriteResult<>(writeUpdateResponse, writeResult.getLocation()); - case DELETED: - DeleteRequest deleteRequest = translate.action(); - WriteResult deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); - DeleteResponse response = deleteResult.getResponse(); - UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null)); - // Replace the update request to the translated delete request to execute on the replica. - request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); - return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation()); - case NOOP: - BulkItemRequest item = request.items()[requestIndex]; - indexShard.noopUpdate(updateRequest.type()); - item.setIgnoreOnReplica(); // no need to go to the replica - return new WriteResult<>(translate.action(), null); - default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } - } - @Override protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShard) { Translog.Location location = null; @@ -272,7 +266,8 @@ public class TransportShardBulkAction extends TransportWriteAction documentRequest = item.request(); + DocumentRequest documentRequest = (item.request() instanceof UpdateReplicaRequest) + ? ((UpdateReplicaRequest) item.request()).getRequest() : item.request(); final Engine.Operation operation; try { switch (documentRequest.opType()) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index e3babcfc380..f2e5e13494d 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -43,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ReplicatedWriteRequest implements DocumentRequest { +public class DeleteRequest extends DocumentRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 6f3d27ea369..926700e327e 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -49,7 +50,7 @@ import org.elasticsearch.transport.TransportService; /** * Performs the delete operation. */ -public class TransportDeleteAction extends TransportWriteAction { +public class TransportDeleteAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; @@ -61,7 +62,7 @@ public class TransportDeleteAction extends TransportWriteAction listener) { ClusterState state = clusterService.state(); if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { - createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(delete api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(task, createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { innerExecute(task, request, listener); @@ -100,15 +105,6 @@ public class TransportDeleteAction extends TransportWriteAction listener) { super.doExecute(task, request, listener); } @@ -119,7 +115,7 @@ public class TransportDeleteAction extends TransportWriteAction onPrimaryShard(DeleteRequest request, IndexShard indexShard) { + protected WriteResult onPrimaryShard(DeleteRequest request, IndexShard indexShard) { return executeDeleteRequestOnPrimary(request, indexShard); } @@ -128,7 +124,7 @@ public class TransportDeleteAction extends TransportWriteAction executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) { + public static WriteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) { Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); indexShard.delete(delete); // update the request with the version so it will go to the replicas @@ -137,7 +133,7 @@ public class TransportDeleteAction extends TransportWriteAction(response, delete.getTranslogLocation()); + return new WriteResult<>(request, response, delete.getTranslogLocation()); } public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) { diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index cce0f6c8eef..48eaab2b48c 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -67,7 +67,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ReplicatedWriteRequest implements DocumentRequest { +public class IndexRequest extends DocumentRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index cc3fbb7906d..37cc2d7e3bc 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -60,7 +60,7 @@ import org.elasticsearch.transport.TransportService; *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * */ -public class TransportIndexAction extends TransportWriteAction { +public class TransportIndexAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final boolean allowIdGeneration; @@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception { + protected WriteResult onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception { return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction); } @@ -174,7 +174,7 @@ public class TransportIndexAction extends TransportWriteAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, + public static WriteResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); @@ -198,7 +198,7 @@ public class TransportIndexAction extends TransportWriteAction(response, operation.getTranslogLocation()); + return new WriteResult<>(request, response, operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index d541ef6a35c..8aa0ed66a77 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -112,22 +113,24 @@ public class ReplicationOperation< pendingActions.incrementAndGet(); primaryResult = primary.perform(request); final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); - assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term"; - if (logger.isTraceEnabled()) { - logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + if (replicaRequest != null) { + assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term"; + if (logger.isTraceEnabled()) { + logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + } + + // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics. + // we have to make sure that every operation indexed into the primary after recovery start will also be replicated + // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. + ClusterState clusterState = clusterStateSupplier.get(); + final List shards = getShards(primaryId, clusterState); + Set inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState); + + markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards); + + performOnReplicas(replicaRequest, shards); } - // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics. - // we have to make sure that every operation indexed into the primary after recovery start will also be replicated - // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. - ClusterState clusterState = clusterStateSupplier.get(); - final List shards = getShards(primaryId, clusterState); - Set inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState); - - markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards); - - performOnReplicas(replicaRequest, shards); - successfulShards.incrementAndGet(); decPendingAndFinishIfNeeded(); } @@ -419,7 +422,10 @@ public class ReplicationOperation< public interface PrimaryResult> { - R replicaRequest(); + /** + * @return null if no operation needs to be sent to a replica + */ + @Nullable R replicaRequest(); void setShardInfo(ReplicationResponse.ShardInfo shardInfo); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 9587b4e6b2c..95e196672d4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -23,6 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; @@ -163,6 +165,16 @@ public abstract class TransportReplicationAction< } } + /** helper to verify and resolve request routing */ + public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex, + DocumentRequest request) { + request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index())); + // check if routing is required, if so, throw error if routing wasn't specified + if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) { + throw new RoutingMissingException(concreteIndex, request.type(), request.id()); + } + } + /** * Primary operation on node with primary copy. * @@ -900,7 +912,9 @@ public abstract class TransportReplicationAction< @Override public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request); - result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); + if (result.replicaRequest() != null) { + result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); + } return result; } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index bf2b3235b11..ee8ee4862f9 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -49,38 +49,40 @@ import java.util.function.Supplier; */ public abstract class TransportWriteAction< Request extends ReplicatedWriteRequest, + ReplicaRequest extends ReplicatedWriteRequest, Response extends ReplicationResponse & WriteResponse - > extends TransportReplicationAction { + > extends TransportReplicationAction { protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, request, executor); + indexNameExpressionResolver, request, replicaRequest, executor); } /** * Called on the primary with a reference to the {@linkplain IndexShard} to modify. */ - protected abstract WriteResult onPrimaryShard(Request request, IndexShard indexShard) throws Exception; + protected abstract WriteResult onPrimaryShard(Request request, IndexShard indexShard) throws Exception; /** * Called once per replica with a reference to the {@linkplain IndexShard} to modify. * * @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred */ - protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard); + protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard); @Override protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception { IndexShard indexShard = indexShard(request); - WriteResult result = onPrimaryShard(request, indexShard); - return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard); + WriteResult result = onPrimaryShard(request, indexShard); + return new WritePrimaryResult(request, result, indexShard); } @Override - protected final WriteReplicaResult shardOperationOnReplica(Request request) { + protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request) { IndexShard indexShard = indexShard(request); Translog.Location location = onReplicaShard(request, indexShard); return new WriteReplicaResult(indexShard, request, location); @@ -89,7 +91,7 @@ public abstract class TransportWriteAction< /** * Fetch the IndexShard for the request. Protected so it can be mocked in tests. */ - protected IndexShard indexShard(Request request) { + protected IndexShard indexShard(ReplicatedWriteRequest request) { final ShardId shardId = request.shardId(); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); @@ -98,11 +100,13 @@ public abstract class TransportWriteAction< /** * Simple result from a write action. Write actions have static method to return these so they can integrate with bulk. */ - public static class WriteResult { + public static class WriteResult, Response extends ReplicationResponse> { + private final ReplicaRequest replicaRequest; private final Response response; private final Translog.Location location; - public WriteResult(Response response, @Nullable Location location) { + public WriteResult(ReplicaRequest replicaRequest, Response response, @Nullable Location location) { + this.replicaRequest = replicaRequest; this.response = response; this.location = location; } @@ -114,6 +118,10 @@ public abstract class TransportWriteAction< public Translog.Location getLocation() { return location; } + + public ReplicaRequest getReplicaRequest() { + return replicaRequest; + } } /** @@ -123,15 +131,15 @@ public abstract class TransportWriteAction< boolean finishedAsyncActions; ActionListener listener = null; - public WritePrimaryResult(Request request, Response finalResponse, - @Nullable Translog.Location location, + public WritePrimaryResult(Request request, + WriteResult result, IndexShard indexShard) { - super(request, finalResponse); + super(result.getReplicaRequest(), result.getResponse()); /* * We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the * refresh in parallel on the primary and on the replica. */ - new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); + new AsyncAfterWriteAction(indexShard, request, result.getLocation(), this, logger).run(); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java deleted file mode 100644 index cb9a6ab9f69..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.ValidateActions; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public abstract class InstanceShardOperationRequest> extends ActionRequest - implements IndicesRequest { - - public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - - protected TimeValue timeout = DEFAULT_TIMEOUT; - - protected String index; - // null means its not set, allows to explicitly direct a request to a specific shard - protected ShardId shardId = null; - - private String concreteIndex; - - protected InstanceShardOperationRequest() { - } - - public InstanceShardOperationRequest(String index) { - this.index = index; - } - - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (index == null) { - validationException = ValidateActions.addValidationError("index is missing", validationException); - } - return validationException; - } - - public String index() { - return index; - } - - @Override - public String[] indices() { - return new String[]{index}; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - - @SuppressWarnings("unchecked") - public final Request index(String index) { - this.index = index; - return (Request) this; - } - - public TimeValue timeout() { - return timeout; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final Request timeout(TimeValue timeout) { - this.timeout = timeout; - return (Request) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - public final Request timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout")); - } - - public String concreteIndex() { - return concreteIndex; - } - - void concreteIndex(String concreteIndex) { - this.concreteIndex = concreteIndex; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - if (in.readBoolean()) { - shardId = ShardId.readShardId(in); - } else { - shardId = null; - } - timeout = new TimeValue(in); - concreteIndex = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeOptionalStreamable(shardId); - timeout.writeTo(out); - out.writeOptionalString(concreteIndex); - } - -} - diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java deleted file mode 100644 index 13266b9151d..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.unit.TimeValue; - -/** - */ -public abstract class InstanceShardOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder> - extends ActionRequestBuilder { - - protected InstanceShardOperationRequestBuilder(ElasticsearchClient client, Action action, Request request) { - super(client, action, request); - } - - @SuppressWarnings("unchecked") - public final RequestBuilder setIndex(String index) { - request.index(index); - return (RequestBuilder) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java deleted file mode 100644 index 81da5ec9a86..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; - -import java.util.function.Supplier; - -/** - * - */ -public abstract class TransportInstanceSingleOperationAction, Response extends ActionResponse> - extends HandledTransportAction { - protected final ClusterService clusterService; - protected final TransportService transportService; - - final String executor; - final String shardActionName; - - protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, - ClusterService clusterService, TransportService transportService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { - super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); - this.clusterService = clusterService; - this.transportService = transportService; - this.executor = executor(); - this.shardActionName = actionName + "[s]"; - transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler()); - } - - @Override - protected void doExecute(Request request, ActionListener listener) { - new AsyncSingleAction(request, listener).start(); - } - - protected abstract String executor(); - - protected abstract void shardOperation(Request request, ActionListener listener); - - protected abstract Response newResponse(); - - protected ClusterBlockException checkGlobalBlock(ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); - } - - protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) { - return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.concreteIndex()); - } - - /** - * Resolves the request. Throws an exception if the request cannot be resolved. - */ - protected abstract void resolveRequest(ClusterState state, Request request); - - protected boolean retryOnFailure(Exception e) { - return false; - } - - protected TransportRequestOptions transportOptions() { - return TransportRequestOptions.EMPTY; - } - - /** - * Should return an iterator with a single shard! - */ - protected abstract ShardIterator shards(ClusterState clusterState, Request request); - - class AsyncSingleAction { - - private final ActionListener listener; - private final Request request; - private volatile ClusterStateObserver observer; - private ShardIterator shardIt; - private DiscoveryNodes nodes; - - AsyncSingleAction(Request request, ActionListener listener) { - this.request = request; - this.listener = listener; - } - - public void start() { - this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); - doStart(); - } - - protected void doStart() { - nodes = observer.observedState().nodes(); - try { - ClusterBlockException blockException = checkGlobalBlock(observer.observedState()); - if (blockException != null) { - if (blockException.retryable()) { - retry(blockException); - return; - } else { - throw blockException; - } - } - request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request).getName()); - resolveRequest(observer.observedState(), request); - blockException = checkRequestBlock(observer.observedState(), request); - if (blockException != null) { - if (blockException.retryable()) { - retry(blockException); - return; - } else { - throw blockException; - } - } - shardIt = shards(observer.observedState(), request); - } catch (Exception e) { - listener.onFailure(e); - return; - } - - // no shardIt, might be in the case between index gateway recovery and shardIt initialization - if (shardIt.size() == 0) { - retry(null); - return; - } - - // this transport only make sense with an iterator that returns a single shard routing (like primary) - assert shardIt.size() == 1; - - ShardRouting shard = shardIt.nextOrNull(); - assert shard != null; - - if (!shard.active()) { - retry(null); - return; - } - - request.shardId = shardIt.shardId(); - DiscoveryNode node = nodes.get(shard.currentNodeId()); - transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler() { - - @Override - public Response newInstance() { - return newResponse(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - final Throwable cause = exp.unwrapCause(); - // if we got disconnected from the node, or the node / shard is not in the right state (being closed) - if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || - retryOnFailure(exp)) { - retry((Exception) cause); - } else { - listener.onFailure(exp); - } - } - }); - } - - void retry(@Nullable final Exception failure) { - if (observer.isTimedOut()) { - // we running as a last attempt after a timeout has happened. don't retry - Exception listenFailure = failure; - if (listenFailure == null) { - if (shardIt == null) { - listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName); - } else { - listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName); - } - } - listener.onFailure(listenFailure); - return; - } - - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - doStart(); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(nodes.getLocalNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - // just to be on the safe side, see if we can start it now? - doStart(); - } - }, request.timeout()); - } - } - - private class ShardTransportHandler implements TransportRequestHandler { - - @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - shardOperation(request, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send response for get", inner); - } - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index e5322f51d50..1f3a97a25a2 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -21,7 +21,7 @@ package org.elasticsearch.action.update; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -34,19 +34,17 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -54,59 +52,52 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.Collections; import java.util.Map; import static org.elasticsearch.ExceptionsHelper.unwrapCause; /** */ -public class TransportUpdateAction extends TransportInstanceSingleOperationAction { +public class TransportUpdateAction extends TransportWriteAction { - private final TransportDeleteAction deleteAction; - private final TransportIndexAction indexAction; private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; private final UpdateHelper updateHelper; private final IndicesService indicesService; + private final MappingUpdatedAction mappingUpdatedAction; + private final boolean allowIdGeneration; @Inject public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction, - UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - IndicesService indicesService, AutoCreateIndex autoCreateIndex) { - super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new); - this.indexAction = indexAction; - this.deleteAction = deleteAction; + TransportCreateIndexAction createIndexAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, + AutoCreateIndex autoCreateIndex, ShardStateAction shardStateAction, + MappingUpdatedAction mappingUpdatedAction, ScriptService scriptService) { + super(settings, UpdateAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, + actionFilters, indexNameExpressionResolver, UpdateRequest::new, UpdateReplicaRequest::new, ThreadPool.Names.INDEX); this.createIndexAction = createIndexAction; - this.updateHelper = updateHelper; + this.updateHelper = new UpdateHelper(scriptService, logger); this.indicesService = indicesService; this.autoCreateIndex = autoCreateIndex; + this.mappingUpdatedAction = mappingUpdatedAction; + this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); } @Override - protected String executor() { - return ThreadPool.Names.INDEX; - } - - @Override - protected UpdateResponse newResponse() { - return new UpdateResponse(); - } - - @Override - protected boolean retryOnFailure(Exception e) { - return TransportActions.isShardNotAvailableException(e); - } - - @Override - protected void resolveRequest(ClusterState state, UpdateRequest request) { - resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request); + protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, UpdateRequest request) { + super.resolveRequest(metaData, indexMetaData, request); + resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request); + ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), + indexMetaData.getIndex().getName(), request.id(), request.routing()); + request.setShardId(shardId); } public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) { @@ -118,13 +109,17 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } @Override - protected void doExecute(final UpdateRequest request, final ActionListener listener) { + protected void doExecute(Task task, UpdateRequest request, ActionListener listener) { // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { - createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(update api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { - innerExecute(request, listener); + innerExecute(task, request, listener); } @Override @@ -132,7 +127,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio if (unwrapCause(e) instanceof IndexAlreadyExistsException) { // we have the index, do it try { - innerExecute(request, listener); + innerExecute(task, request, listener); } catch (Exception inner) { inner.addSuppressed(e); listener.onFailure(inner); @@ -143,153 +138,123 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } }); } else { - innerExecute(request, listener); + innerExecute(task, request, listener); } } - private void innerExecute(final UpdateRequest request, final ActionListener listener) { - super.doExecute(request, listener); - } - @Override - protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) { - if (request.getShardId() != null) { - return clusterState.routingTable().index(request.concreteIndex()).shard(request.getShardId().getId()).primaryShardIt(); - } - ShardIterator shardIterator = clusterService.operationRouting() - .indexShards(clusterState, request.concreteIndex(), request.id(), request.routing()); - ShardRouting shard; - while ((shard = shardIterator.nextOrNull()) != null) { - if (shard.primary()) { - return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard)); + protected UpdateResponse newResponseInstance() { + return new UpdateResponse(); + } + + private void innerExecute(Task task, final UpdateRequest request, final ActionListener listener) { + super.doExecute(task, request, listener); + } + + @Override + protected WriteResult onPrimaryShard(UpdateRequest request, IndexShard indexShard) throws Exception { + ShardId shardId = request.shardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexMetaData indexMetaData = indexService.getMetaData(); + return executeUpdateRequestOnPrimary(request, indexShard, indexMetaData, updateHelper, mappingUpdatedAction, allowIdGeneration); + } + + public static WriteResult executeUpdateRequestOnPrimary(UpdateRequest request, + IndexShard indexShard, + IndexMetaData indexMetaData, + UpdateHelper updateHelper, + MappingUpdatedAction mappingUpdatedAction, + boolean allowIdGeneration) + throws Exception { + int maxAttempts = request.retryOnConflict(); + for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { + try { + return shardUpdateOperation(indexMetaData, indexShard, request, updateHelper, mappingUpdatedAction, allowIdGeneration); + } catch (Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (attemptCount == maxAttempts // bubble up exception when we run out of attempts + || (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict + throw e; + } } } - return new PlainShardIterator(shardIterator.shardId(), Collections.emptyList()); + throw new IllegalStateException("version conflict exception should bubble up on last attempt"); + } - @Override - protected void shardOperation(final UpdateRequest request, final ActionListener listener) { - shardOperation(request, listener, 0); - } - - protected void shardOperation(final UpdateRequest request, final ActionListener listener, final int retryCount) { - final ShardId shardId = request.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.getId()); + private static WriteResult shardUpdateOperation(IndexMetaData indexMetaData, + IndexShard indexShard, + UpdateRequest request, + UpdateHelper updateHelper, + MappingUpdatedAction mappingUpdatedAction, + boolean allowIdGeneration) + throws Exception { final UpdateHelper.Result result = updateHelper.prepare(request, indexShard); switch (result.getResponseResult()) { case CREATED: - IndexRequest upsertRequest = result.action(); - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request - final BytesReference upsertSourceBytes = upsertRequest.source(); - indexAction.execute(upsertRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || - (request.fields() != null && request.fields().length > 0)) { - Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); - } else { - update.setGetResult(null); - } - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", - retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; case UPDATED: IndexRequest indexRequest = result.action(); + MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); + indexRequest.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName()); + WriteResult indexResponseWriteResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); + IndexResponse response = indexResponseWriteResult.getResponse(); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference indexSourceBytes = indexRequest.source(); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); + if (result.getResponseResult() == DocWriteResponse.Result.CREATED) { + if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || + (request.fields() != null && request.fields().length > 0)) { + Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceBytes, true); + update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceBytes)); + } else { + update.setGetResult(null); } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; + } else if (result.getResponseResult() == DocWriteResponse.Result.UPDATED) { + update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); + } + update.setForcedRefresh(response.forcedRefresh()); + UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(indexRequest); + updateReplicaRequest.setParentTask(request.getParentTask()); + updateReplicaRequest.setShardId(request.shardId()); + updateReplicaRequest.setRefreshPolicy(request.getRefreshPolicy()); + return new WriteResult<>(updateReplicaRequest, update, indexResponseWriteResult.getLocation()); case DELETED: DeleteRequest deleteRequest = result.action(); - deleteAction.execute(deleteRequest, new ActionListener() { - @Override - public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; + WriteResult deleteResponseWriteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); + DeleteResponse deleteResponse = deleteResponseWriteResult.getResponse(); + UpdateResponse deleteUpdate = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.getResult()); + deleteUpdate.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), deleteResponse.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); + deleteUpdate.setForcedRefresh(deleteResponse.forcedRefresh()); + UpdateReplicaRequest deleteReplicaRequest = new UpdateReplicaRequest(deleteRequest); + deleteReplicaRequest.setParentTask(request.getParentTask()); + deleteReplicaRequest.setShardId(request.shardId()); + deleteReplicaRequest.setRefreshPolicy(request.getRefreshPolicy()); + return new WriteResult<>(deleteReplicaRequest, deleteUpdate, deleteResponseWriteResult.getLocation()); case NOOP: - UpdateResponse update = result.action(); - IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex()); - if (indexServiceOrNull != null) { - IndexShard shard = indexService.getShardOrNull(shardId.getId()); - if (shard != null) { - shard.noopUpdate(request.type()); - } - } - listener.onResponse(update); - break; + UpdateResponse noopUpdate = result.action(); + indexShard.noopUpdate(request.type()); + return new WriteResult<>(null, noopUpdate, null); default: throw new IllegalStateException("Illegal result " + result.getResponseResult()); } } + + @Override + protected Translog.Location onReplicaShard(UpdateReplicaRequest request, IndexShard indexShard) { + assert request.getRequest() != null; + final Translog.Location location; + switch (request.getRequest().opType()) { + case INDEX: + case CREATE: + location = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) request.getRequest()), indexShard).getTranslogLocation(); + break; + case DELETE: + location = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) request.getRequest()), indexShard).getTranslogLocation(); + break; + default: + throw new IllegalStateException("unexpected opType [" + request.getRequest().opType().getLowercase() + "]"); + + } + return location; + } } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 49206470532..c242f885f06 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.update; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -27,11 +28,8 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; @@ -63,14 +61,14 @@ import java.util.Map; /** * Helper for translating an update request to an index, delete request or update response. */ -public class UpdateHelper extends AbstractComponent { +public class UpdateHelper { private final ScriptService scriptService; + private final Logger logger; - @Inject - public UpdateHelper(Settings settings, ScriptService scriptService) { - super(settings); + public UpdateHelper(ScriptService scriptService, Logger logger) { this.scriptService = scriptService; + this.logger = logger; } /** @@ -259,7 +257,7 @@ public class UpdateHelper extends AbstractComponent { return ctx; } - private TimeValue getTTLFromScriptContext(Map ctx) { + private static TimeValue getTTLFromScriptContext(Map ctx) { Object fetchedTTL = ctx.get("_ttl"); if (fetchedTTL != null) { if (fetchedTTL instanceof Number) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java new file mode 100644 index 00000000000..5f258a675c2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.update; + +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.VersionType; + +import java.io.IOException; + +/** Replica request for update operation holds translated (index/delete) requests */ +public class UpdateReplicaRequest extends DocumentRequest { + private DocumentRequest request; + + public UpdateReplicaRequest() { + } + + public UpdateReplicaRequest(DocumentRequest request) { + assert !(request instanceof UpdateReplicaRequest) : "underlying request must not be a update replica request"; + this.request = request; + this.index = request.index(); + setRefreshPolicy(request.getRefreshPolicy()); + setShardId(request.shardId()); + setParentTask(request.getParentTask()); + } + + public DocumentRequest getRequest() { + return request; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + request = DocumentRequest.readDocumentRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + DocumentRequest.writeDocumentRequest(out, request); + } + + @Override + public String type() { + return request.type(); + } + + @Override + public String id() { + return request.id(); + } + + @Override + public UpdateReplicaRequest routing(String routing) { + throw new UnsupportedOperationException("setting routing is not supported"); + } + + @Override + public String routing() { + return request.routing(); + } + + @Override + public String parent() { + return request.parent(); + } + + @Override + public long version() { + return request.version(); + } + + @Override + public UpdateReplicaRequest version(long version) { + throw new UnsupportedOperationException("setting version is not supported"); + } + + @Override + public VersionType versionType() { + return request.versionType(); + } + + @Override + public UpdateReplicaRequest versionType(VersionType versionType) { + throw new UnsupportedOperationException("setting version type is not supported"); + } + + @Override + public OpType opType() { + return request.opType(); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index deca938fa6a..80d3676e051 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -23,9 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.bytes.BytesArray; @@ -56,10 +53,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** */ -public class UpdateRequest extends InstanceShardOperationRequest - implements DocumentRequest, WriteRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = - new DeprecationLogger(Loggers.getLogger(UpdateRequest.class)); +public class UpdateRequest extends DocumentRequest { private String type; private String id; @@ -97,7 +91,7 @@ public class UpdateRequest extends InstanceShardOperationRequest } public UpdateRequest(String index, String type, String id) { - super(index); + this.index = index; this.type = type; this.id = id; } @@ -495,39 +489,6 @@ public class UpdateRequest extends InstanceShardOperationRequest return OpType.UPDATE; } - @Override - public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) { - this.refreshPolicy = refreshPolicy; - return this; - } - - @Override - public RefreshPolicy getRefreshPolicy() { - return refreshPolicy; - } - - public ActiveShardCount waitForActiveShards() { - return this.waitForActiveShards; - } - - /** - * Sets the number of shard copies that must be active before proceeding with the write. - * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. - */ - public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { - this.waitForActiveShards = waitForActiveShards; - return this; - } - - /** - * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical - * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} - * to get the ActiveShardCount. - */ - public UpdateRequest waitForActiveShards(final int waitForActiveShards) { - return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); - } - /** * Sets the doc to use for updates when a script is not specified. */ diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index bbbc9bafd8f..e9b111f4df9 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.DeprecationLogger; @@ -37,7 +37,7 @@ import org.elasticsearch.script.Script; import java.util.Map; -public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder +public class UpdateRequestBuilder extends ReplicationRequestBuilder implements WriteRequestBuilder { private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(RestUpdateAction.class)); diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index eb1843dc7d9..83f347d6b98 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -22,7 +22,6 @@ package org.elasticsearch.indices; import org.elasticsearch.action.admin.indices.rollover.Condition; import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; -import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.inject.AbstractModule; @@ -182,7 +181,6 @@ public class IndicesModule extends AbstractModule { bind(SyncedFlushService.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); - bind(UpdateHelper.class).asEagerSingleton(); bind(MetaDataIndexUpgradeService.class).asEagerSingleton(); bind(NodeServicesProvider.class).asEagerSingleton(); } diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index d1d01610f18..5c692668f26 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -222,8 +222,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdate() { - //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -237,8 +236,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdateUpsert() { - //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -251,8 +249,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdateDelete() { - //update action goes to the primary, delete op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index a554ca53d99..d2070fb21db 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -128,21 +129,21 @@ public class TransportWriteActionTests extends ESTestCase { resultChecker.accept(listener.response, forcedRefresh); } - private class TestAction extends TransportWriteAction { + private class TestAction extends TransportWriteAction { protected TestAction() { super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR), null, null, null, null, new ActionFilters(new HashSet<>()), - new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME); + new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); } @Override - protected IndexShard indexShard(TestRequest request) { + protected IndexShard indexShard(ReplicatedWriteRequest request) { return indexShard; } @Override - protected WriteResult onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception { - return new WriteResult<>(new TestResponse(), location); + protected WriteResult onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception { + return new WriteResult<>(request, new TestResponse(), location); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java deleted file mode 100644 index 1d736060568..00000000000 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.ActionFilter; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.CapturingTransport; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportService; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; - -import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.elasticsearch.test.ClusterServiceUtils.setState; -import static org.hamcrest.core.IsEqual.equalTo; - -public class TransportInstanceSingleOperationActionTests extends ESTestCase { - - private static ThreadPool THREAD_POOL; - - private ClusterService clusterService; - private CapturingTransport transport; - private TransportService transportService; - - private TestTransportInstanceSingleOperationAction action; - - public static class Request extends InstanceShardOperationRequest { - public Request() { - } - } - - public static class Response extends ActionResponse { - public Response() { - } - } - - class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction { - private final Map shards = new HashMap<>(); - - public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { - super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request); - } - - public Map getResults() { - return shards; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected void shardOperation(Request request, ActionListener listener) { - throw new UnsupportedOperationException("Not implemented in test class"); - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void resolveRequest(ClusterState state, Request request) { - } - - @Override - protected ShardIterator shards(ClusterState clusterState, Request request) { - return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId.getId()).primaryShardIt(); - } - } - - class MyResolver extends IndexNameExpressionResolver { - public MyResolver() { - super(Settings.EMPTY); - } - - @Override - public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { - return request.indices(); - } - } - - @BeforeClass - public static void startThreadPool() { - THREAD_POOL = new TestThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName()); - } - - @Before - public void setUp() throws Exception { - super.setUp(); - transport = new CapturingTransport(); - clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR); - transportService.start(); - transportService.acceptIncomingRequests(); - action = new TestTransportInstanceSingleOperationAction( - Settings.EMPTY, - "indices:admin/test", - transportService, - new ActionFilters(new HashSet()), - new MyResolver(), - Request::new - ); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - clusterService.close(); - transportService.close(); - } - - @AfterClass - public static void destroyThreadPool() { - ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); - // since static must set to null to be eligible for collection - THREAD_POOL = null; - } - - public void testGlobalBlock() { - Request request = new Request(); - PlainActionFuture listener = new PlainActionFuture<>(); - ClusterBlocks.Builder block = ClusterBlocks.builder() - .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - try { - action.new AsyncSingleAction(request, listener).start(); - listener.get(); - fail("expected ClusterBlockException"); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, ClusterBlockException.class) == null) { - logger.info("expected ClusterBlockException but got ", e); - fail("expected ClusterBlockException"); - } - } - } - - public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testFailureWithoutRetry() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - // this should not trigger retry or anything and the listener should report exception immediately - transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception"))); - - try { - // result should return immediately - assertTrue(listener.isDone()); - listener.get(); - fail("this should fail with a transport exception"); - } catch (ExecutionException t) { - if (ExceptionsHelper.unwrap(t, TransportException.class) == null) { - logger.info("expected TransportException but got ", t); - fail("expected and TransportException"); - } - } - } - - public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - boolean local = randomBoolean(); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING)); - action.new AsyncSingleAction(request, listener).start(); - // this should fail because primary not initialized - assertThat(transport.capturedRequests().length, equalTo(0)); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - // this time it should work - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testSuccessAfterRetryWithExceptionFromTransport() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - boolean local = randomBoolean(); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - // trigger cluster state observer - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testRetryOfAnAlreadyTimedOutRequest() throws Exception { - Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS)); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - - // wait until the timeout was triggered and we actually tried to send for the second time - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(transport.capturedRequests().length, equalTo(1)); - } - }); - - // let it fail the second time too - requestId = transport.capturedRequests()[0].requestId; - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - try { - // result should return immediately - assertTrue(listener.isDone()); - listener.get(); - fail("this should fail with a transport exception"); - } catch (ExecutionException t) { - if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) { - logger.info("expected ConnectTransportException but got ", t); - fail("expected and ConnectTransportException"); - } - } - } - - public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException { - action = new TestTransportInstanceSingleOperationAction( - Settings.EMPTY, - "indices:admin/test_unresolvable", - transportService, - new ActionFilters(new HashSet<>()), - new MyResolver(), - Request::new - ) { - @Override - protected void resolveRequest(ClusterState state, Request request) { - throw new IllegalStateException("request cannot be resolved"); - } - }; - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(0)); - try { - listener.get(); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, IllegalStateException.class) == null) { - logger.info("expected IllegalStateException but got ", e); - fail("expected and IllegalStateException"); - } - } - } -} diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index cb27a527f63..d4291464fdb 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -172,9 +172,8 @@ public class UpdateRequestTests extends ESTestCase { // Related to issue 3256 public void testUpdateRequestWithTTL() throws Exception { TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl"); - Settings settings = settings(Version.CURRENT).build(); - UpdateHelper updateHelper = new UpdateHelper(settings, null); + UpdateHelper updateHelper = new UpdateHelper(null, logger); // We just upsert one document with ttl IndexRequest indexRequest = new IndexRequest("test", "type1", "1") diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 6e200c4756a..67f0b4bf4a6 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -366,8 +366,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception { - TransportWriteAction.WriteResult result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary, - null); + TransportWriteAction.WriteResult result = + TransportIndexAction.executeIndexRequestOnPrimary(request, primary, null); request.primaryTerm(primary.getPrimaryTerm()); TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger); return new PrimaryResult(request, result.getResponse()); diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index ff4c4c657d7..7972bc39f37 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -162,8 +162,8 @@ the request was ignored. -------------------------------------------------- { "_shards": { - "total": 0, - "successful": 0, + "total": 1, + "successful": 1, "failed": 0 }, "_index": "test",