diff --git a/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java b/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java new file mode 100644 index 00000000000..8473cb38f9e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/bulk/SingleWriteOperationUtility.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.index.translog.Translog; + +public class SingleWriteOperationUtility { + + public static ActionListener wrapBulkResponse(ActionListener listener) { + return ActionListener.wrap(bulkItemResponses -> { + assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request"; + BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; + if (bulkItemResponse.isFailed() == false) { + listener.onResponse(bulkItemResponse.getResponse()); + } else { + listener.onFailure(bulkItemResponse.getFailure().getCause()); + } + }, listener::onFailure); + } + + public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(((DocWriteRequest) request)); + bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); + bulkRequest.timeout(request.timeout()); + bulkRequest.waitForActiveShards(request.waitForActiveShards()); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + return bulkRequest; + } + + public static final class ResultHolder { + public final Response response; + public final Translog.Location location; + public final Exception failure; + + public ResultHolder(Response response, Translog.Location location, Exception failure) { + this.response = response; + this.location = location; + this.failure = failure; + } + } + + public static ResultHolder + executeSingleItemBulkRequestOnPrimary(ReplicatedWriteRequest request, + ThrowableFunction> executeShardBulkAction) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); + request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); + Tuple responseLocationTuple = executeShardBulkAction.apply(bulkShardRequest); + BulkShardResponse bulkShardResponse = responseLocationTuple.v1(); + assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response"; + BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0]; + final Response response; + final Exception failure; + if (itemResponse.isFailed()) { + failure = itemResponse.getFailure().getCause(); + response = null; + } else { + response = (Response) itemResponse.getResponse(); + failure = null; + } + return new ResultHolder<>(response, responseLocationTuple.v2(), failure); + } + + public static ResultHolder executeSingleItemBulkRequestOnReplica(ReplicatedWriteRequest replicaRequest, + ThrowableFunction executeShardBulkAction) throws Exception { + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy(); + itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest)); + BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests); + Translog.Location location = executeShardBulkAction.apply(bulkShardRequest); + return new ResultHolder<>(null, location, null); + } + + public interface ThrowableFunction { + R apply(T t) throws Exception; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 3e2efb3a731..f52f4535635 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -31,12 +31,9 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.IngestActionForwarder; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.update.TransportUpdateAction; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d52d0bc11c9..0d394cdcd8f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -28,11 +28,7 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.WriteResponse; -import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -109,10 +105,11 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary( + public WritePrimaryResult shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); + logger.info("TTRACE: in bulk shardOperationPrimary for [{}]", request); long[] preVersions = new long[request.items().length]; VersionType[] preVersionTypes = new VersionType[request.items().length]; Translog.Location location = null; @@ -367,7 +364,8 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + logger.info("TTRACE: in bulk shardOperationReplica for [{}]", request); Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; @@ -425,42 +423,6 @@ public class TransportShardBulkAction extends TransportWriteAction, Response extends ReplicationResponse & WriteResponse> - WritePrimaryResult executeSingleItemBulkRequestOnPrimary( - Request request, IndexShard primary) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); - WritePrimaryResult result = shardOperationOnPrimary(bulkShardRequest, primary); - BulkShardResponse bulkShardResponse = result.finalResponseIfSuccessful; - assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response"; - BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0]; - final Response response; - final Exception failure; - if (itemResponse.isFailed()) { - failure = itemResponse.getFailure().getCause(); - response = null; - } else { - response = (Response) itemResponse.getResponse(); - failure = null; - } - return new WritePrimaryResult<>(request, response, result.location, failure, primary, logger); - } - - public > - WriteReplicaResult executeSingleItemBulkRequestOnReplica( - ReplicaRequest request, IndexShard replica) throws Exception { - BulkItemRequest[] itemRequests = new BulkItemRequest[1]; - WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); - BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); - WriteReplicaResult result = shardOperationOnReplica(bulkShardRequest, replica); - return new WriteReplicaResult<>(request, result.location, null, replica, logger); - } - /** * Execute the given {@link IndexRequest} on a replica shard, throwing a * {@link RetryOnReplicaException} if the operation needs to be re-tried. diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index d8effe868c1..5a8cbbc5bcc 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -20,17 +20,16 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; @@ -39,9 +38,18 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder; + /** * Performs the delete operation. + * + * Deprecated use TransportBulkAction with a single item instead */ +@Deprecated public class TransportDeleteAction extends TransportWriteAction { private final TransportBulkAction bulkAction; @@ -60,30 +68,7 @@ public class TransportDeleteAction extends TransportWriteAction listener) { - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(request); - bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); - bulkRequest.timeout(request.timeout()); - bulkRequest.waitForActiveShards(request.waitForActiveShards()); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - bulkAction.execute(task, bulkRequest, new ActionListener() { - @Override - public void onResponse(BulkResponse bulkItemResponses) { - assert bulkItemResponses.getItems().length == 1: "expected only one item in bulk request"; - BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; - if (bulkItemResponse.isFailed() == false) { - DeleteResponse response = bulkItemResponse.getResponse(); - listener.onResponse(response); - } else { - listener.onFailure(bulkItemResponse.getFailure().getCause()); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); } @Override @@ -94,12 +79,23 @@ public class TransportDeleteAction extends TransportWriteAction shardOperationOnPrimary( DeleteRequest request, IndexShard primary) throws Exception { - return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary); + ResultHolder resultHolder = executeSingleItemBulkRequestOnPrimary(request, + bulkShardRequest -> { + WritePrimaryResult result = + shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); + return new Tuple<>(result.finalResponseIfSuccessful, result.location); + } + ); + return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( DeleteRequest request, IndexShard replica) throws Exception { - return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica); + ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, bulkShardRequest -> { + WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); + return result.location; + }); + return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger); } } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index a0c21a0dbcf..3b3c29b5a08 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -20,17 +20,17 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; @@ -39,6 +39,11 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest; +import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse; + /** * Performs the index operation. * @@ -48,7 +53,10 @@ import org.elasticsearch.transport.TransportService; * Defaults to true. *
  • allowIdGeneration: If the id is set not, should it be generated. Defaults to true. * + * + * Deprecated use TransportBulkAction with a single item instead */ +@Deprecated public class TransportIndexAction extends TransportWriteAction { private final TransportBulkAction bulkAction; @@ -68,30 +76,7 @@ public class TransportIndexAction extends TransportWriteAction listener) { - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(request); - bulkRequest.setRefreshPolicy(request.getRefreshPolicy()); - bulkRequest.timeout(request.timeout()); - bulkRequest.waitForActiveShards(request.waitForActiveShards()); - request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); - bulkAction.execute(task, bulkRequest, new ActionListener() { - @Override - public void onResponse(BulkResponse bulkItemResponses) { - assert bulkItemResponses.getItems().length == 1: "expected only one item in bulk request"; - BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0]; - if (bulkItemResponse.isFailed() == false) { - IndexResponse response = bulkItemResponse.getResponse(); - listener.onResponse(response); - } else { - listener.onFailure(bulkItemResponse.getFailure().getCause()); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); } @Override @@ -101,13 +86,26 @@ public class TransportIndexAction extends TransportWriteAction shardOperationOnPrimary( - IndexRequest request, IndexShard primary) throws Exception { - return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary); + IndexRequest request, final IndexShard primary) throws Exception { + ResultHolder resultHolder = executeSingleItemBulkRequestOnPrimary(request, + bulkShardRequest -> { + WritePrimaryResult result = + shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); + return new Tuple<>(result.finalResponseIfSuccessful, result.location); + } + ); + return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger); } @Override protected WriteReplicaResult shardOperationOnReplica( IndexRequest request, IndexShard replica) throws Exception { - return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica); + ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, + bulkShardRequest -> { + WriteReplicaResult result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica); + return result.location; + } + ); + return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index afc16df054d..8569b28257f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -21,6 +21,8 @@ package org.elasticsearch.action.support.replication; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteResponse; diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index 211e8b1f600..74ccbc7f8bd 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -121,62 +121,67 @@ public class NoMasterNodeIT extends ESIntegTestCase { ClusterBlockException.class, RestStatus.SERVICE_UNAVAILABLE ); - checkWriteAction(timeout, + checkUpdateAction(false, timeout, client().prepareUpdate("test", "type1", "1") .setScript(new Script( ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout)); - checkWriteAction(timeout, + checkUpdateAction(autoCreateIndex, timeout, client().prepareUpdate("no_index", "type1", "1") .setScript(new Script( ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout)); - checkWriteAction(timeout, + checkWriteAction(false, timeout, client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout)); - checkWriteAction(timeout, + checkWriteAction(autoCreateIndex, timeout, client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout)); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject())); bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject())); - checkBulkAction(false, bulkRequestBuilder); + // the request should fail very quickly - use a large timeout and make sure it didn't pass... + timeout = new TimeValue(5000); + bulkRequestBuilder.setTimeout(timeout); + checkWriteAction(false, timeout, bulkRequestBuilder); bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject())); bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject())); - checkBulkAction(autoCreateIndex, bulkRequestBuilder); - - internalCluster().startNode(settings); - client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); - } - - void checkWriteAction(TimeValue timeout, ActionRequestBuilder builder) { - // we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed - long now = System.currentTimeMillis(); - try { - builder.get(); - fail("expected ClusterBlockException or MasterNotDiscoveredException"); - } catch (ClusterBlockException | MasterNotDiscoveredException e) { - // verify we waited before giving up... - assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50)); - } - } - - void checkBulkAction(boolean indexShouldBeAutoCreated, BulkRequestBuilder builder) { - // bulk operation do not throw MasterNotDiscoveredException exceptions. The only test that auto create kicked in and failed is - // via the timeout, as bulk operation do not wait on blocks. - TimeValue timeout; - if (indexShouldBeAutoCreated) { + if (autoCreateIndex) { // we expect the bulk to fail because it will try to go to the master. Use small timeout and detect it has passed timeout = new TimeValue(200); } else { // the request should fail very quickly - use a large timeout and make sure it didn't pass... timeout = new TimeValue(5000); } - builder.setTimeout(timeout); + bulkRequestBuilder.setTimeout(timeout); + checkWriteAction(autoCreateIndex, timeout, bulkRequestBuilder); + + internalCluster().startNode(settings); + client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet(); + } + + void checkUpdateAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder builder) { + // we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed + long now = System.currentTimeMillis(); + try { + builder.get(); + fail("expected ClusterBlockException or MasterNotDiscoveredException"); + } catch (ClusterBlockException | MasterNotDiscoveredException e) { + if (e instanceof MasterNotDiscoveredException) { + assertTrue(autoCreateIndex); + } else { + assertFalse(autoCreateIndex); + } + // verify we waited before giving up... + assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); + assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50)); + } + } + + void checkWriteAction(boolean indexShouldBeAutoCreated, TimeValue timeout, ActionRequestBuilder builder) { long now = System.currentTimeMillis(); try { builder.get(); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index 431595ec21f..f3924f7663b 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -20,11 +20,12 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.update.UpdateHelper; @@ -61,7 +62,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { private static ThreadPool THREAD_POOL; private ClusterService clusterService; private TransportService transportService; - private TransportIndexAction transportIndexAction; + private TransportBulkAction transportBulkAction; @BeforeClass public static void createThreadPool() { @@ -89,11 +90,8 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { UpdateHelper updateHelper = new UpdateHelper(settings, null); TransportShardBulkAction shardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService, indicesService, THREAD_POOL, shardStateAction, null, updateHelper, actionFilters, indexNameExpressionResolver); - TransportBulkAction bulkAction = new TransportBulkAction(settings, THREAD_POOL, transportService, clusterService, + transportBulkAction = new TransportBulkAction(settings, THREAD_POOL, transportService, clusterService, null, shardBulkAction, null, actionFilters, indexNameExpressionResolver, autoCreateIndex, System::currentTimeMillis); - transportIndexAction = new TransportIndexAction(settings, transportService, clusterService, - indicesService, THREAD_POOL, shardStateAction, actionFilters, indexNameExpressionResolver, - bulkAction, shardBulkAction); } @After @@ -115,19 +113,23 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { IndexRequest request = new IndexRequest("index", "type", "1"); request.source("foo", 3); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(request); final AtomicBoolean onFailureCalled = new AtomicBoolean(); - transportIndexAction.execute(request, new ActionListener() { + transportBulkAction.execute(bulkRequest, new ActionListener() { @Override - public void onResponse(IndexResponse indexResponse) { - fail("Indexing request should have failed"); + public void onResponse(BulkResponse bulkResponse) { + BulkItemResponse itemResponse = bulkResponse.getItems()[0]; + assertTrue(itemResponse.isFailed()); + assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class)); + assertEquals(itemResponse.getFailure().getCause().getMessage(), "no such index"); + onFailureCalled.set(true); } @Override public void onFailure(Exception e) { - onFailureCalled.set(true); - assertThat(e, instanceOf(IndexNotFoundException.class)); - assertEquals(e.getMessage(), "no such index"); + fail("unexpected failure in bulk action, expected failed bulk item"); } });