Internal: removed needless serialization code from TransportIndexReplicationAction and corresponding request object

TransportIndexReplicationAction is always executed locally, as an internal action that is part of either delete by query or delete (when routing is required but not specified). Only the corresponding shard level requests get sent over the transport, hence no transport endpoint is needed for the index version, nor the index request itself is supposed to be sent over the transport.

Moved classes from org.elasticsearch.action.delete.index to org.elasticsearch.action.delete and adjusted visibility so that internal requests are not public anymore.

Also removed serialization code from IndexDeleteResponse as it never gets sent over transport either.

Closes #7211
This commit is contained in:
javanna 2014-08-08 19:47:08 +02:00 committed by Luca Cavanna
parent fbd337921f
commit c7a9b3da5b
15 changed files with 191 additions and 365 deletions

View File

@ -126,8 +126,8 @@ import org.elasticsearch.action.count.CountAction;
import org.elasticsearch.action.count.TransportCountAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.delete.index.TransportIndexDeleteAction;
import org.elasticsearch.action.delete.index.TransportShardDeleteAction;
import org.elasticsearch.action.delete.TransportIndexDeleteAction;
import org.elasticsearch.action.delete.TransportShardDeleteAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportIndexDeleteByQueryAction;

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
/**
* Delete request to execute on all shards that belong to a specific index.
* Used when routing is required but not specified within the delete request.
*/
class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteRequest> {
private final String type;
private final String id;
private final boolean refresh;
private final long version;
IndexDeleteRequest(DeleteRequest request) {
super(request.index(), request.timeout(), request.replicationType(), request.consistencyLevel());
this.type = request.type();
this.id = request.id();
this.refresh = request.refresh();
this.version = request.version();
}
String type() {
return this.type;
}
String id() {
return this.id;
}
boolean refresh() {
return this.refresh;
}
long version() {
return this.version;
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.action.delete.index;
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
@ -42,10 +42,6 @@ public class IndexDeleteResponse extends ActionResponse {
this.deleteResponses = deleteResponses;
}
IndexDeleteResponse() {
}
/**
* The index the delete by query operation was executed against.
*/
@ -80,26 +76,11 @@ public class IndexDeleteResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
successfulShards = in.readVInt();
failedShards = in.readVInt();
deleteResponses = new ShardDeleteResponse[in.readVInt()];
for (int i = 0; i < deleteResponses.length; i++) {
deleteResponses[i] = new ShardDeleteResponse();
deleteResponses[i].readFrom(in);
}
throw new UnsupportedOperationException("IndexDeleteResponse is not supposed to be sent over the transport");
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(deleteResponses.length);
for (ShardDeleteResponse deleteResponse : deleteResponses) {
deleteResponse.writeTo(out);
}
throw new UnsupportedOperationException("IndexDeleteResponse is not supposed to be sent over the transport");
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.action.delete.index;
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.action.delete.index;
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -25,10 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.delete.index.IndexDeleteRequest;
import org.elasticsearch.action.delete.index.IndexDeleteResponse;
import org.elasticsearch.action.delete.index.ShardDeleteResponse;
import org.elasticsearch.action.delete.index.TransportIndexDeleteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;

View File

@ -17,40 +17,31 @@
* under the License.
*/
package org.elasticsearch.action.delete.index;
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
/**
*
* Internal transport action that broadcasts a delete request to all of the shards that belongs to an index.
* Used when routing is required but not specified within the delete request.
*/
public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
private static final String ACTION_NAME = DeleteAction.NAME + "[index]";
@Inject
public TransportIndexDeleteAction(Settings settings, ClusterService clusterService, TransportService transportService,
public TransportIndexDeleteAction(Settings settings, ClusterService clusterService,
ThreadPool threadPool, TransportShardDeleteAction deleteAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, threadPool, deleteAction, actionFilters);
}
@Override
protected IndexDeleteRequest newRequestInstance() {
return new IndexDeleteRequest();
super(settings, ACTION_NAME, clusterService, threadPool, deleteAction, actionFilters);
}
@Override
@ -63,16 +54,6 @@ public class TransportIndexDeleteAction extends TransportIndexReplicationOperati
return false;
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, IndexDeleteRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
@Override
protected GroupShardsIterator shards(IndexDeleteRequest request) {
return clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index());

View File

@ -17,10 +17,9 @@
* under the License.
*/
package org.elasticsearch.action.delete.index;
package org.elasticsearch.action.delete;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;

View File

@ -1,87 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.delete.index;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import java.io.IOException;
/**
*
*/
public class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteRequest> {
private String type;
private String id;
private boolean refresh = false;
private long version;
IndexDeleteRequest() {
}
public IndexDeleteRequest(DeleteRequest request) {
this.timeout = request.timeout();
this.consistencyLevel = request.consistencyLevel();
this.replicationType = request.replicationType();
this.index = request.index();
this.type = request.type();
this.id = request.id();
this.refresh = request.refresh();
this.version = request.version();
}
public String type() {
return this.type;
}
public String id() {
return this.id;
}
public boolean refresh() {
return this.refresh;
}
public long version() {
return this.version;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
refresh = in.readBoolean();
version = Versions.readVersion(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(type);
out.writeString(id);
out.writeBoolean(refresh);
Versions.writeVersion(version, out);
}
}

View File

@ -19,65 +19,39 @@
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Delete by query request to execute on a specific index.
*/
public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<IndexDeleteByQueryRequest> {
class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<IndexDeleteByQueryRequest> {
private BytesReference source;
private String[] types = Strings.EMPTY_ARRAY;
private final BytesReference source;
private final String[] types;
@Nullable
private Set<String> routing;
private final Set<String> routing;
@Nullable
private String[] filteringAliases;
private long nowInMillis;
private final String[] filteringAliases;
private final long nowInMillis;
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
long nowInMillis
) {
this.index = index;
this.timeout = request.timeout();
long nowInMillis) {
super(index, request.timeout(), request.replicationType(), request.consistencyLevel());
this.source = request.source();
this.types = request.types();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
this.routing = routing;
this.filteringAliases = filteringAliases;
this.nowInMillis = nowInMillis;
}
IndexDeleteByQueryRequest() {
}
BytesReference source() {
return source;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
Set<String> routing() {
return this.routing;
}
@ -93,68 +67,4 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<
long nowInMillis() {
return nowInMillis;
}
public IndexDeleteByQueryRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
source = in.readBytesReference();
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
for (int i = 0; i < typesSize; i++) {
types[i] = in.readString();
}
}
int routingSize = in.readVInt();
if (routingSize > 0) {
routing = new HashSet<>(routingSize);
for (int i = 0; i < routingSize; i++) {
routing.add(in.readString());
}
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
filteringAliases = new String[aliasesSize];
for (int i = 0; i < aliasesSize; i++) {
filteringAliases[i] = in.readString();
}
}
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
nowInMillis = in.readVLong();
} else {
nowInMillis = System.currentTimeMillis();
}
}
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(source);
out.writeVInt(types.length);
for (String type : types) {
out.writeString(type);
}
if (routing != null) {
out.writeVInt(routing.size());
for (String r : routing) {
out.writeString(r);
}
} else {
out.writeVInt(0);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);
for (String alias : filteringAliases) {
out.writeString(alias);
}
} else {
out.writeVInt(0);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVLong(nowInMillis);
}
}
}

View File

@ -30,26 +30,20 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
/**
*
* Internal transport action that broadcasts a delete by query request to all of the shards that belong to an index.
*/
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
private static final String ACTION_NAME = DeleteByQueryAction.NAME + "[index]";
@Inject
public TransportIndexDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
public TransportIndexDeleteByQueryAction(Settings settings, ClusterService clusterService,
ThreadPool threadPool, TransportShardDeleteByQueryAction shardDeleteByQueryAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, threadPool, shardDeleteByQueryAction, actionFilters);
}
@Override
protected IndexDeleteByQueryRequest newRequestInstance() {
return new IndexDeleteByQueryRequest();
super(settings, ACTION_NAME, clusterService, threadPool, shardDeleteByQueryAction, actionFilters);
}
@Override

View File

@ -30,19 +30,29 @@ import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
*
* Request used within {@link org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction}.
* Since the corresponding action is internal that gets always executed locally, this request never gets sent over the transport.
* The specified index is expected to be a concrete index. Relies on input validation done by the caller actions.
*/
public class IndexReplicationOperationRequest<T extends IndexReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
public abstract class IndexReplicationOperationRequest<T extends IndexReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
private final TimeValue timeout;
private final String index;
private final ReplicationType replicationType;
private final WriteConsistencyLevel consistencyLevel;
protected String index;
protected IndexReplicationOperationRequest(String index, TimeValue timeout, ReplicationType replicationType, WriteConsistencyLevel consistencyLevel) {
this.index = index;
this.timeout = timeout;
this.replicationType = replicationType;
this.consistencyLevel = consistencyLevel;
}
protected ReplicationType replicationType = ReplicationType.DEFAULT;
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
@Override
public ActionRequestValidationException validate() {
return null;
}
public TimeValue timeout() {
return timeout;
@ -52,12 +62,6 @@ public class IndexReplicationOperationRequest<T extends IndexReplicationOperatio
return this.index;
}
@SuppressWarnings("unchecked")
public T index(String index) {
this.index = index;
return (T) this;
}
@Override
public String[] indices() {
return new String[]{index};
@ -68,22 +72,6 @@ public class IndexReplicationOperationRequest<T extends IndexReplicationOperatio
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public T replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return (T) this;
}
/**
* Sets the replication type.
*/
public T replicationType(String replicationType) {
return replicationType(ReplicationType.fromString(replicationType));
}
public ReplicationType replicationType() {
return this.replicationType;
}
@ -92,39 +80,13 @@ public class IndexReplicationOperationRequest<T extends IndexReplicationOperatio
return this.consistencyLevel;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
@SuppressWarnings("unchecked")
public T consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return (T) this;
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (index == null) {
validationException = addValidationError("index name missing", validationException);
}
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeString(index);
public final void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
}
}

View File

@ -30,20 +30,20 @@ import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*
* Internal transport action that executes on multiple shards, doesn't register any transport handler as it is always executed locally.
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
extends TransportAction<Request, Response> {
@ -52,13 +52,11 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
protected final TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction;
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.shardAction = shardAction;
transportService.registerHandler(actionName, new TransportHandler());
}
@Override
@ -68,8 +66,6 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
if (blockException != null) {
throw blockException;
}
// update to concrete index
request.index(clusterState.metaData().concreteSingleIndex(request.index(), request.indicesOptions()));
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
throw blockException;
@ -109,7 +105,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
shardsResponses.set(index, new ShardActionResult(
new DefaultShardOperationFailedException(request.index, shardIt.shardId().id(), e)));
new DefaultShardOperationFailedException(request.index(), shardIt.shardId().id(), e)));
}
returnIfNeeded();
}
@ -140,8 +136,6 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
}
}
protected abstract Request newRequestInstance();
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, int failuresCount, List<ShardOperationFailedException> shardFailures);
protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException;
@ -150,9 +144,13 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
protected abstract boolean accumulateExceptions();
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
private class ShardActionResult {
@ -175,42 +173,4 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
return shardFailure != null;
}
}
private class TransportHandler extends BaseTransportRequestHandler<Request> {
@Override
public Request newInstance() {
return newRequestInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<Response>() {
@Override
public void onResponse(Response result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + actionName + "] and request [" + request + "]", e1);
}
}
});
}
}
}

View File

@ -275,12 +275,10 @@ final class ActionNames {
builder.put(DeleteAction.NAME, "delete");
builder.put(DeleteAction.NAME + "[r]", "delete/replica");
builder.put(DeleteAction.NAME + "[index]", "indices/index/delete");
builder.put(DeleteAction.NAME + "[s]", "indices/index/b_shard/delete");
builder.put(DeleteAction.NAME + "[s][r]", "indices/index/b_shard/delete/replica");
builder.put(DeleteByQueryAction.NAME, "deleteByQuery");
builder.put(DeleteByQueryAction.NAME + "[index]", "deleteByQuery/index");
builder.put(DeleteByQueryAction.NAME + "[s]", "deleteByQuery/shard");
builder.put(DeleteByQueryAction.NAME + "[s][r]", "deleteByQuery/shard/replica");

View File

@ -28,6 +28,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
@ -43,6 +46,7 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperParsingException;
@ -526,5 +530,75 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
}
}
@Test
public void testDeleteByQuery() throws ExecutionException, InterruptedException {
createIndex("test");
ensureYellow("test");
int numDocs = iterations(10, 50);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs + 1];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i)).setSource("field", "value");
}
indexRequestBuilders[numDocs] = client().prepareIndex("test", "test", Integer.toString(numDocs)).setSource("field", "other_value");
indexRandom(true, indexRequestBuilders);
SearchResponse searchResponse = client().prepareSearch("test").get();
assertThat(searchResponse.getHits().totalHits(), equalTo((long)numDocs + 1));
DeleteByQueryResponse deleteByQueryResponse = client().prepareDeleteByQuery("test").setQuery(QueryBuilders.termQuery("field", "value")).get();
assertThat(deleteByQueryResponse.getIndices().size(), equalTo(1));
for (IndexDeleteByQueryResponse indexDeleteByQueryResponse : deleteByQueryResponse) {
assertThat(indexDeleteByQueryResponse.getIndex(), equalTo("test"));
assertThat(indexDeleteByQueryResponse.getFailures().length, equalTo(0));
}
refresh();
searchResponse = client().prepareSearch("test").get();
assertThat(searchResponse.getHits().totalHits(), equalTo(1l));
}
@Test
public void testDeleteRoutingRequired() throws ExecutionException, InterruptedException, IOException {
assertAcked(prepareCreate("test").addMapping("test",
XContentFactory.jsonBuilder().startObject().startObject("test").startObject("_routing").field("required", true).endObject().endObject().endObject()));
ensureYellow("test");
int numDocs = iterations(10, 50);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs - 2; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i))
.setRouting(randomAsciiOfLength(randomIntBetween(1, 10))).setSource("field", "value");
}
String firstDocId = Integer.toString(numDocs - 2);
indexRequestBuilders[numDocs - 2] = client().prepareIndex("test", "test", firstDocId)
.setRouting("routing").setSource("field", "value");
String secondDocId = Integer.toString(numDocs - 1);
String secondRouting = randomAsciiOfLength(randomIntBetween(1, 10));
indexRequestBuilders[numDocs - 1] = client().prepareIndex("test", "test", secondDocId)
.setRouting(secondRouting).setSource("field", "value");
indexRandom(true, indexRequestBuilders);
SearchResponse searchResponse = client().prepareSearch("test").get();
assertThat(searchResponse.getHits().totalHits(), equalTo((long)numDocs));
//use routing
DeleteResponse deleteResponse = client().prepareDelete("test", "test", firstDocId).setRouting("routing").get();
assertThat(deleteResponse.isFound(), equalTo(true));
GetResponse getResponse = client().prepareGet("test", "test", firstDocId).setRouting("routing").get();
assertThat(getResponse.isExists(), equalTo(false));
refresh();
searchResponse = client().prepareSearch("test").get();
assertThat(searchResponse.getHits().totalHits(), equalTo((long)numDocs - 1));
//don't use routing and trigger a broadcast delete
deleteResponse = client().prepareDelete("test", "test", secondDocId).get();
assertThat(deleteResponse.isFound(), equalTo(true));
getResponse = client().prepareGet("test", "test", secondDocId).setRouting(secondRouting).get();
assertThat(getResponse.isExists(), equalTo(false));
refresh();
searchResponse = client().prepareSearch("test").get();
assertThat(searchResponse.getHits().totalHits(), equalTo((long)numDocs - 2));
}
}