diff --git a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java index f4c88e159c7..ef2aa815a6b 100644 --- a/core/src/main/java/org/elasticsearch/action/DocumentRequest.java +++ b/core/src/main/java/org/elasticsearch/action/DocumentRequest.java @@ -20,10 +20,11 @@ package org.elasticsearch.action; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.update.UpdateReplicaRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.index.VersionType; import java.io.IOException; @@ -33,84 +34,72 @@ import java.util.Locale; * Generic interface to group ActionRequest, which perform writes to a single document * Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest} */ -public interface DocumentRequest extends IndicesRequest { - - /** - * Get the index that this request operates on - * @return the index - */ - String index(); +public abstract class DocumentRequest> extends ReplicatedWriteRequest { /** * Get the type that this request operates on * @return the type */ - String type(); + public abstract String type(); /** * Get the id of the document for this request * @return the id */ - String id(); - - /** - * Get the options for this request - * @return the indices options - */ - IndicesOptions indicesOptions(); + public abstract String id(); /** * Set the routing for this request * @return the Request */ - T routing(String routing); + public abstract T routing(String routing); /** * Get the routing for this request * @return the Routing */ - String routing(); + public abstract String routing(); /** * Get the parent for this request * @return the Parent */ - String parent(); + public abstract String parent(); /** * Get the document version for this request * @return the document version */ - long version(); + public abstract long version(); /** * Sets the version, which will perform the operation only if a matching * version exists and no changes happened on the doc since then. */ - T version(long version); + public abstract T version(long version); /** * Get the document version type for this request * @return the document version type */ - VersionType versionType(); + public abstract VersionType versionType(); /** * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. */ - T versionType(VersionType versionType); + public abstract T versionType(VersionType versionType); /** * Get the requested document operation type of the request * @return the operation type {@link OpType} */ - OpType opType(); + public abstract OpType opType(); /** * Requested operation type to perform on the document */ - enum OpType { + public enum OpType { /** * Index the source. If there an existing document with the id, it will * be replaced. @@ -164,40 +153,42 @@ public interface DocumentRequest extends IndicesRequest { } /** read a document write (index/delete/update) request */ - static DocumentRequest readDocumentRequest(StreamInput in) throws IOException { + public static DocumentRequest readDocumentRequest(StreamInput in) throws IOException { byte type = in.readByte(); - final DocumentRequest documentRequest; if (type == 0) { IndexRequest indexRequest = new IndexRequest(); indexRequest.readFrom(in); - documentRequest = indexRequest; + return indexRequest; } else if (type == 1) { DeleteRequest deleteRequest = new DeleteRequest(); deleteRequest.readFrom(in); - documentRequest = deleteRequest; + return deleteRequest; } else if (type == 2) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.readFrom(in); - documentRequest = updateRequest; + return updateRequest; + } else if (type == 3) { + UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(); + updateReplicaRequest.readFrom(in); + return updateReplicaRequest; } else { throw new IllegalStateException("invalid request type [" + type+ " ]"); } - return documentRequest; } /** write a document write (index/delete/update) request*/ - static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException { + public static void writeDocumentRequest(StreamOutput out, DocumentRequest request) throws IOException { if (request instanceof IndexRequest) { out.writeByte((byte) 0); - ((IndexRequest) request).writeTo(out); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); - ((DeleteRequest) request).writeTo(out); } else if (request instanceof UpdateRequest) { out.writeByte((byte) 2); - ((UpdateRequest) request).writeTo(out); + } else if (request instanceof UpdateReplicaRequest) { + out.writeByte((byte) 3); } else { throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]"); } + request.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 079d4efe9bf..df9fd13b034 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -20,9 +20,6 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.DocumentRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index dc72407cf42..7729c737439 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -528,11 +528,11 @@ public class BulkRequest extends ActionRequest implements Composite } for (DocumentRequest request : requests) { // We first check if refresh has been set - if (((WriteRequest) request).getRefreshPolicy() != RefreshPolicy.NONE) { + if (request.getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( "RefreshPolicy is not supported on an item request. Set it on the BulkRequest instead.", validationException); } - ActionRequestValidationException ex = ((WriteRequest) request).validate(); + ActionRequestValidationException ex = request.validate(); if (ex != null) { if (validationException == null) { validationException = new ActionRequestValidationException(); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index f7861d1e093..37c1b7c2290 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,14 +27,12 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.update.TransportUpdateAction; -import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -211,7 +209,7 @@ public class TransportBulkAction extends HandledTransportAction documentRequest = bulkRequest.requests.get(i); + DocumentRequest documentRequest = bulkRequest.requests.get(i); //the request can only be null because we set it to null in the previous step, so it gets ignored if (documentRequest == null) { continue; @@ -234,10 +232,8 @@ public class TransportBulkAction extends HandledTransportAction { +public class TransportShardBulkAction extends TransportWriteAction { public static final String ACTION_NAME = BulkAction.NAME + "[s]"; - private final UpdateHelper updateHelper; private final boolean allowIdGeneration; private final MappingUpdatedAction mappingUpdatedAction; + private final UpdateHelper updateHelper; + private final AutoCreateIndex autoCreateIndex; + private final TransportCreateIndexAction createIndexAction; @Inject public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, ScriptService scriptService, + AutoCreateIndex autoCreateIndex, TransportCreateIndexAction createIndexAction) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BulkShardRequest::new, ThreadPool.Names.BULK); - this.updateHelper = updateHelper; + indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.BULK); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); this.mappingUpdatedAction = mappingUpdatedAction; + this.updateHelper = new UpdateHelper(scriptService, logger); + this.autoCreateIndex = autoCreateIndex; + this.createIndexAction = createIndexAction; } @Override @@ -105,7 +122,39 @@ public class TransportShardBulkAction extends TransportWriteAction onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception { + protected void doExecute(Task task, BulkShardRequest request, ActionListener listener) { + ClusterState state = clusterService.state(); + if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(bulk api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(task, createIndexRequest, new ActionListener() { + @Override + public void onResponse(CreateIndexResponse result) { + innerExecute(task, request, listener); + } + + @Override + public void onFailure(Exception e) { + if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { + // we have the index, do it + innerExecute(task, request, listener); + } else { + listener.onFailure(e); + } + } + }); + } else { + innerExecute(task, request, listener); + } + } + + private void innerExecute(Task task, final BulkShardRequest request, final ActionListener listener) { + super.doExecute(task, request, listener); + } + @Override + protected WriteResult onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception { ShardId shardId = request.shardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData(); @@ -123,7 +172,7 @@ public class TransportShardBulkAction extends TransportWriteAction(response, location); + return new WriteResult<>(request, response, location); } /** Executes bulk item requests and handles request execution exceptions */ @@ -131,22 +180,39 @@ public class TransportShardBulkAction extends TransportWriteAction itemRequest = request.items()[requestIndex].request(); + preVersions[requestIndex] = itemRequest.version(); + preVersionTypes[requestIndex] = itemRequest.versionType(); + DocumentRequest.OpType opType = itemRequest.opType(); try { - WriteResult writeResult = innerExecuteBulkItemRequest(metaData, indexShard, - request, requestIndex); + final WriteResult writeResult; + switch (itemRequest.opType()) { + case CREATE: + case INDEX: + writeResult = TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, + mappingUpdatedAction); + break; + case UPDATE: + writeResult = TransportUpdateAction.executeUpdateRequestOnPrimary(((UpdateRequest) itemRequest), indexShard, + metaData, updateHelper, mappingUpdatedAction, allowIdGeneration); + break; + case DELETE: + writeResult = TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard); + break; + default: + throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); + } if (writeResult.getLocation() != null) { location = locationToSync(location, writeResult.getLocation()); } else { assert writeResult.getResponse().getResult() == DocWriteResponse.Result.NOOP : "only noop operation can have null next operation"; } - // update the bulk item request because update request execution can mutate the bulk item request - BulkItemRequest item = request.items()[requestIndex]; + // update the bulk item request with replica request (update request are changed to index or delete requests for replication) + request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), + (DocumentRequest) writeResult.getReplicaRequest()); // add the response - setResponse(item, new BulkItemResponse(item.id(), opType, writeResult.getResponse())); + setResponse(request.items()[requestIndex], new BulkItemResponse(request.items()[requestIndex].id(), opType, writeResult.getResponse())); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { @@ -182,33 +248,6 @@ public class TransportShardBulkAction extends TransportWriteAction innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard, - BulkShardRequest request, int requestIndex) throws Exception { - DocumentRequest itemRequest = request.items()[requestIndex].request(); - switch (itemRequest.opType()) { - case CREATE: - case INDEX: - return TransportIndexAction.executeIndexRequestOnPrimary(((IndexRequest) itemRequest), indexShard, mappingUpdatedAction); - case UPDATE: - int maxAttempts = ((UpdateRequest) itemRequest).retryOnConflict(); - for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { - try { - return shardUpdateOperation(metaData, indexShard, request, requestIndex, ((UpdateRequest) itemRequest)); - } catch (Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (attemptCount == maxAttempts // bubble up exception when we run out of attempts - || (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict - throw e; - } - } - } - throw new IllegalStateException("version conflict exception should bubble up on last attempt"); - case DELETE: - return TransportDeleteAction.executeDeleteRequestOnPrimary(((DeleteRequest) itemRequest), indexShard); - default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found"); - } - } - private void setResponse(BulkItemRequest request, BulkItemResponse response) { request.setPrimaryResponse(response); if (response.isFailed()) { @@ -219,51 +258,6 @@ public class TransportShardBulkAction extends TransportWriteAction shardUpdateOperation(IndexMetaData metaData, IndexShard indexShard, - BulkShardRequest request, - int requestIndex, UpdateRequest updateRequest) - throws Exception { - // Todo: capture read version conflicts, missing documents and malformed script errors in the write result due to get request - UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard); - switch (translate.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = translate.action(); - MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); - indexRequest.process(mappingMd, allowIdGeneration, request.index()); - WriteResult writeResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); - BytesReference indexSourceAsBytes = indexRequest.source(); - IndexResponse indexResponse = writeResult.getResponse(); - UpdateResponse writeUpdateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getResult()); - if (updateRequest.fields() != null && updateRequest.fields().length > 0) { - Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); - writeUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); - } - // Replace the update request to the translated index request to execute on the replica. - request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest); - return new WriteResult<>(writeUpdateResponse, writeResult.getLocation()); - case DELETED: - DeleteRequest deleteRequest = translate.action(); - WriteResult deleteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); - DeleteResponse response = deleteResult.getResponse(); - UpdateResponse deleteUpdateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - deleteUpdateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null)); - // Replace the update request to the translated delete request to execute on the replica. - request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); - return new WriteResult<>(deleteUpdateResponse, deleteResult.getLocation()); - case NOOP: - BulkItemRequest item = request.items()[requestIndex]; - indexShard.noopUpdate(updateRequest.type()); - item.setIgnoreOnReplica(); // no need to go to the replica - return new WriteResult<>(translate.action(), null); - default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); - } - } - @Override protected Location onReplicaShard(BulkShardRequest request, IndexShard indexShard) { Translog.Location location = null; @@ -272,7 +266,8 @@ public class TransportShardBulkAction extends TransportWriteAction documentRequest = item.request(); + DocumentRequest documentRequest = (item.request() instanceof UpdateReplicaRequest) + ? ((UpdateReplicaRequest) item.request()).getRequest() : item.request(); final Engine.Operation operation; try { switch (documentRequest.opType()) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index e3babcfc380..f2e5e13494d 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -43,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ReplicatedWriteRequest implements DocumentRequest { +public class DeleteRequest extends DocumentRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 6f3d27ea369..926700e327e 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -49,7 +50,7 @@ import org.elasticsearch.transport.TransportService; /** * Performs the delete operation. */ -public class TransportDeleteAction extends TransportWriteAction { +public class TransportDeleteAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; @@ -61,7 +62,7 @@ public class TransportDeleteAction extends TransportWriteAction listener) { ClusterState state = clusterService.state(); if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { - createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(delete api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(task, createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { innerExecute(task, request, listener); @@ -100,15 +105,6 @@ public class TransportDeleteAction extends TransportWriteAction listener) { super.doExecute(task, request, listener); } @@ -119,7 +115,7 @@ public class TransportDeleteAction extends TransportWriteAction onPrimaryShard(DeleteRequest request, IndexShard indexShard) { + protected WriteResult onPrimaryShard(DeleteRequest request, IndexShard indexShard) { return executeDeleteRequestOnPrimary(request, indexShard); } @@ -128,7 +124,7 @@ public class TransportDeleteAction extends TransportWriteAction executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) { + public static WriteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard indexShard) { Engine.Delete delete = indexShard.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); indexShard.delete(delete); // update the request with the version so it will go to the replicas @@ -137,7 +133,7 @@ public class TransportDeleteAction extends TransportWriteAction(response, delete.getTranslogLocation()); + return new WriteResult<>(request, response, delete.getTranslogLocation()); } public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) { diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index cce0f6c8eef..48eaab2b48c 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -67,7 +67,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ReplicatedWriteRequest implements DocumentRequest { +public class IndexRequest extends DocumentRequest { private String type; private String id; diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index cc3fbb7906d..37cc2d7e3bc 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -60,7 +60,7 @@ import org.elasticsearch.transport.TransportService; *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * */ -public class TransportIndexAction extends TransportWriteAction { +public class TransportIndexAction extends TransportWriteAction { private final AutoCreateIndex autoCreateIndex; private final boolean allowIdGeneration; @@ -76,7 +76,7 @@ public class TransportIndexAction extends TransportWriteAction onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception { + protected WriteResult onPrimaryShard(IndexRequest request, IndexShard indexShard) throws Exception { return executeIndexRequestOnPrimary(request, indexShard, mappingUpdatedAction); } @@ -174,7 +174,7 @@ public class TransportIndexAction extends TransportWriteAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, + public static WriteResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); @@ -198,7 +198,7 @@ public class TransportIndexAction extends TransportWriteAction(response, operation.getTranslogLocation()); + return new WriteResult<>(request, response, operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index d541ef6a35c..8aa0ed66a77 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -112,22 +113,24 @@ public class ReplicationOperation< pendingActions.incrementAndGet(); primaryResult = primary.perform(request); final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); - assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term"; - if (logger.isTraceEnabled()) { - logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + if (replicaRequest != null) { + assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term"; + if (logger.isTraceEnabled()) { + logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + } + + // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics. + // we have to make sure that every operation indexed into the primary after recovery start will also be replicated + // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. + ClusterState clusterState = clusterStateSupplier.get(); + final List shards = getShards(primaryId, clusterState); + Set inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState); + + markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards); + + performOnReplicas(replicaRequest, shards); } - // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics. - // we have to make sure that every operation indexed into the primary after recovery start will also be replicated - // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. - ClusterState clusterState = clusterStateSupplier.get(); - final List shards = getShards(primaryId, clusterState); - Set inSyncAllocationIds = getInSyncAllocationIds(primaryId, clusterState); - - markUnavailableShardsAsStale(replicaRequest, inSyncAllocationIds, shards); - - performOnReplicas(replicaRequest, shards); - successfulShards.incrementAndGet(); decPendingAndFinishIfNeeded(); } @@ -419,7 +422,10 @@ public class ReplicationOperation< public interface PrimaryResult> { - R replicaRequest(); + /** + * @return null if no operation needs to be sent to a replica + */ + @Nullable R replicaRequest(); void setShardInfo(ReplicationResponse.ShardInfo shardInfo); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 9587b4e6b2c..95e196672d4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -23,6 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; @@ -163,6 +165,16 @@ public abstract class TransportReplicationAction< } } + /** helper to verify and resolve request routing */ + public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex, + DocumentRequest request) { + request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index())); + // check if routing is required, if so, throw error if routing wasn't specified + if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) { + throw new RoutingMissingException(concreteIndex, request.type(), request.id()); + } + } + /** * Primary operation on node with primary copy. * @@ -900,7 +912,9 @@ public abstract class TransportReplicationAction< @Override public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request); - result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); + if (result.replicaRequest() != null) { + result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); + } return result; } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index bf2b3235b11..ee8ee4862f9 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -49,38 +49,40 @@ import java.util.function.Supplier; */ public abstract class TransportWriteAction< Request extends ReplicatedWriteRequest, + ReplicaRequest extends ReplicatedWriteRequest, Response extends ReplicationResponse & WriteResponse - > extends TransportReplicationAction { + > extends TransportReplicationAction { protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, request, executor); + indexNameExpressionResolver, request, replicaRequest, executor); } /** * Called on the primary with a reference to the {@linkplain IndexShard} to modify. */ - protected abstract WriteResult onPrimaryShard(Request request, IndexShard indexShard) throws Exception; + protected abstract WriteResult onPrimaryShard(Request request, IndexShard indexShard) throws Exception; /** * Called once per replica with a reference to the {@linkplain IndexShard} to modify. * * @return the translog location of the {@linkplain IndexShard} after the write was completed or null if no write occurred */ - protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard); + protected abstract Translog.Location onReplicaShard(ReplicaRequest request, IndexShard indexShard); @Override protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception { IndexShard indexShard = indexShard(request); - WriteResult result = onPrimaryShard(request, indexShard); - return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard); + WriteResult result = onPrimaryShard(request, indexShard); + return new WritePrimaryResult(request, result, indexShard); } @Override - protected final WriteReplicaResult shardOperationOnReplica(Request request) { + protected final WriteReplicaResult shardOperationOnReplica(ReplicaRequest request) { IndexShard indexShard = indexShard(request); Translog.Location location = onReplicaShard(request, indexShard); return new WriteReplicaResult(indexShard, request, location); @@ -89,7 +91,7 @@ public abstract class TransportWriteAction< /** * Fetch the IndexShard for the request. Protected so it can be mocked in tests. */ - protected IndexShard indexShard(Request request) { + protected IndexShard indexShard(ReplicatedWriteRequest request) { final ShardId shardId = request.shardId(); IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); @@ -98,11 +100,13 @@ public abstract class TransportWriteAction< /** * Simple result from a write action. Write actions have static method to return these so they can integrate with bulk. */ - public static class WriteResult { + public static class WriteResult, Response extends ReplicationResponse> { + private final ReplicaRequest replicaRequest; private final Response response; private final Translog.Location location; - public WriteResult(Response response, @Nullable Location location) { + public WriteResult(ReplicaRequest replicaRequest, Response response, @Nullable Location location) { + this.replicaRequest = replicaRequest; this.response = response; this.location = location; } @@ -114,6 +118,10 @@ public abstract class TransportWriteAction< public Translog.Location getLocation() { return location; } + + public ReplicaRequest getReplicaRequest() { + return replicaRequest; + } } /** @@ -123,15 +131,15 @@ public abstract class TransportWriteAction< boolean finishedAsyncActions; ActionListener listener = null; - public WritePrimaryResult(Request request, Response finalResponse, - @Nullable Translog.Location location, + public WritePrimaryResult(Request request, + WriteResult result, IndexShard indexShard) { - super(request, finalResponse); + super(result.getReplicaRequest(), result.getResponse()); /* * We call this before replication because this might wait for a refresh and that can take a while. This way we wait for the * refresh in parallel on the primary and on the replica. */ - new AsyncAfterWriteAction(indexShard, request, location, this, logger).run(); + new AsyncAfterWriteAction(indexShard, request, result.getLocation(), this, logger).run(); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java deleted file mode 100644 index cb9a6ab9f69..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.ValidateActions; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * - */ -public abstract class InstanceShardOperationRequest> extends ActionRequest - implements IndicesRequest { - - public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - - protected TimeValue timeout = DEFAULT_TIMEOUT; - - protected String index; - // null means its not set, allows to explicitly direct a request to a specific shard - protected ShardId shardId = null; - - private String concreteIndex; - - protected InstanceShardOperationRequest() { - } - - public InstanceShardOperationRequest(String index) { - this.index = index; - } - - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - if (index == null) { - validationException = ValidateActions.addValidationError("index is missing", validationException); - } - return validationException; - } - - public String index() { - return index; - } - - @Override - public String[] indices() { - return new String[]{index}; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - - @SuppressWarnings("unchecked") - public final Request index(String index) { - this.index = index; - return (Request) this; - } - - public TimeValue timeout() { - return timeout; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final Request timeout(TimeValue timeout) { - this.timeout = timeout; - return (Request) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - public final Request timeout(String timeout) { - return timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout")); - } - - public String concreteIndex() { - return concreteIndex; - } - - void concreteIndex(String concreteIndex) { - this.concreteIndex = concreteIndex; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - index = in.readString(); - if (in.readBoolean()) { - shardId = ShardId.readShardId(in); - } else { - shardId = null; - } - timeout = new TimeValue(in); - concreteIndex = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeOptionalStreamable(shardId); - timeout.writeTo(out); - out.writeOptionalString(concreteIndex); - } - -} - diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java deleted file mode 100644 index 13266b9151d..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequestBuilder.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.unit.TimeValue; - -/** - */ -public abstract class InstanceShardOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder> - extends ActionRequestBuilder { - - protected InstanceShardOperationRequestBuilder(ElasticsearchClient client, Action action, Request request) { - super(client, action, request); - } - - @SuppressWarnings("unchecked") - public final RequestBuilder setIndex(String index) { - request.index(index); - return (RequestBuilder) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } -} diff --git a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java deleted file mode 100644 index 81da5ec9a86..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; - -import java.util.function.Supplier; - -/** - * - */ -public abstract class TransportInstanceSingleOperationAction, Response extends ActionResponse> - extends HandledTransportAction { - protected final ClusterService clusterService; - protected final TransportService transportService; - - final String executor; - final String shardActionName; - - protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool, - ClusterService clusterService, TransportService transportService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { - super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); - this.clusterService = clusterService; - this.transportService = transportService; - this.executor = executor(); - this.shardActionName = actionName + "[s]"; - transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler()); - } - - @Override - protected void doExecute(Request request, ActionListener listener) { - new AsyncSingleAction(request, listener).start(); - } - - protected abstract String executor(); - - protected abstract void shardOperation(Request request, ActionListener listener); - - protected abstract Response newResponse(); - - protected ClusterBlockException checkGlobalBlock(ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); - } - - protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) { - return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.concreteIndex()); - } - - /** - * Resolves the request. Throws an exception if the request cannot be resolved. - */ - protected abstract void resolveRequest(ClusterState state, Request request); - - protected boolean retryOnFailure(Exception e) { - return false; - } - - protected TransportRequestOptions transportOptions() { - return TransportRequestOptions.EMPTY; - } - - /** - * Should return an iterator with a single shard! - */ - protected abstract ShardIterator shards(ClusterState clusterState, Request request); - - class AsyncSingleAction { - - private final ActionListener listener; - private final Request request; - private volatile ClusterStateObserver observer; - private ShardIterator shardIt; - private DiscoveryNodes nodes; - - AsyncSingleAction(Request request, ActionListener listener) { - this.request = request; - this.listener = listener; - } - - public void start() { - this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); - doStart(); - } - - protected void doStart() { - nodes = observer.observedState().nodes(); - try { - ClusterBlockException blockException = checkGlobalBlock(observer.observedState()); - if (blockException != null) { - if (blockException.retryable()) { - retry(blockException); - return; - } else { - throw blockException; - } - } - request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request).getName()); - resolveRequest(observer.observedState(), request); - blockException = checkRequestBlock(observer.observedState(), request); - if (blockException != null) { - if (blockException.retryable()) { - retry(blockException); - return; - } else { - throw blockException; - } - } - shardIt = shards(observer.observedState(), request); - } catch (Exception e) { - listener.onFailure(e); - return; - } - - // no shardIt, might be in the case between index gateway recovery and shardIt initialization - if (shardIt.size() == 0) { - retry(null); - return; - } - - // this transport only make sense with an iterator that returns a single shard routing (like primary) - assert shardIt.size() == 1; - - ShardRouting shard = shardIt.nextOrNull(); - assert shard != null; - - if (!shard.active()) { - retry(null); - return; - } - - request.shardId = shardIt.shardId(); - DiscoveryNode node = nodes.get(shard.currentNodeId()); - transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler() { - - @Override - public Response newInstance() { - return newResponse(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(Response response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - final Throwable cause = exp.unwrapCause(); - // if we got disconnected from the node, or the node / shard is not in the right state (being closed) - if (cause instanceof ConnectTransportException || cause instanceof NodeClosedException || - retryOnFailure(exp)) { - retry((Exception) cause); - } else { - listener.onFailure(exp); - } - } - }); - } - - void retry(@Nullable final Exception failure) { - if (observer.isTimedOut()) { - // we running as a last attempt after a timeout has happened. don't retry - Exception listenFailure = failure; - if (listenFailure == null) { - if (shardIt == null) { - listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName); - } else { - listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName); - } - } - listener.onFailure(listenFailure); - return; - } - - observer.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - doStart(); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(nodes.getLocalNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - // just to be on the safe side, see if we can start it now? - doStart(); - } - }, request.timeout()); - } - } - - private class ShardTransportHandler implements TransportRequestHandler { - - @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { - shardOperation(request, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send response for get", inner); - } - } - }); - - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index e5322f51d50..1f3a97a25a2 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -21,7 +21,7 @@ package org.elasticsearch.action.update; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -34,19 +34,17 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -54,59 +52,52 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.Collections; import java.util.Map; import static org.elasticsearch.ExceptionsHelper.unwrapCause; /** */ -public class TransportUpdateAction extends TransportInstanceSingleOperationAction { +public class TransportUpdateAction extends TransportWriteAction { - private final TransportDeleteAction deleteAction; - private final TransportIndexAction indexAction; private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; private final UpdateHelper updateHelper; private final IndicesService indicesService; + private final MappingUpdatedAction mappingUpdatedAction; + private final boolean allowIdGeneration; @Inject public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction, - UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - IndicesService indicesService, AutoCreateIndex autoCreateIndex) { - super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new); - this.indexAction = indexAction; - this.deleteAction = deleteAction; + TransportCreateIndexAction createIndexAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, + AutoCreateIndex autoCreateIndex, ShardStateAction shardStateAction, + MappingUpdatedAction mappingUpdatedAction, ScriptService scriptService) { + super(settings, UpdateAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, + actionFilters, indexNameExpressionResolver, UpdateRequest::new, UpdateReplicaRequest::new, ThreadPool.Names.INDEX); this.createIndexAction = createIndexAction; - this.updateHelper = updateHelper; + this.updateHelper = new UpdateHelper(scriptService, logger); this.indicesService = indicesService; this.autoCreateIndex = autoCreateIndex; + this.mappingUpdatedAction = mappingUpdatedAction; + this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); } @Override - protected String executor() { - return ThreadPool.Names.INDEX; - } - - @Override - protected UpdateResponse newResponse() { - return new UpdateResponse(); - } - - @Override - protected boolean retryOnFailure(Exception e) { - return TransportActions.isShardNotAvailableException(e); - } - - @Override - protected void resolveRequest(ClusterState state, UpdateRequest request) { - resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request); + protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, UpdateRequest request) { + super.resolveRequest(metaData, indexMetaData, request); + resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request); + ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), + indexMetaData.getIndex().getName(), request.id(), request.routing()); + request.setShardId(shardId); } public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) { @@ -118,13 +109,17 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } @Override - protected void doExecute(final UpdateRequest request, final ActionListener listener) { + protected void doExecute(Task task, UpdateRequest request, ActionListener listener) { // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { - createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(request.index()); + createIndexRequest.cause("auto(update api)"); + createIndexRequest.masterNodeTimeout(request.timeout()); + createIndexAction.execute(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { - innerExecute(request, listener); + innerExecute(task, request, listener); } @Override @@ -132,7 +127,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio if (unwrapCause(e) instanceof IndexAlreadyExistsException) { // we have the index, do it try { - innerExecute(request, listener); + innerExecute(task, request, listener); } catch (Exception inner) { inner.addSuppressed(e); listener.onFailure(inner); @@ -143,153 +138,123 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } }); } else { - innerExecute(request, listener); + innerExecute(task, request, listener); } } - private void innerExecute(final UpdateRequest request, final ActionListener listener) { - super.doExecute(request, listener); - } - @Override - protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) { - if (request.getShardId() != null) { - return clusterState.routingTable().index(request.concreteIndex()).shard(request.getShardId().getId()).primaryShardIt(); - } - ShardIterator shardIterator = clusterService.operationRouting() - .indexShards(clusterState, request.concreteIndex(), request.id(), request.routing()); - ShardRouting shard; - while ((shard = shardIterator.nextOrNull()) != null) { - if (shard.primary()) { - return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard)); + protected UpdateResponse newResponseInstance() { + return new UpdateResponse(); + } + + private void innerExecute(Task task, final UpdateRequest request, final ActionListener listener) { + super.doExecute(task, request, listener); + } + + @Override + protected WriteResult onPrimaryShard(UpdateRequest request, IndexShard indexShard) throws Exception { + ShardId shardId = request.shardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexMetaData indexMetaData = indexService.getMetaData(); + return executeUpdateRequestOnPrimary(request, indexShard, indexMetaData, updateHelper, mappingUpdatedAction, allowIdGeneration); + } + + public static WriteResult executeUpdateRequestOnPrimary(UpdateRequest request, + IndexShard indexShard, + IndexMetaData indexMetaData, + UpdateHelper updateHelper, + MappingUpdatedAction mappingUpdatedAction, + boolean allowIdGeneration) + throws Exception { + int maxAttempts = request.retryOnConflict(); + for (int attemptCount = 0; attemptCount <= maxAttempts; attemptCount++) { + try { + return shardUpdateOperation(indexMetaData, indexShard, request, updateHelper, mappingUpdatedAction, allowIdGeneration); + } catch (Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (attemptCount == maxAttempts // bubble up exception when we run out of attempts + || (cause instanceof VersionConflictEngineException) == false) { // or when exception is not a version conflict + throw e; + } } } - return new PlainShardIterator(shardIterator.shardId(), Collections.emptyList()); + throw new IllegalStateException("version conflict exception should bubble up on last attempt"); + } - @Override - protected void shardOperation(final UpdateRequest request, final ActionListener listener) { - shardOperation(request, listener, 0); - } - - protected void shardOperation(final UpdateRequest request, final ActionListener listener, final int retryCount) { - final ShardId shardId = request.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.getId()); + private static WriteResult shardUpdateOperation(IndexMetaData indexMetaData, + IndexShard indexShard, + UpdateRequest request, + UpdateHelper updateHelper, + MappingUpdatedAction mappingUpdatedAction, + boolean allowIdGeneration) + throws Exception { final UpdateHelper.Result result = updateHelper.prepare(request, indexShard); switch (result.getResponseResult()) { case CREATED: - IndexRequest upsertRequest = result.action(); - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request - final BytesReference upsertSourceBytes = upsertRequest.source(); - indexAction.execute(upsertRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || - (request.fields() != null && request.fields().length > 0)) { - Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); - } else { - update.setGetResult(null); - } - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", - retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; case UPDATED: IndexRequest indexRequest = result.action(); + MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); + indexRequest.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName()); + WriteResult indexResponseWriteResult = TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction); + IndexResponse response = indexResponseWriteResult.getResponse(); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference indexSourceBytes = indexRequest.source(); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); + if (result.getResponseResult() == DocWriteResponse.Result.CREATED) { + if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || + (request.fields() != null && request.fields().length > 0)) { + Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceBytes, true); + update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceBytes)); + } else { + update.setGetResult(null); } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; + } else if (result.getResponseResult() == DocWriteResponse.Result.UPDATED) { + update.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); + } + update.setForcedRefresh(response.forcedRefresh()); + UpdateReplicaRequest updateReplicaRequest = new UpdateReplicaRequest(indexRequest); + updateReplicaRequest.setParentTask(request.getParentTask()); + updateReplicaRequest.setShardId(request.shardId()); + updateReplicaRequest.setRefreshPolicy(request.getRefreshPolicy()); + return new WriteResult<>(updateReplicaRequest, update, indexResponseWriteResult.getLocation()); case DELETED: DeleteRequest deleteRequest = result.action(); - deleteAction.execute(deleteRequest, new ActionListener() { - @Override - public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); - break; + WriteResult deleteResponseWriteResult = TransportDeleteAction.executeDeleteRequestOnPrimary(deleteRequest, indexShard); + DeleteResponse deleteResponse = deleteResponseWriteResult.getResponse(); + UpdateResponse deleteUpdate = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getVersion(), deleteResponse.getResult()); + deleteUpdate.setGetResult(updateHelper.extractGetResult(request, indexMetaData.getIndex().getName(), deleteResponse.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); + deleteUpdate.setForcedRefresh(deleteResponse.forcedRefresh()); + UpdateReplicaRequest deleteReplicaRequest = new UpdateReplicaRequest(deleteRequest); + deleteReplicaRequest.setParentTask(request.getParentTask()); + deleteReplicaRequest.setShardId(request.shardId()); + deleteReplicaRequest.setRefreshPolicy(request.getRefreshPolicy()); + return new WriteResult<>(deleteReplicaRequest, deleteUpdate, deleteResponseWriteResult.getLocation()); case NOOP: - UpdateResponse update = result.action(); - IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex()); - if (indexServiceOrNull != null) { - IndexShard shard = indexService.getShardOrNull(shardId.getId()); - if (shard != null) { - shard.noopUpdate(request.type()); - } - } - listener.onResponse(update); - break; + UpdateResponse noopUpdate = result.action(); + indexShard.noopUpdate(request.type()); + return new WriteResult<>(null, noopUpdate, null); default: throw new IllegalStateException("Illegal result " + result.getResponseResult()); } } + + @Override + protected Translog.Location onReplicaShard(UpdateReplicaRequest request, IndexShard indexShard) { + assert request.getRequest() != null; + final Translog.Location location; + switch (request.getRequest().opType()) { + case INDEX: + case CREATE: + location = TransportIndexAction.executeIndexRequestOnReplica(((IndexRequest) request.getRequest()), indexShard).getTranslogLocation(); + break; + case DELETE: + location = TransportDeleteAction.executeDeleteRequestOnReplica(((DeleteRequest) request.getRequest()), indexShard).getTranslogLocation(); + break; + default: + throw new IllegalStateException("unexpected opType [" + request.getRequest().opType().getLowercase() + "]"); + + } + return location; + } } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 49206470532..c242f885f06 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.update; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -27,11 +28,8 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; @@ -63,14 +61,14 @@ import java.util.Map; /** * Helper for translating an update request to an index, delete request or update response. */ -public class UpdateHelper extends AbstractComponent { +public class UpdateHelper { private final ScriptService scriptService; + private final Logger logger; - @Inject - public UpdateHelper(Settings settings, ScriptService scriptService) { - super(settings); + public UpdateHelper(ScriptService scriptService, Logger logger) { this.scriptService = scriptService; + this.logger = logger; } /** @@ -259,7 +257,7 @@ public class UpdateHelper extends AbstractComponent { return ctx; } - private TimeValue getTTLFromScriptContext(Map ctx) { + private static TimeValue getTTLFromScriptContext(Map ctx) { Object fetchedTTL = ctx.get("_ttl"); if (fetchedTTL != null) { if (fetchedTTL instanceof Number) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java new file mode 100644 index 00000000000..5f258a675c2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateReplicaRequest.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.update; + +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.VersionType; + +import java.io.IOException; + +/** Replica request for update operation holds translated (index/delete) requests */ +public class UpdateReplicaRequest extends DocumentRequest { + private DocumentRequest request; + + public UpdateReplicaRequest() { + } + + public UpdateReplicaRequest(DocumentRequest request) { + assert !(request instanceof UpdateReplicaRequest) : "underlying request must not be a update replica request"; + this.request = request; + this.index = request.index(); + setRefreshPolicy(request.getRefreshPolicy()); + setShardId(request.shardId()); + setParentTask(request.getParentTask()); + } + + public DocumentRequest getRequest() { + return request; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + request = DocumentRequest.readDocumentRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + DocumentRequest.writeDocumentRequest(out, request); + } + + @Override + public String type() { + return request.type(); + } + + @Override + public String id() { + return request.id(); + } + + @Override + public UpdateReplicaRequest routing(String routing) { + throw new UnsupportedOperationException("setting routing is not supported"); + } + + @Override + public String routing() { + return request.routing(); + } + + @Override + public String parent() { + return request.parent(); + } + + @Override + public long version() { + return request.version(); + } + + @Override + public UpdateReplicaRequest version(long version) { + throw new UnsupportedOperationException("setting version is not supported"); + } + + @Override + public VersionType versionType() { + return request.versionType(); + } + + @Override + public UpdateReplicaRequest versionType(VersionType versionType) { + throw new UnsupportedOperationException("setting version type is not supported"); + } + + @Override + public OpType opType() { + return request.opType(); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index deca938fa6a..80d3676e051 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -23,9 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.bytes.BytesArray; @@ -56,10 +53,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** */ -public class UpdateRequest extends InstanceShardOperationRequest - implements DocumentRequest, WriteRequest { - private static final DeprecationLogger DEPRECATION_LOGGER = - new DeprecationLogger(Loggers.getLogger(UpdateRequest.class)); +public class UpdateRequest extends DocumentRequest { private String type; private String id; @@ -97,7 +91,7 @@ public class UpdateRequest extends InstanceShardOperationRequest } public UpdateRequest(String index, String type, String id) { - super(index); + this.index = index; this.type = type; this.id = id; } @@ -495,39 +489,6 @@ public class UpdateRequest extends InstanceShardOperationRequest return OpType.UPDATE; } - @Override - public UpdateRequest setRefreshPolicy(RefreshPolicy refreshPolicy) { - this.refreshPolicy = refreshPolicy; - return this; - } - - @Override - public RefreshPolicy getRefreshPolicy() { - return refreshPolicy; - } - - public ActiveShardCount waitForActiveShards() { - return this.waitForActiveShards; - } - - /** - * Sets the number of shard copies that must be active before proceeding with the write. - * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. - */ - public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { - this.waitForActiveShards = waitForActiveShards; - return this; - } - - /** - * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical - * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} - * to get the ActiveShardCount. - */ - public UpdateRequest waitForActiveShards(final int waitForActiveShards) { - return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); - } - /** * Sets the doc to use for updates when a script is not specified. */ diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index bbbc9bafd8f..e9b111f4df9 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequest; -import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.DeprecationLogger; @@ -37,7 +37,7 @@ import org.elasticsearch.script.Script; import java.util.Map; -public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder +public class UpdateRequestBuilder extends ReplicationRequestBuilder implements WriteRequestBuilder { private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(RestUpdateAction.class)); diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index eb1843dc7d9..83f347d6b98 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -22,7 +22,6 @@ package org.elasticsearch.indices; import org.elasticsearch.action.admin.indices.rollover.Condition; import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition; -import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.common.geo.ShapesAvailability; import org.elasticsearch.common.inject.AbstractModule; @@ -182,7 +181,6 @@ public class IndicesModule extends AbstractModule { bind(SyncedFlushService.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); - bind(UpdateHelper.class).asEagerSingleton(); bind(MetaDataIndexUpgradeService.class).asEagerSingleton(); bind(NodeServicesProvider.class).asEagerSingleton(); } diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index d1d01610f18..5c692668f26 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -222,8 +222,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdate() { - //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -237,8 +236,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdateUpsert() { - //update action goes to the primary, index op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", IndexAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); @@ -251,8 +249,7 @@ public class IndicesRequestIT extends ESIntegTestCase { } public void testUpdateDelete() { - //update action goes to the primary, delete op gets executed locally, then replicated - String[] updateShardActions = new String[]{UpdateAction.NAME + "[s]", DeleteAction.NAME + "[r]"}; + String[] updateShardActions = new String[]{UpdateAction.NAME + "[p]", UpdateAction.NAME + "[r]"}; interceptTransportActions(updateShardActions); String indexOrAlias = randomIndexOrAlias(); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index a554ca53d99..d2070fb21db 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -128,21 +129,21 @@ public class TransportWriteActionTests extends ESTestCase { resultChecker.accept(listener.response, forcedRefresh); } - private class TestAction extends TransportWriteAction { + private class TestAction extends TransportWriteAction { protected TestAction() { super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR), null, null, null, null, new ActionFilters(new HashSet<>()), - new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME); + new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); } @Override - protected IndexShard indexShard(TestRequest request) { + protected IndexShard indexShard(ReplicatedWriteRequest request) { return indexShard; } @Override - protected WriteResult onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception { - return new WriteResult<>(new TestResponse(), location); + protected WriteResult onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception { + return new WriteResult<>(request, new TestResponse(), location); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java deleted file mode 100644 index 1d736060568..00000000000 --- a/core/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.single.instance; - -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.ActionFilter; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlock; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.transport.CapturingTransport; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportService; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; - -import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.elasticsearch.test.ClusterServiceUtils.setState; -import static org.hamcrest.core.IsEqual.equalTo; - -public class TransportInstanceSingleOperationActionTests extends ESTestCase { - - private static ThreadPool THREAD_POOL; - - private ClusterService clusterService; - private CapturingTransport transport; - private TransportService transportService; - - private TestTransportInstanceSingleOperationAction action; - - public static class Request extends InstanceShardOperationRequest { - public Request() { - } - } - - public static class Response extends ActionResponse { - public Response() { - } - } - - class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction { - private final Map shards = new HashMap<>(); - - public TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { - super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request); - } - - public Map getResults() { - return shards; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected void shardOperation(Request request, ActionListener listener) { - throw new UnsupportedOperationException("Not implemented in test class"); - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected void resolveRequest(ClusterState state, Request request) { - } - - @Override - protected ShardIterator shards(ClusterState clusterState, Request request) { - return clusterState.routingTable().index(request.concreteIndex()).shard(request.shardId.getId()).primaryShardIt(); - } - } - - class MyResolver extends IndexNameExpressionResolver { - public MyResolver() { - super(Settings.EMPTY); - } - - @Override - public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { - return request.indices(); - } - } - - @BeforeClass - public static void startThreadPool() { - THREAD_POOL = new TestThreadPool(TransportInstanceSingleOperationActionTests.class.getSimpleName()); - } - - @Before - public void setUp() throws Exception { - super.setUp(); - transport = new CapturingTransport(); - clusterService = createClusterService(THREAD_POOL); - transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR); - transportService.start(); - transportService.acceptIncomingRequests(); - action = new TestTransportInstanceSingleOperationAction( - Settings.EMPTY, - "indices:admin/test", - transportService, - new ActionFilters(new HashSet()), - new MyResolver(), - Request::new - ); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - clusterService.close(); - transportService.close(); - } - - @AfterClass - public static void destroyThreadPool() { - ThreadPool.terminate(THREAD_POOL, 30, TimeUnit.SECONDS); - // since static must set to null to be eligible for collection - THREAD_POOL = null; - } - - public void testGlobalBlock() { - Request request = new Request(); - PlainActionFuture listener = new PlainActionFuture<>(); - ClusterBlocks.Builder block = ClusterBlocks.builder() - .addGlobalBlock(new ClusterBlock(1, "", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - try { - action.new AsyncSingleAction(request, listener).start(); - listener.get(); - fail("expected ClusterBlockException"); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, ClusterBlockException.class) == null) { - logger.info("expected ClusterBlockException but got ", e); - fail("expected ClusterBlockException"); - } - } - } - - public void testBasicRequestWorks() throws InterruptedException, ExecutionException, TimeoutException { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testFailureWithoutRetry() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - // this should not trigger retry or anything and the listener should report exception immediately - transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception"))); - - try { - // result should return immediately - assertTrue(listener.isDone()); - listener.get(); - fail("this should fail with a transport exception"); - } catch (ExecutionException t) { - if (ExceptionsHelper.unwrap(t, TransportException.class) == null) { - logger.info("expected TransportException but got ", t); - fail("expected and TransportException"); - } - } - } - - public void testSuccessAfterRetryWithClusterStateUpdate() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - boolean local = randomBoolean(); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.INITIALIZING)); - action.new AsyncSingleAction(request, listener).start(); - // this should fail because primary not initialized - assertThat(transport.capturedRequests().length, equalTo(0)); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - // this time it should work - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testSuccessAfterRetryWithExceptionFromTransport() throws Exception { - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - boolean local = randomBoolean(); - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - // trigger cluster state observer - setState(clusterService, ClusterStateCreationUtils.state("test", local, ShardRoutingState.STARTED)); - assertThat(transport.capturedRequests().length, equalTo(1)); - transport.handleResponse(transport.capturedRequests()[0].requestId, new Response()); - listener.get(); - } - - public void testRetryOfAnAlreadyTimedOutRequest() throws Exception { - Request request = new Request().index("test").timeout(new TimeValue(0, TimeUnit.MILLISECONDS)); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(1)); - long requestId = transport.capturedRequests()[0].requestId; - transport.clear(); - DiscoveryNode node = clusterService.state().getNodes().getLocalNode(); - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - - // wait until the timeout was triggered and we actually tried to send for the second time - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(transport.capturedRequests().length, equalTo(1)); - } - }); - - // let it fail the second time too - requestId = transport.capturedRequests()[0].requestId; - transport.handleLocalError(requestId, new ConnectTransportException(node, "test exception")); - try { - // result should return immediately - assertTrue(listener.isDone()); - listener.get(); - fail("this should fail with a transport exception"); - } catch (ExecutionException t) { - if (ExceptionsHelper.unwrap(t, ConnectTransportException.class) == null) { - logger.info("expected ConnectTransportException but got ", t); - fail("expected and ConnectTransportException"); - } - } - } - - public void testUnresolvableRequestDoesNotHang() throws InterruptedException, ExecutionException, TimeoutException { - action = new TestTransportInstanceSingleOperationAction( - Settings.EMPTY, - "indices:admin/test_unresolvable", - transportService, - new ActionFilters(new HashSet<>()), - new MyResolver(), - Request::new - ) { - @Override - protected void resolveRequest(ClusterState state, Request request) { - throw new IllegalStateException("request cannot be resolved"); - } - }; - Request request = new Request().index("test"); - request.shardId = new ShardId("test", "_na_", 0); - PlainActionFuture listener = new PlainActionFuture<>(); - setState(clusterService, ClusterStateCreationUtils.state("test", randomBoolean(), ShardRoutingState.STARTED)); - action.new AsyncSingleAction(request, listener).start(); - assertThat(transport.capturedRequests().length, equalTo(0)); - try { - listener.get(); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, IllegalStateException.class) == null) { - logger.info("expected IllegalStateException but got ", e); - fail("expected and IllegalStateException"); - } - } - } -} diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index cb27a527f63..d4291464fdb 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -172,9 +172,8 @@ public class UpdateRequestTests extends ESTestCase { // Related to issue 3256 public void testUpdateRequestWithTTL() throws Exception { TimeValue providedTTLValue = TimeValue.parseTimeValue(randomTimeValue(), null, "ttl"); - Settings settings = settings(Version.CURRENT).build(); - UpdateHelper updateHelper = new UpdateHelper(settings, null); + UpdateHelper updateHelper = new UpdateHelper(null, logger); // We just upsert one document with ttl IndexRequest indexRequest = new IndexRequest("test", "type1", "1") diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 6e200c4756a..67f0b4bf4a6 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -366,8 +366,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception { - TransportWriteAction.WriteResult result = TransportIndexAction.executeIndexRequestOnPrimary(request, primary, - null); + TransportWriteAction.WriteResult result = + TransportIndexAction.executeIndexRequestOnPrimary(request, primary, null); request.primaryTerm(primary.getPrimaryTerm()); TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.getLocation(), logger); return new PrimaryResult(request, result.getResponse()); diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index ff4c4c657d7..7972bc39f37 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -162,8 +162,8 @@ the request was ignored. -------------------------------------------------- { "_shards": { - "total": 0, - "successful": 0, + "total": 1, + "successful": 1, "failed": 0 }, "_index": "test",