From ba8ad397a1b418bb8dc8a47968f8fe58939fcfb9 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 2 Feb 2017 14:21:53 -0500 Subject: [PATCH] Use bulk action interally for update action (#22915) Currently, update action internally uses deprecated index and delete transport actions. As of #21964, these tranport actions were deprecated in favour of using single item bulk request. In this commit, update action uses single item bulk action. --- .../TransportSingleItemBulkWriteAction.java | 3 +- .../action/update/TransportUpdateAction.java | 155 +++++++----------- 2 files changed, 60 insertions(+), 98 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java index fd71f504ea9..ed17971a77c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -107,7 +107,8 @@ public abstract class TransportSingleItemBulkWriteAction< } - private ActionListener wrapBulkResponse(ActionListener listener) { + public static + ActionListener wrapBulkResponse(ActionListener listener) { return ActionListener.wrap(bulkItemResponses -> { assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request"; BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 4e2a7b466bc..0235dd95a4b 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -19,19 +19,17 @@ package org.elasticsearch.action.update; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.TransportActions; @@ -63,11 +61,12 @@ import java.util.Collections; import java.util.Map; import static org.elasticsearch.ExceptionsHelper.unwrapCause; +import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.toSingleItemBulkRequest; +import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.wrapBulkResponse; public class TransportUpdateAction extends TransportInstanceSingleOperationAction { - private final TransportDeleteAction deleteAction; - private final TransportIndexAction indexAction; + private final TransportBulkAction bulkAction; private final AutoCreateIndex autoCreateIndex; private final TransportCreateIndexAction createIndexAction; private final UpdateHelper updateHelper; @@ -75,12 +74,10 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio @Inject public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction, - UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - IndicesService indicesService, AutoCreateIndex autoCreateIndex) { + TransportBulkAction bulkAction, TransportCreateIndexAction createIndexAction, UpdateHelper updateHelper, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, AutoCreateIndex autoCreateIndex) { super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new); - this.indexAction = indexAction; - this.deleteAction = deleteAction; + this.bulkAction = bulkAction; this.createIndexAction = createIndexAction; this.updateHelper = updateHelper; this.indicesService = indicesService; @@ -162,7 +159,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio return new PlainShardIterator(shardIterator.shardId(), Collections.singletonList(shard)); } } - return new PlainShardIterator(shardIterator.shardId(), Collections.emptyList()); + return new PlainShardIterator(shardIterator.shardId(), Collections.emptyList()); } @Override @@ -180,101 +177,46 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio IndexRequest upsertRequest = result.action(); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference upsertSourceBytes = upsertRequest.source(); - indexAction.execute(upsertRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); - if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || - (request.fields() != null && request.fields().length > 0)) { - Tuple> sourceAndContent = - XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); - } else { - update.setGetResult(null); - } - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", - retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; + bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse( + ActionListener.wrap(response -> { + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || + (request.fields() != null && request.fields().length > 0)) { + Tuple> sourceAndContent = + XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType()); + update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); + } else { + update.setGetResult(null); } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); + update.setForcedRefresh(response.forcedRefresh()); + listener.onResponse(update); + }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) + ); + break; case UPDATED: IndexRequest indexRequest = result.action(); // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request final BytesReference indexSourceBytes = indexRequest.source(); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); + bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse( + ActionListener.wrap(response -> { + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); + update.setForcedRefresh(response.forcedRefresh()); + listener.onResponse(update); + }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) + ); break; case DELETED: DeleteRequest deleteRequest = result.action(); - deleteAction.execute(deleteRequest, new ActionListener() { - @Override - public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); - update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); - update.setForcedRefresh(response.forcedRefresh()); - listener.onResponse(update); - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = unwrapCause(e); - if (cause instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new ActionRunnable(listener) { - @Override - protected void doRun() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; - } - } - listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); - } - }); + bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse( + ActionListener.wrap(response -> { + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); + update.setForcedRefresh(response.forcedRefresh()); + listener.onResponse(update); + }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount))) + ); break; case NOOP: UpdateResponse update = result.action(); @@ -291,4 +233,23 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio throw new IllegalStateException("Illegal result " + result.getResponseResult()); } } + + private void handleUpdateFailureWithRetry(final ActionListener listener, final UpdateRequest request, + final Exception failure, int retryCount) { + final Throwable cause = unwrapCause(failure); + if (cause instanceof VersionConflictEngineException) { + if (retryCount < request.retryOnConflict()) { + logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", + retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); + threadPool.executor(executor()).execute(new ActionRunnable(listener) { + @Override + protected void doRun() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } + } + listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); + } }