Delete api: remove broadcast delete if routing is missing when required

This commit changes the behaviour of the delete api when processing a delete request that refers to a type that has routing set to required in the mapping, and the routing is missing in the request. Up until now the delete api sent a broadcast delete request to all of the shards that belong to the index, making sure that the document could be found although the routing value wasn't specified. This was probably not the best choice: if the routing is set to required, an error should be thrown instead.

A `RoutingMissingException` gets now thrown instead, like it happens in the same situation with every other api (index, update, get etc.). Last but not least, this change allows to get rid of a couple of `TransportAction`s, `Request`s and `Response`s and simplify the codebase.

Closes #9123
Closes #10136
This commit is contained in:
javanna 2015-03-17 21:20:57 -07:00 committed by Luca Cavanna
parent 780fe57a2c
commit 4348959f9d
16 changed files with 30 additions and 703 deletions

View File

@ -161,6 +161,12 @@ shards aren't being kept track of. From version 2.0 the stats in the `_shards` h
of an index. The http status code is left unchanged and is only based on failures that occurred while executing on of an index. The http status code is left unchanged and is only based on failures that occurred while executing on
primary shards. primary shards.
=== Delete api with missing routing when required
Delete api requires a routing value when deleting a document belonging to a type that has routing set to required in its
mapping, whereas previous elasticsearch versions would trigger a broadcast delete on all shards belonging to the index.
A `RoutingMissingException` is now thrown instead.
=== Mappings === Mappings
* The setting `index.mapping.allow_type_wrapper` has been removed. Documents should always be sent without the type as the root element. * The setting `index.mapping.allow_type_wrapper` has been removed. Documents should always be sent without the type as the root element.

View File

@ -129,8 +129,6 @@ import org.elasticsearch.action.count.CountAction;
import org.elasticsearch.action.count.TransportCountAction; import org.elasticsearch.action.count.TransportCountAction;
import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.delete.TransportIndexDeleteAction;
import org.elasticsearch.action.delete.TransportShardDeleteAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction; import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportIndexDeleteByQueryAction; import org.elasticsearch.action.deletebyquery.TransportIndexDeleteByQueryAction;
@ -282,8 +280,7 @@ public class ActionModule extends AbstractModule {
TransportDfsOnlyAction.class); TransportDfsOnlyAction.class);
registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class, registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
TransportShardMultiTermsVectorAction.class); TransportShardMultiTermsVectorAction.class);
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class, registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
TransportIndexDeleteAction.class, TransportShardDeleteAction.class);
registerAction(CountAction.INSTANCE, TransportCountAction.class); registerAction(CountAction.INSTANCE, TransportCountAction.class);
registerAction(ExistsAction.INSTANCE, TransportExistsAction.class); registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class); registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
/**
* Delete request to execute on all shards that belong to a specific index.
* Used when routing is required but not specified within the delete request.
*/
class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteRequest> {
private final String type;
private final String id;
private final boolean refresh;
private final long version;
private final String originalIndex;
IndexDeleteRequest(DeleteRequest request, String concreteIndex) {
super(concreteIndex, request.timeout(), request.consistencyLevel(),
request.indices(), request.indicesOptions(), request);
this.type = request.type();
this.id = request.id();
this.refresh = request.refresh();
this.version = request.version();
this.originalIndex = request.index();
}
String type() {
return this.type;
}
String id() {
return this.id;
}
boolean refresh() {
return this.refresh;
}
long version() {
return this.version;
}
String originalIndex() {
return originalIndex;
}
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Delete by query response executed on a specific index.
*/
public class IndexDeleteResponse extends ActionWriteResponse {
private String index;
private ShardDeleteResponse[] deleteResponses;
IndexDeleteResponse(String index, ShardDeleteResponse[] deleteResponses) {
this.index = index;
this.deleteResponses = deleteResponses;
}
/**
* The index the delete by query operation was executed against.
*/
public String getIndex() {
return this.index;
}
public ShardDeleteResponse[] getResponses() {
return this.deleteResponses;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("IndexDeleteResponse is not supposed to be sent over the transport");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("IndexDeleteResponse is not supposed to be sent over the transport");
}
}

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Delete by query request to execute on a specific shard.
*/
public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDeleteRequest> {
private int shardId;
private String type;
private String id;
private boolean refresh = false;
private long version;
private String originalIndex;
ShardDeleteRequest(IndexDeleteRequest request, int shardId) {
super(request);
this.index = request.index();
this.shardId = shardId;
this.type = request.type();
this.id = request.id();
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.refresh = request.refresh();
this.version = request.version();
this.originalIndex = request.originalIndex();
}
ShardDeleteRequest() {
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (type == null) {
addValidationError("type is missing", validationException);
}
if (id == null) {
addValidationError("id is missing", validationException);
}
return validationException;
}
public int shardId() {
return this.shardId;
}
public String type() {
return this.type;
}
public String id() {
return this.id;
}
public boolean refresh() {
return this.refresh;
}
public void version(long version) {
this.version = version;
}
public long version() {
return this.version;
}
@Override
public String[] indices() {
return new String[]{originalIndex};
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();
type = in.readString();
id = in.readString();
refresh = in.readBoolean();
version = in.readLong();
originalIndex = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shardId);
out.writeString(type);
out.writeString(id);
out.writeBoolean(refresh);
out.writeLong(version);
out.writeOptionalString(originalIndex);
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Delete response executed on a specific shard.
*/
public class ShardDeleteResponse extends ActionWriteResponse {
private long version;
private boolean found;
public ShardDeleteResponse() {
}
public ShardDeleteResponse(long version, boolean found) {
this.version = version;
this.found = found;
}
public long getVersion() {
return version;
}
public boolean isFound() {
return found;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
version = in.readLong();
found = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(version);
out.writeBoolean(found);
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.delete;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
@ -35,7 +36,6 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
@ -54,15 +54,12 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
private final TransportCreateIndexAction createIndexAction; private final TransportCreateIndexAction createIndexAction;
private final TransportIndexDeleteAction indexDeleteAction;
@Inject @Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, TransportIndexDeleteAction indexDeleteAction, ActionFilters actionFilters) { TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters); super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
this.createIndexAction = createIndexAction; this.createIndexAction = createIndexAction;
this.indexDeleteAction = indexDeleteAction;
this.autoCreateIndex = new AutoCreateIndex(settings); this.autoCreateIndex = new AutoCreateIndex(settings);
} }
@ -102,7 +99,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
} }
@Override @Override
protected boolean resolveRequest(final ClusterState state, final InternalRequest request, final ActionListener<DeleteResponse> listener) { protected void resolveRequest(final ClusterState state, final InternalRequest request, final ActionListener<DeleteResponse> listener) {
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index())); request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
if (state.metaData().hasIndex(request.concreteIndex())) { if (state.metaData().hasIndex(request.concreteIndex())) {
// check if routing is required, if so, do a broadcast delete // check if routing is required, if so, do a broadcast delete
@ -114,34 +111,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
throw new ElasticsearchIllegalArgumentException("routing value is required for deleting documents of type [" + request.request().type() throw new ElasticsearchIllegalArgumentException("routing value is required for deleting documents of type [" + request.request().type()
+ "] while using version_type [" + request.request().versionType() + "]"); + "] while using version_type [" + request.request().versionType() + "]");
} }
indexDeleteAction.execute(new IndexDeleteRequest(request.request(), request.concreteIndex()), new ActionListener<IndexDeleteResponse>() { throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
@Override
public void onResponse(IndexDeleteResponse indexDeleteResponse) {
// go over the response, see if we have found one, and the version if found
long version = Versions.MATCH_ANY;
boolean found = false;
for (ShardDeleteResponse deleteResponse : indexDeleteResponse.getResponses()) {
if (deleteResponse.isFound()) {
version = deleteResponse.getVersion();
found = true;
break;
}
}
DeleteResponse response = new DeleteResponse(request.concreteIndex(), request.request().type(), request.request().id(), version, found);
response.setShardInfo(indexDeleteResponse.getShardInfo());
listener.onResponse(response);
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
return false;
} }
} }
} }
return true;
} }
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) { private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {

View File

@ -1,63 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
/**
* Internal transport action that broadcasts a delete request to all of the shards that belongs to an index.
* Used when routing is required but not specified within the delete request.
*/
public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteResponse> {
private static final String ACTION_NAME = DeleteAction.NAME + "[index]";
@Inject
public TransportIndexDeleteAction(Settings settings, ClusterService clusterService,
ThreadPool threadPool, TransportShardDeleteAction deleteAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, clusterService, threadPool, deleteAction, actionFilters);
}
@Override
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, ActionWriteResponse.ShardInfo shardInfo) {
IndexDeleteResponse indexDeleteResponse = new IndexDeleteResponse(request.index(), shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
indexDeleteResponse.setShardInfo(shardInfo);
return indexDeleteResponse;
}
@Override
protected GroupShardsIterator shards(IndexDeleteRequest request) {
return clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index());
}
@Override
protected ShardDeleteRequest newShardRequestInstance(IndexDeleteRequest request, int shardId) {
return new ShardDeleteRequest(request, shardId);
}
}

View File

@ -1,138 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardDeleteAction extends TransportShardReplicationOperationAction<ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
private static final String ACTION_NAME = DeleteAction.NAME + "[s]";
@Inject
public TransportShardDeleteAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters);
}
@Override
protected boolean checkWriteConsistency() {
return true;
}
@Override
protected ShardDeleteRequest newRequestInstance() {
return new ShardDeleteRequest();
}
@Override
protected ShardDeleteRequest newReplicaRequestInstance() {
return newRequestInstance();
}
@Override
protected ShardDeleteResponse newResponseInstance() {
return new ShardDeleteResponse();
}
@Override
protected String executor() {
return ThreadPool.Names.INDEX;
}
@Override
protected boolean resolveIndex() {
return false;
}
@Override
protected Tuple<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the version to happen on the replicas
request.version(delete.version());
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Exception e) {
// ignore
}
}
return new Tuple<>(new ShardDeleteResponse(delete.version(), delete.found()), shardRequest.request);
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.REPLICA);
// IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version
delete = new Engine.Delete(delete, VersionType.INTERNAL.versionTypeForReplicationAndRecovery());
assert delete.versionType().validateVersionForWrites(delete.version());
indexShard.delete(delete);
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Exception e) {
// ignore
}
}
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
GroupShardsIterator group = clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.concreteIndex());
for (ShardIterator shardIt : group) {
if (shardIt.shardId().id() == request.request().shardId()) {
return shardIt;
}
}
throw new ElasticsearchIllegalStateException("No shards iterator found for shard [" + request.request().shardId() + "]");
}
}

View File

@ -122,7 +122,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
} }
@Override @Override
protected boolean resolveRequest(ClusterState state, InternalRequest request, ActionListener<IndexResponse> indexResponseActionListener) { protected void resolveRequest(ClusterState state, InternalRequest request, ActionListener<IndexResponse> indexResponseActionListener) {
MetaData metaData = clusterService.state().metaData(); MetaData metaData = clusterService.state().metaData();
MappingMetaData mappingMd = null; MappingMetaData mappingMd = null;
@ -130,7 +130,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
mappingMd = metaData.index(request.concreteIndex()).mappingOrDefault(request.request().type()); mappingMd = metaData.index(request.concreteIndex()).mappingOrDefault(request.request().type());
} }
request.request().process(metaData, mappingMd, allowIdGeneration, request.concreteIndex()); request.request().process(metaData, mappingMd, allowIdGeneration, request.concreteIndex());
return true;
} }
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) { private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {

View File

@ -134,11 +134,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract boolean resolveIndex(); protected abstract boolean resolveIndex();
/** /**
* Resolves the request, by default doing nothing. If the resolve * Resolves the request, by default doing nothing. Can be subclassed to do
* means a different execution, then return false here to indicate not to continue and execute this request. * additional processing or validation depending on the incoming request
*/ */
protected boolean resolveRequest(ClusterState state, InternalRequest request, ActionListener<Response> listener) { protected void resolveRequest(ClusterState state, InternalRequest request, ActionListener<Response> listener) {
return true;
} }
protected TransportRequestOptions transportOptions() { protected TransportRequestOptions transportOptions() {
@ -340,10 +339,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} else { } else {
internalRequest.concreteIndex(internalRequest.request().index()); internalRequest.concreteIndex(internalRequest.request().index());
} }
// check if we need to execute, and if not, return
if (!resolveRequest(observer.observedState(), internalRequest, listener)) { resolveRequest(observer.observedState(), internalRequest, listener);
return;
}
blockException = checkRequestBlock(observer.observedState(), internalRequest); blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) { if (blockException != null) {
if (blockException.retryable()) { if (blockException.retryable()) {

View File

@ -64,7 +64,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
@ -536,7 +535,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
assertNoFailures(searchResponse); assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs)); assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs));
//use routing
DeleteResponse deleteResponse = client().prepareDelete("test", "test", firstDocId).setRouting("routing").get(); DeleteResponse deleteResponse = client().prepareDelete("test", "test", firstDocId).setRouting("routing").get();
assertThat(deleteResponse.isFound(), equalTo(true)); assertThat(deleteResponse.isFound(), equalTo(true));
GetResponse getResponse = client().prepareGet("test", "test", firstDocId).setRouting("routing").get(); GetResponse getResponse = client().prepareGet("test", "test", firstDocId).setRouting("routing").get();
@ -545,17 +543,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
searchResponse = client().prepareSearch("test").get(); searchResponse = client().prepareSearch("test").get();
assertNoFailures(searchResponse); assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs - 1)); assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs - 1));
//don't use routing and trigger a broadcast delete
deleteResponse = client().prepareDelete("test", "test", secondDocId).get();
assertThat(deleteResponse.isFound(), equalTo(true));
getResponse = client().prepareGet("test", "test", secondDocId).setRouting(secondRouting).get();
assertThat(getResponse.isExists(), equalTo(false));
refresh();
searchResponse = client().prepareSearch("test").setSize(numDocs).get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs - 2));
} }
@Test @Test

View File

@ -30,25 +30,18 @@ import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.client.Requests.*; import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
/** /**
* *
@ -271,54 +264,6 @@ public class DocumentActionsTests extends ElasticsearchIntegrationTest {
} }
} }
@Test
public void testDeleteRoutingRequired() throws ExecutionException, InterruptedException, IOException {
assertAcked(prepareCreate("test").addMapping("test",
XContentFactory.jsonBuilder().startObject().startObject("test").startObject("_routing").field("required", true).endObject().endObject().endObject()));
ensureGreen();
int numDocs = iterations(10, 50);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs - 2; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i))
.setRouting(randomAsciiOfLength(randomIntBetween(1, 10))).setSource("field", "value");
}
String firstDocId = Integer.toString(numDocs - 2);
indexRequestBuilders[numDocs - 2] = client().prepareIndex("test", "test", firstDocId)
.setRouting("routing").setSource("field", "value");
String secondDocId = Integer.toString(numDocs - 1);
String secondRouting = randomAsciiOfLength(randomIntBetween(1, 10));
indexRequestBuilders[numDocs - 1] = client().prepareIndex("test", "test", secondDocId)
.setRouting(secondRouting).setSource("field", "value");
indexRandom(true, indexRequestBuilders);
SearchResponse searchResponse = client().prepareSearch("test").get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs));
//use routing
DeleteResponse deleteResponse = client().prepareDelete("test", "test", firstDocId).setRouting("routing").get();
assertThat(deleteResponse.isFound(), equalTo(true));
GetResponse getResponse = client().prepareGet("test", "test", firstDocId).setRouting("routing").get();
assertThat(getResponse.isExists(), equalTo(false));
refresh();
searchResponse = client().prepareSearch("test").get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs - 1));
//don't use routing and trigger a broadcast delete
deleteResponse = client().prepareDelete("test", "test", secondDocId).get();
assertThat(deleteResponse.isFound(), equalTo(true));
getResponse = client().prepareGet("test", "test", secondDocId).setRouting(secondRouting).get();
assertThat(getResponse.isExists(), equalTo(false));
refresh();
searchResponse = client().prepareSearch("test").setSize(numDocs).get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs - 2));
}
private XContentBuilder source(String id, String nameValue) throws IOException { private XContentBuilder source(String id, String nameValue) throws IOException {
return XContentFactory.jsonBuilder().startObject().field("id", id).field("name", nameValue).endObject(); return XContentFactory.jsonBuilder().startObject().field("id", id).field("name", nameValue).endObject();
} }

View File

@ -100,14 +100,6 @@ public class ShardInfoTests extends ElasticsearchIntegrationTest {
} }
} }
@Test
public void testDeleteWithRoutingRequiredButNotSpecified() throws Exception {
int numPrimaryShards = randomIntBetween(1, 2);
prepareIndex(numPrimaryShards, true);
DeleteResponse deleteResponse = client().prepareDelete("idx", "type", "1").get();
assertShardInfo(deleteResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards);
}
@Test @Test
public void testDeleteByQuery() throws Exception { public void testDeleteByQuery() throws Exception {
int numPrimaryShards = randomIntBetween(1, 2); int numPrimaryShards = randomIntBetween(1, 2);

View File

@ -19,11 +19,8 @@
package org.elasticsearch.routing; package org.elasticsearch.routing;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
@ -34,7 +31,6 @@ import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
/** /**
* *
@ -329,60 +325,6 @@ public class AliasRoutingTests extends ElasticsearchIntegrationTest {
assertThat(searchResponse.getHits().getHits().length, equalTo(1)); assertThat(searchResponse.getHits().getHits().length, equalTo(1));
} }
@Test
public void testRequiredRoutingMappingWithAlias() throws Exception {
prepareCreate("test").addMapping(
"type1",
XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true)
.endObject().endObject().endObject()).get();
ensureGreen();
logger.info("--> indexing with id [1], and routing [0]");
client().prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> indexing with id [1], with no routing, should fail");
try {
client().prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
fail();
} catch (ElasticsearchException e) {
assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class));
}
logger.info("--> verifying get with routing, should find");
for (int i = 0; i < 5; i++) {
assertThat(client().prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true));
}
logger.info("--> deleting with no routing, should broadcast the delete since _routing is required");
client().prepareDelete("test", "type1", "1").setRefresh(true).execute().actionGet();
for (int i = 0; i < 5; i++) {
try {
client().prepareGet("test", "type1", "1").get();
fail();
} catch (RoutingMissingException e) {
assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]"));
}
assertThat(client().prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(false));
}
logger.info("--> indexing with id [1], and routing [0]");
client().prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
logger.info("--> verifying get with no routing, should not find anything");
logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required");
client().prepareBulk().add(Requests.deleteRequest("test").type("type1").id("1")).execute().actionGet();
refresh();
for (int i = 0; i < 5; i++) {
try {
assertThat(client().prepareGet("test", "type1", "1").execute().actionGet().isExists(), equalTo(false));
fail();
} catch (RoutingMissingException e) {
assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]"));
}
assertThat(client().prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(false));
}
}
@Test @Test
public void testIndexingAliasesOverTime() throws Exception { public void testIndexingAliasesOverTime() throws Exception {
createIndex("test"); createIndex("test");

View File

@ -32,14 +32,11 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -187,7 +184,7 @@ public class SimpleRoutingTests extends ElasticsearchIntegrationTest {
logger.info("--> indexing with id [1], with no routing, should fail"); logger.info("--> indexing with id [1], with no routing, should fail");
try { try {
client().prepareIndex(indexOrAlias(), "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet(); client().prepareIndex(indexOrAlias(), "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet();
fail(); fail("index with missing routing when routing is required should fail");
} catch (ElasticsearchException e) { } catch (ElasticsearchException e) {
assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class)); assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class));
} }
@ -197,17 +194,23 @@ public class SimpleRoutingTests extends ElasticsearchIntegrationTest {
assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true)); assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true));
} }
logger.info("--> deleting with no routing, should broadcast the delete since _routing is required"); logger.info("--> deleting with no routing, should fail");
client().prepareDelete(indexOrAlias(), "type1", "1").setRefresh(true).execute().actionGet(); try {
client().prepareDelete(indexOrAlias(), "type1", "1").setRefresh(true).execute().actionGet();
fail("delete with missing routing when routing is required should fail");
} catch (ElasticsearchException e) {
assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class));
}
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
try { try {
client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists(); client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists();
fail(); fail("get with missing routing when routing is required should fail");
} catch (RoutingMissingException e) { } catch (RoutingMissingException e) {
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]")); assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]"));
} }
assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(false)); assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true));
} }
logger.info("--> indexing with id [1], and routing [0]"); logger.info("--> indexing with id [1], and routing [0]");