Bulk api: fail deletes when routing is required but not specified
As part of #10136 we removed the transport action for broadcast deletes in case routing is required but not specified. Bulk api worked differently though and kept on doing the broadcast delete internally in that case. This commit makes sure that delete items are marked as failed in such cases. Also the check has been moved up in the code together with the existing check for the update api, and we now make sure that the exception is the same as the one thrown for single document apis (delete/update). Note that the failure for the update api contained the wrong optype (the type of the document rather than "update"), that's been fixed too and tested. Closes #16645
This commit is contained in:
parent
2d2197ed2e
commit
0fe73b51f9
|
@ -30,10 +30,12 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||||
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
|
import org.elasticsearch.action.delete.TransportDeleteAction;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.ActionFilters;
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||||
import org.elasticsearch.action.support.HandledTransportAction;
|
import org.elasticsearch.action.support.HandledTransportAction;
|
||||||
|
import org.elasticsearch.action.update.TransportUpdateAction;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -42,12 +44,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||||
import org.elasticsearch.index.Index;
|
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||||
|
@ -197,10 +196,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
*/
|
*/
|
||||||
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
|
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
|
||||||
final long startTime = System.currentTimeMillis();
|
final long startTime = System.currentTimeMillis();
|
||||||
executeBulk(bulkRequest, startTime, listener, new AtomicArray<BulkItemResponse>(bulkRequest.requests.size()));
|
executeBulk(bulkRequest, startTime, listener, new AtomicArray<>(bulkRequest.requests.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private final long buildTookInMillis(long startTime) {
|
private long buildTookInMillis(long startTime) {
|
||||||
// protect ourselves against time going backwards
|
// protect ourselves against time going backwards
|
||||||
return Math.max(1, System.currentTimeMillis() - startTime);
|
return Math.max(1, System.currentTimeMillis() - startTime);
|
||||||
}
|
}
|
||||||
|
@ -214,14 +213,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
MetaData metaData = clusterState.metaData();
|
MetaData metaData = clusterState.metaData();
|
||||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||||
ActionRequest request = bulkRequest.requests.get(i);
|
ActionRequest request = bulkRequest.requests.get(i);
|
||||||
if (request instanceof DocumentRequest) {
|
//the request can only be null because we set it to null in the previous step, so it gets ignored
|
||||||
DocumentRequest req = (DocumentRequest) request;
|
if (request == null) {
|
||||||
|
|
||||||
if (addFailureIfIndexIsUnavailable(req, bulkRequest, responses, i, concreteIndices, metaData)) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
DocumentRequest documentRequest = (DocumentRequest) request;
|
||||||
String concreteIndex = concreteIndices.resolveIfAbsent(req);
|
if (addFailureIfIndexIsUnavailable(documentRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String concreteIndex = concreteIndices.resolveIfAbsent(documentRequest);
|
||||||
if (request instanceof IndexRequest) {
|
if (request instanceof IndexRequest) {
|
||||||
IndexRequest indexRequest = (IndexRequest) request;
|
IndexRequest indexRequest = (IndexRequest) request;
|
||||||
MappingMetaData mappingMd = null;
|
MappingMetaData mappingMd = null;
|
||||||
|
@ -237,10 +237,29 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
// make sure the request gets never processed again
|
// make sure the request gets never processed again
|
||||||
bulkRequest.requests.set(i, null);
|
bulkRequest.requests.set(i, null);
|
||||||
}
|
}
|
||||||
} else {
|
} else if (request instanceof DeleteRequest) {
|
||||||
concreteIndices.resolveIfAbsent(req);
|
try {
|
||||||
req.routing(clusterState.metaData().resolveIndexRouting(req.parent(), req.routing(), req.index()));
|
TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex, (DeleteRequest)request);
|
||||||
|
} catch(RoutingMissingException e) {
|
||||||
|
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, documentRequest.type(), documentRequest.id(), e);
|
||||||
|
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "delete", failure);
|
||||||
|
responses.set(i, bulkItemResponse);
|
||||||
|
// make sure the request gets never processed again
|
||||||
|
bulkRequest.requests.set(i, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else if (request instanceof UpdateRequest) {
|
||||||
|
try {
|
||||||
|
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex, (UpdateRequest)request);
|
||||||
|
} catch(RoutingMissingException e) {
|
||||||
|
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, documentRequest.type(), documentRequest.id(), e);
|
||||||
|
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "update", failure);
|
||||||
|
responses.set(i, bulkItemResponse);
|
||||||
|
// make sure the request gets never processed again
|
||||||
|
bulkRequest.requests.set(i, null);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new AssertionError("request type not supported: [" + request.getClass().getName() + "]");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,19 +281,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
} else if (request instanceof DeleteRequest) {
|
} else if (request instanceof DeleteRequest) {
|
||||||
DeleteRequest deleteRequest = (DeleteRequest) request;
|
DeleteRequest deleteRequest = (DeleteRequest) request;
|
||||||
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index());
|
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index());
|
||||||
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(deleteRequest.type());
|
|
||||||
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
|
|
||||||
// if routing is required, and no routing on the delete request, we need to broadcast it....
|
|
||||||
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, concreteIndex);
|
|
||||||
for (ShardIterator shardIt : groupShards) {
|
|
||||||
List<BulkItemRequest> list = requestsByShard.get(shardIt.shardId());
|
|
||||||
if (list == null) {
|
|
||||||
list = new ArrayList<>();
|
|
||||||
requestsByShard.put(shardIt.shardId(), list);
|
|
||||||
}
|
|
||||||
list.add(new BulkItemRequest(i, deleteRequest));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
|
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
|
||||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
|
@ -282,17 +288,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
requestsByShard.put(shardId, list);
|
requestsByShard.put(shardId, list);
|
||||||
}
|
}
|
||||||
list.add(new BulkItemRequest(i, request));
|
list.add(new BulkItemRequest(i, request));
|
||||||
}
|
|
||||||
} else if (request instanceof UpdateRequest) {
|
} else if (request instanceof UpdateRequest) {
|
||||||
UpdateRequest updateRequest = (UpdateRequest) request;
|
UpdateRequest updateRequest = (UpdateRequest) request;
|
||||||
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
|
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
|
||||||
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type());
|
|
||||||
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
|
|
||||||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(),
|
|
||||||
updateRequest.id(), new IllegalArgumentException("routing is required for this item"));
|
|
||||||
responses.set(i, new BulkItemResponse(i, updateRequest.type(), failure));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
|
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
|
||||||
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
List<BulkItemRequest> list = requestsByShard.get(shardId);
|
||||||
if (list == null) {
|
if (list == null) {
|
||||||
|
|
|
@ -96,9 +96,15 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void resolveRequest(final MetaData metaData, String concreteIndex, DeleteRequest request) {
|
protected void resolveRequest(final MetaData metaData, String concreteIndex, DeleteRequest request) {
|
||||||
|
resolveAndValidateRouting(metaData, concreteIndex, request);
|
||||||
|
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
|
||||||
|
request.setShardId(shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void resolveAndValidateRouting(final MetaData metaData, String concreteIndex, DeleteRequest request) {
|
||||||
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
|
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
|
||||||
if (metaData.hasIndex(concreteIndex)) {
|
if (metaData.hasIndex(concreteIndex)) {
|
||||||
// check if routing is required, if so, do a broadcast delete
|
// check if routing is required, if so, throw error if routing wasn't specified
|
||||||
MappingMetaData mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
|
MappingMetaData mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
|
||||||
if (mappingMd != null && mappingMd.routing().required()) {
|
if (mappingMd != null && mappingMd.routing().required()) {
|
||||||
if (request.routing() == null) {
|
if (request.routing() == null) {
|
||||||
|
@ -111,8 +117,6 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
|
|
||||||
request.setShardId(shardId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.action.support.single.instance.TransportInstanceSingleO
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
@ -100,14 +101,18 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean resolveRequest(ClusterState state, UpdateRequest request, ActionListener<UpdateResponse> listener) {
|
protected boolean resolveRequest(ClusterState state, UpdateRequest request, ActionListener<UpdateResponse> listener) {
|
||||||
request.routing((state.metaData().resolveIndexRouting(request.parent(), request.routing(), request.index())));
|
resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request);
|
||||||
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
|
|
||||||
if (request.routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.type())) {
|
|
||||||
throw new RoutingMissingException(request.concreteIndex(), request.type(), request.id());
|
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
|
||||||
|
request.routing((metaData.resolveIndexRouting(request.parent(), request.routing(), request.index())));
|
||||||
|
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
|
||||||
|
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
|
||||||
|
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
|
||||||
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.math.MathUtils;
|
import org.elasticsearch.common.math.MathUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.Index;
|
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||||
|
@ -67,10 +66,6 @@ public class OperationRouting extends AbstractComponent {
|
||||||
return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
|
return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference);
|
||||||
}
|
}
|
||||||
|
|
||||||
public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) {
|
|
||||||
return indexRoutingTable(clusterState, index).groupByShardsIt();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing) {
|
public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing) {
|
||||||
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
|
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
|
||||||
return shards.size();
|
return shards.size();
|
||||||
|
|
|
@ -20,24 +20,26 @@
|
||||||
package org.elasticsearch.routing;
|
package org.elasticsearch.routing;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.Version;
|
|
||||||
import org.elasticsearch.action.RoutingMissingException;
|
import org.elasticsearch.action.RoutingMissingException;
|
||||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||||
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.explain.ExplainResponse;
|
import org.elasticsearch.action.explain.ExplainResponse;
|
||||||
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.get.MultiGetRequest;
|
import org.elasticsearch.action.get.MultiGetRequest;
|
||||||
import org.elasticsearch.action.get.MultiGetResponse;
|
import org.elasticsearch.action.get.MultiGetResponse;
|
||||||
import org.elasticsearch.action.termvectors.MultiTermVectorsResponse;
|
import org.elasticsearch.action.termvectors.MultiTermVectorsResponse;
|
||||||
import org.elasticsearch.action.termvectors.TermVectorsRequest;
|
import org.elasticsearch.action.termvectors.TermVectorsRequest;
|
||||||
import org.elasticsearch.action.termvectors.TermVectorsResponse;
|
import org.elasticsearch.action.termvectors.TermVectorsResponse;
|
||||||
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.action.update.UpdateResponse;
|
import org.elasticsearch.action.update.UpdateResponse;
|
||||||
import org.elasticsearch.client.Requests;
|
import org.elasticsearch.client.Requests;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -156,8 +158,7 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/16645")
|
public void testRequiredRoutingCrudApis() throws Exception {
|
||||||
public void testRequiredRoutingMapping() throws Exception {
|
|
||||||
client().admin().indices().prepareCreate("test").addAlias(new Alias("alias"))
|
client().admin().indices().prepareCreate("test").addAlias(new Alias("alias"))
|
||||||
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
|
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
|
||||||
.execute().actionGet();
|
.execute().actionGet();
|
||||||
|
@ -203,9 +204,31 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||||
client().prepareIndex(indexOrAlias(), "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
|
client().prepareIndex(indexOrAlias(), "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
|
||||||
logger.info("--> verifying get with no routing, should not find anything");
|
logger.info("--> verifying get with no routing, should not find anything");
|
||||||
|
|
||||||
logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required");
|
try {
|
||||||
client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1")).execute().actionGet();
|
client().prepareUpdate(indexOrAlias(), "type1", "1").setDoc("field", "value2").execute().actionGet();
|
||||||
|
fail("update with missing routing when routing is required should fail");
|
||||||
|
} catch(ElasticsearchException e) {
|
||||||
|
assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
client().prepareUpdate(indexOrAlias(), "type1", "1").setRouting("0").setDoc("field", "value2").execute().actionGet();
|
||||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
try {
|
||||||
|
client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists();
|
||||||
|
fail();
|
||||||
|
} catch (RoutingMissingException e) {
|
||||||
|
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||||
|
assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]"));
|
||||||
|
}
|
||||||
|
GetResponse getResponse = client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet();
|
||||||
|
assertThat(getResponse.isExists(), equalTo(true));
|
||||||
|
assertThat(getResponse.getSourceAsMap().get("field"), equalTo("value2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
client().prepareDelete(indexOrAlias(), "type1", "1").setRouting("0").setRefresh(true).execute().actionGet();
|
||||||
|
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
try {
|
try {
|
||||||
client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists();
|
client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists();
|
||||||
|
@ -227,28 +250,72 @@ public class SimpleRoutingIT extends ESIntegTestCase {
|
||||||
.execute().actionGet();
|
.execute().actionGet();
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
|
|
||||||
logger.info("--> indexing with id [1], and routing [0]");
|
{
|
||||||
client().prepareBulk().add(
|
BulkResponse bulkResponse = client().prepareBulk().add(Requests.indexRequest(indexOrAlias()).type("type1").id("1")
|
||||||
client().prepareIndex(indexOrAlias(), "type1", "1").setRouting("0").setSource("field", "value1")).execute().actionGet();
|
.source("field", "value")).execute().actionGet();
|
||||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
assertThat(bulkResponse.getItems().length, equalTo(1));
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(true));
|
||||||
|
|
||||||
logger.info("--> verifying get with no routing, should fail");
|
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||||
for (int i = 0; i < 5; i++) {
|
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||||
try {
|
assertThat(bulkItemResponse.getOpType(), equalTo("index"));
|
||||||
client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists();
|
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||||
fail();
|
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||||
} catch (RoutingMissingException e) {
|
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
|
||||||
assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.info("--> verifying get with routing, should find");
|
|
||||||
for (int i = 0; i < 5; i++) {
|
|
||||||
assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRequiredRoutingMapping_variousAPIs() throws Exception {
|
{
|
||||||
|
BulkResponse bulkResponse = client().prepareBulk().add(Requests.indexRequest(indexOrAlias()).type("type1").id("1").routing("0")
|
||||||
|
.source("field", "value")).execute().actionGet();
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
BulkResponse bulkResponse = client().prepareBulk().add(new UpdateRequest(indexOrAlias(), "type1", "1").doc("field", "value2"))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertThat(bulkResponse.getItems().length, equalTo(1));
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(true));
|
||||||
|
|
||||||
|
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||||
|
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||||
|
assertThat(bulkItemResponse.getOpType(), equalTo("update"));
|
||||||
|
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||||
|
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||||
|
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
BulkResponse bulkResponse = client().prepareBulk().add(new UpdateRequest(indexOrAlias(), "type1", "1").doc("field", "value2")
|
||||||
|
.routing("0")).execute().actionGet();
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
BulkResponse bulkResponse = client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1"))
|
||||||
|
.execute().actionGet();
|
||||||
|
assertThat(bulkResponse.getItems().length, equalTo(1));
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(true));
|
||||||
|
|
||||||
|
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||||
|
assertThat(bulkItemResponse.isFailed(), equalTo(true));
|
||||||
|
assertThat(bulkItemResponse.getOpType(), equalTo("delete"));
|
||||||
|
assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST));
|
||||||
|
assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class));
|
||||||
|
assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
BulkResponse bulkResponse = client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1")
|
||||||
|
.routing("0")).execute().actionGet();
|
||||||
|
assertThat(bulkResponse.getItems().length, equalTo(1));
|
||||||
|
assertThat(bulkResponse.hasFailures(), equalTo(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRequiredRoutingMappingVariousAPIs() throws Exception {
|
||||||
client().admin().indices().prepareCreate("test").addAlias(new Alias("alias"))
|
client().admin().indices().prepareCreate("test").addAlias(new Alias("alias"))
|
||||||
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
|
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject())
|
||||||
.execute().actionGet();
|
.execute().actionGet();
|
||||||
|
|
Loading…
Reference in New Issue