incorporate feedback

This commit is contained in:
Areek Zillur 2016-12-21 00:26:58 -05:00
parent eb5cc1e241
commit de44584f84
8 changed files with 218 additions and 149 deletions

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.translog.Translog;
public class SingleWriteOperationUtility {
public static <T extends DocWriteResponse> ActionListener<BulkResponse> wrapBulkResponse(ActionListener<T> listener) {
return ActionListener.wrap(bulkItemResponses -> {
assert bulkItemResponses.getItems().length == 1 : "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
if (bulkItemResponse.isFailed() == false) {
listener.onResponse(bulkItemResponse.getResponse());
} else {
listener.onFailure(bulkItemResponse.getFailure().getCause());
}
}, listener::onFailure);
}
public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest request) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(((DocWriteRequest) request));
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
bulkRequest.timeout(request.timeout());
bulkRequest.waitForActiveShards(request.waitForActiveShards());
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
return bulkRequest;
}
public static final class ResultHolder<Response extends ReplicationResponse & WriteResponse> {
public final Response response;
public final Translog.Location location;
public final Exception failure;
public ResultHolder(Response response, Translog.Location location, Exception failure) {
this.response = response;
this.location = location;
this.failure = failure;
}
}
public static <Response extends ReplicationResponse & WriteResponse> ResultHolder<Response>
executeSingleItemBulkRequestOnPrimary(ReplicatedWriteRequest request,
ThrowableFunction<BulkShardRequest, Tuple<BulkShardResponse,
Translog.Location>> executeShardBulkAction) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
Tuple<BulkShardResponse, Translog.Location> responseLocationTuple = executeShardBulkAction.apply(bulkShardRequest);
BulkShardResponse bulkShardResponse = responseLocationTuple.v1();
assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new ResultHolder<>(response, responseLocationTuple.v2(), failure);
}
public static ResultHolder executeSingleItemBulkRequestOnReplica(ReplicatedWriteRequest replicaRequest,
ThrowableFunction<BulkShardRequest,
Translog.Location> executeShardBulkAction) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = replicaRequest.getRefreshPolicy();
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest));
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
Translog.Location location = executeShardBulkAction.apply(bulkShardRequest);
return new ResultHolder<>(null, location, null);
}
public interface ThrowableFunction<T, R> {
R apply(T t) throws Exception;
}
}

View File

@ -31,12 +31,9 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;

View File

@ -28,11 +28,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
@ -109,10 +105,11 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
BulkShardRequest request, IndexShard primary) throws Exception {
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
logger.info("TTRACE: in bulk shardOperationPrimary for [{}]", request);
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
Translog.Location location = null;
@ -367,7 +364,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
}
@Override
protected WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
logger.info("TTRACE: in bulk shardOperationReplica for [{}]", request);
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
@ -425,42 +423,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
return next;
}
public <Request extends ReplicatedWriteRequest<Request>, Response extends ReplicationResponse & WriteResponse>
WritePrimaryResult<Request, Response> executeSingleItemBulkRequestOnPrimary(
Request request, IndexShard primary) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result = shardOperationOnPrimary(bulkShardRequest, primary);
BulkShardResponse bulkShardResponse = result.finalResponseIfSuccessful;
assert bulkShardResponse.getResponses().length == 1: "expected only one bulk shard response";
BulkItemResponse itemResponse = bulkShardResponse.getResponses()[0];
final Response response;
final Exception failure;
if (itemResponse.isFailed()) {
failure = itemResponse.getFailure().getCause();
response = null;
} else {
response = (Response) itemResponse.getResponse();
failure = null;
}
return new WritePrimaryResult<>(request, response, result.location, failure, primary, logger);
}
public <ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
WriteReplicaResult<ReplicaRequest> executeSingleItemBulkRequestOnReplica(
ReplicaRequest request, IndexShard replica) throws Exception {
BulkItemRequest[] itemRequests = new BulkItemRequest[1];
WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy();
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request));
BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardOperationOnReplica(bulkShardRequest, replica);
return new WriteReplicaResult<>(request, result.location, null, replica, logger);
}
/**
* Execute the given {@link IndexRequest} on a replica shard, throwing a
* {@link RetryOnReplicaException} if the operation needs to be re-tried.

View File

@ -20,17 +20,16 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
@ -39,9 +38,18 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder;
/**
* Performs the delete operation.
*
* Deprecated use TransportBulkAction with a single item instead
*/
@Deprecated
public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final TransportBulkAction bulkAction;
@ -60,30 +68,7 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
@Override
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(request);
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
bulkRequest.timeout(request.timeout());
bulkRequest.waitForActiveShards(request.waitForActiveShards());
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
bulkAction.execute(task, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
assert bulkItemResponses.getItems().length == 1: "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
if (bulkItemResponse.isFailed() == false) {
DeleteResponse response = bulkItemResponse.getResponse();
listener.onResponse(response);
} else {
listener.onFailure(bulkItemResponse.getFailure().getCause());
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
}
@Override
@ -94,12 +79,23 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
@Override
protected WritePrimaryResult<DeleteRequest, DeleteResponse> shardOperationOnPrimary(
DeleteRequest request, IndexShard primary) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary);
ResultHolder<DeleteResponse> resultHolder = executeSingleItemBulkRequestOnPrimary(request,
bulkShardRequest -> {
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
return new Tuple<>(result.finalResponseIfSuccessful, result.location);
}
);
return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger);
}
@Override
protected WriteReplicaResult<DeleteRequest> shardOperationOnReplica(
DeleteRequest request, IndexShard replica) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica);
ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request, bulkShardRequest -> {
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
return result.location;
});
return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger);
}
}

View File

@ -20,17 +20,17 @@
package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.bulk.SingleWriteOperationUtility.ResultHolder;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
@ -39,6 +39,11 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnReplica;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.executeSingleItemBulkRequestOnPrimary;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.toSingleItemBulkRequest;
import static org.elasticsearch.action.bulk.SingleWriteOperationUtility.wrapBulkResponse;
/**
* Performs the index operation.
*
@ -48,7 +53,10 @@ import org.elasticsearch.transport.TransportService;
* Defaults to <tt>true</tt>.
* <li><b>allowIdGeneration</b>: If the id is set not, should it be generated. Defaults to <tt>true</tt>.
* </ul>
*
* Deprecated use TransportBulkAction with a single item instead
*/
@Deprecated
public class TransportIndexAction extends TransportWriteAction<IndexRequest, IndexRequest, IndexResponse> {
private final TransportBulkAction bulkAction;
@ -68,30 +76,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
@Override
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(request);
bulkRequest.setRefreshPolicy(request.getRefreshPolicy());
bulkRequest.timeout(request.timeout());
bulkRequest.waitForActiveShards(request.waitForActiveShards());
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
bulkAction.execute(task, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
assert bulkItemResponses.getItems().length == 1: "expected only one item in bulk request";
BulkItemResponse bulkItemResponse = bulkItemResponses.getItems()[0];
if (bulkItemResponse.isFailed() == false) {
IndexResponse response = bulkItemResponse.getResponse();
listener.onResponse(response);
} else {
listener.onFailure(bulkItemResponse.getFailure().getCause());
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener));
}
@Override
@ -101,13 +86,26 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
@Override
protected WritePrimaryResult<IndexRequest, IndexResponse> shardOperationOnPrimary(
IndexRequest request, IndexShard primary) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnPrimary(request, primary);
IndexRequest request, final IndexShard primary) throws Exception {
ResultHolder<IndexResponse> resultHolder = executeSingleItemBulkRequestOnPrimary(request,
bulkShardRequest -> {
WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary);
return new Tuple<>(result.finalResponseIfSuccessful, result.location);
}
);
return new WritePrimaryResult<>(request, resultHolder.response, resultHolder.location, resultHolder.failure, primary, logger);
}
@Override
protected WriteReplicaResult<IndexRequest> shardOperationOnReplica(
IndexRequest request, IndexShard replica) throws Exception {
return shardBulkAction.executeSingleItemBulkRequestOnReplica(request, replica);
ResultHolder resultHolder = executeSingleItemBulkRequestOnReplica(request,
bulkShardRequest -> {
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
return result.location;
}
);
return new WriteReplicaResult<>(request, resultHolder.location, null, replica, logger);
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteResponse;

View File

@ -121,62 +121,67 @@ public class NoMasterNodeIT extends ESIntegTestCase {
ClusterBlockException.class, RestStatus.SERVICE_UNAVAILABLE
);
checkWriteAction(timeout,
checkUpdateAction(false, timeout,
client().prepareUpdate("test", "type1", "1")
.setScript(new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout));
checkWriteAction(timeout,
checkUpdateAction(autoCreateIndex, timeout,
client().prepareUpdate("no_index", "type1", "1")
.setScript(new Script(
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, "test script", Collections.emptyMap())).setTimeout(timeout));
checkWriteAction(timeout,
checkWriteAction(false, timeout,
client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout));
checkWriteAction(timeout,
checkWriteAction(autoCreateIndex, timeout,
client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout));
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
checkBulkAction(false, bulkRequestBuilder);
// the request should fail very quickly - use a large timeout and make sure it didn't pass...
timeout = new TimeValue(5000);
bulkRequestBuilder.setTimeout(timeout);
checkWriteAction(false, timeout, bulkRequestBuilder);
bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("no_index", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
checkBulkAction(autoCreateIndex, bulkRequestBuilder);
internalCluster().startNode(settings);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
}
void checkWriteAction(TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
// we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed
long now = System.currentTimeMillis();
try {
builder.get();
fail("expected ClusterBlockException or MasterNotDiscoveredException");
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
// verify we waited before giving up...
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
}
}
void checkBulkAction(boolean indexShouldBeAutoCreated, BulkRequestBuilder builder) {
// bulk operation do not throw MasterNotDiscoveredException exceptions. The only test that auto create kicked in and failed is
// via the timeout, as bulk operation do not wait on blocks.
TimeValue timeout;
if (indexShouldBeAutoCreated) {
if (autoCreateIndex) {
// we expect the bulk to fail because it will try to go to the master. Use small timeout and detect it has passed
timeout = new TimeValue(200);
} else {
// the request should fail very quickly - use a large timeout and make sure it didn't pass...
timeout = new TimeValue(5000);
}
builder.setTimeout(timeout);
bulkRequestBuilder.setTimeout(timeout);
checkWriteAction(autoCreateIndex, timeout, bulkRequestBuilder);
internalCluster().startNode(settings);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
}
void checkUpdateAction(boolean autoCreateIndex, TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
// we clean the metadata when loosing a master, therefore all operations on indices will auto create it, if allowed
long now = System.currentTimeMillis();
try {
builder.get();
fail("expected ClusterBlockException or MasterNotDiscoveredException");
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
if (e instanceof MasterNotDiscoveredException) {
assertTrue(autoCreateIndex);
} else {
assertFalse(autoCreateIndex);
}
// verify we waited before giving up...
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
}
}
void checkWriteAction(boolean indexShouldBeAutoCreated, TimeValue timeout, ActionRequestBuilder<?, ?, ?> builder) {
long now = System.currentTimeMillis();
try {
builder.get();

View File

@ -20,11 +20,12 @@
package org.elasticsearch.index.mapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.update.UpdateHelper;
@ -61,7 +62,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
private static ThreadPool THREAD_POOL;
private ClusterService clusterService;
private TransportService transportService;
private TransportIndexAction transportIndexAction;
private TransportBulkAction transportBulkAction;
@BeforeClass
public static void createThreadPool() {
@ -89,11 +90,8 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
UpdateHelper updateHelper = new UpdateHelper(settings, null);
TransportShardBulkAction shardBulkAction = new TransportShardBulkAction(settings, transportService, clusterService,
indicesService, THREAD_POOL, shardStateAction, null, updateHelper, actionFilters, indexNameExpressionResolver);
TransportBulkAction bulkAction = new TransportBulkAction(settings, THREAD_POOL, transportService, clusterService,
transportBulkAction = new TransportBulkAction(settings, THREAD_POOL, transportService, clusterService,
null, shardBulkAction, null, actionFilters, indexNameExpressionResolver, autoCreateIndex, System::currentTimeMillis);
transportIndexAction = new TransportIndexAction(settings, transportService, clusterService,
indicesService, THREAD_POOL, shardStateAction, actionFilters, indexNameExpressionResolver,
bulkAction, shardBulkAction);
}
@After
@ -115,19 +113,23 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
IndexRequest request = new IndexRequest("index", "type", "1");
request.source("foo", 3);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(request);
final AtomicBoolean onFailureCalled = new AtomicBoolean();
transportIndexAction.execute(request, new ActionListener<IndexResponse>() {
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
fail("Indexing request should have failed");
public void onResponse(BulkResponse bulkResponse) {
BulkItemResponse itemResponse = bulkResponse.getItems()[0];
assertTrue(itemResponse.isFailed());
assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class));
assertEquals(itemResponse.getFailure().getCause().getMessage(), "no such index");
onFailureCalled.set(true);
}
@Override
public void onFailure(Exception e) {
onFailureCalled.set(true);
assertThat(e, instanceOf(IndexNotFoundException.class));
assertEquals(e.getMessage(), "no such index");
fail("unexpected failure in bulk action, expected failed bulk item");
}
});