diff --git a/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java deleted file mode 100644 index 008008b1e6c..00000000000 --- a/src/main/java/org/elasticsearch/action/support/replication/IndexReplicationOperationRequest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.replication; - -import org.elasticsearch.action.*; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; - -import java.io.IOException; - -/** - * Request used within {@link org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction}. - * Since the corresponding action is internal that gets always executed locally, this request never gets sent over the transport. - * The specified index is expected to be a concrete index. Relies on input validation done by the caller actions. - */ -public abstract class IndexReplicationOperationRequest extends ActionRequest implements IndicesRequest { - - private final TimeValue timeout; - private final String index; - private final WriteConsistencyLevel consistencyLevel; - private final OriginalIndices originalIndices; - - protected IndexReplicationOperationRequest(String index, TimeValue timeout, WriteConsistencyLevel consistencyLevel, - String[] originalIndices, IndicesOptions originalIndicesOptions, ActionRequest request) { - super(request); - this.index = index; - this.timeout = timeout; - this.consistencyLevel = consistencyLevel; - this.originalIndices = new OriginalIndices(originalIndices, originalIndicesOptions); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - public TimeValue timeout() { - return timeout; - } - - public String index() { - return this.index; - } - - @Override - public String[] indices() { - return originalIndices.indices(); - } - - @Override - public IndicesOptions indicesOptions() { - return originalIndices.indicesOptions(); - } - - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; - } - - @Override - public final void readFrom(StreamInput in) throws IOException { - throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport"); - } - - @Override - public final void writeTo(StreamOutput out) throws IOException { - throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport"); - } -} diff --git a/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequest.java deleted file mode 100644 index 5113628fa6f..00000000000 --- a/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.replication; - -import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.unit.TimeValue; - -import java.io.IOException; - -/** - * - */ -public abstract class IndicesReplicationOperationRequest extends ActionRequest implements IndicesRequest.Replaceable { - - protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT; - protected String[] indices; - private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); - - protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; - - public TimeValue timeout() { - return timeout; - } - - protected IndicesReplicationOperationRequest() { - } - - protected IndicesReplicationOperationRequest(ActionRequest actionRequest) { - super(actionRequest); - } - - /** - * A timeout to wait if the delete by query operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final T timeout(TimeValue timeout) { - this.timeout = timeout; - return (T) this; - } - - /** - * A timeout to wait if the delete by query operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public T timeout(String timeout) { - this.timeout = TimeValue.parseTimeValue(timeout, null); - return (T) this; - } - - @Override - public String[] indices() { - return this.indices; - } - - @Override - public IndicesOptions indicesOptions() { - return indicesOptions; - } - - @SuppressWarnings("unchecked") - public T indicesOptions(IndicesOptions indicesOptions) { - if (indicesOptions == null) { - throw new IllegalArgumentException("IndicesOptions must not be null"); - } - this.indicesOptions = indicesOptions; - return (T) this; - } - - /** - * The indices the request will execute against. - */ - @SuppressWarnings("unchecked") - @Override - public final T indices(String[] indices) { - this.indices = indices; - return (T) this; - } - - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; - } - - /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} - */ - @SuppressWarnings("unchecked") - public final T consistencyLevel(WriteConsistencyLevel consistencyLevel) { - if (consistencyLevel == null) { - throw new IllegalArgumentException("WriteConsistencyLevel must not be null"); - } - this.consistencyLevel = consistencyLevel; - return (T) this; - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); - timeout = TimeValue.readTimeValue(in); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeByte(consistencyLevel.id()); - timeout.writeTo(out); - out.writeStringArrayNullable(indices); - indicesOptions.writeIndicesOptions(out); - } -} diff --git a/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequestBuilder.java b/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequestBuilder.java deleted file mode 100644 index 75598a6d295..00000000000 --- a/src/main/java/org/elasticsearch/action/support/replication/IndicesReplicationOperationRequestBuilder.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.replication; - -import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; - -/** - */ -public abstract class IndicesReplicationOperationRequestBuilder, Response extends ActionResponse, RequestBuilder extends IndicesReplicationOperationRequestBuilder> - extends ActionRequestBuilder { - - protected IndicesReplicationOperationRequestBuilder(Client client, Request request) { - super(client, request); - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } - - /** - * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. - */ - @SuppressWarnings("unchecked") - public final RequestBuilder setTimeout(String timeout) { - request.timeout(timeout); - return (RequestBuilder) this; - } - - @SuppressWarnings("unchecked") - public final RequestBuilder setIndices(String... indices) { - request.indices(indices); - return (RequestBuilder) this; - } - - /** - * Specifies what type of requested indices to ignore and how to deal with wildcard indices expressions. - * For example indices that don't exist. - */ - @SuppressWarnings("unchecked") - public RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { - request().indicesOptions(indicesOptions); - return (RequestBuilder) this; - } - - /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} - */ - @SuppressWarnings("unchecked") - public RequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { - request.consistencyLevel(consistencyLevel); - return (RequestBuilder) this; - } -} diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java deleted file mode 100644 index 5d0cba209d0..00000000000 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.replication; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionWriteResponse; -import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - * Internal transport action that executes on multiple shards, doesn't register any transport handler as it is always executed locally. - * It relies on a shard sub-action that gets sent over the transport and executed on each of the shard. - * The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions). - */ -public abstract class TransportIndexReplicationOperationAction - extends TransportAction { - - protected final ClusterService clusterService; - - protected final TransportShardReplicationOperationAction shardAction; - - protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService, - ThreadPool threadPool, TransportShardReplicationOperationAction shardAction, ActionFilters actionFilters) { - super(settings, actionName, threadPool, actionFilters); - this.clusterService = clusterService; - this.shardAction = shardAction; - } - - @Override - protected void doExecute(final Request request, final ActionListener listener) { - ClusterState clusterState = clusterService.state(); - ClusterBlockException blockException = checkGlobalBlock(clusterState, request); - if (blockException != null) { - throw blockException; - } - blockException = checkRequestBlock(clusterState, request); - if (blockException != null) { - throw blockException; - } - - final GroupShardsIterator groups; - try { - groups = shards(request); - } catch (Throwable e) { - listener.onFailure(e); - return; - } - final AtomicInteger indexCounter = new AtomicInteger(); - final AtomicInteger failureCounter = new AtomicInteger(); - final AtomicInteger completionCounter = new AtomicInteger(groups.size()); - final AtomicReferenceArray shardsResponses = new AtomicReferenceArray<>(groups.size()); - - for (final ShardIterator shardIt : groups) { - final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id()); - shardRequest.operationThreaded(true); - // no need for threaded listener, we will fork when its done based on the index request - shardRequest.listenerThreaded(false); - shardAction.execute(shardRequest, new ActionListener() { - @Override - public void onResponse(ShardResponse result) { - shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result)); - returnIfNeeded(); - } - - @Override - public void onFailure(Throwable e) { - failureCounter.getAndIncrement(); - int index = indexCounter.getAndIncrement(); - // this is a failure for an entire shard group, constructs shard info accordingly - final RestStatus status = ExceptionsHelper.status(e); - Failure failure = new Failure(request.index(), shardIt.shardId().id(), null, e, status, true); - shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, failure))); - returnIfNeeded(); - } - - private void returnIfNeeded() { - if (completionCounter.decrementAndGet() == 0) { - List responses = new ArrayList<>(); - List failureList = new ArrayList<>(); - - int total = 0; - int successful = 0; - for (int i = 0; i < shardsResponses.length(); i++) { - ShardActionResult shardActionResult = shardsResponses.get(i); - final ActionWriteResponse.ShardInfo sf; - if (shardActionResult.isFailure()) { - assert shardActionResult.shardInfoOnFailure != null; - sf = shardActionResult.shardInfoOnFailure; - } else { - responses.add(shardActionResult.shardResponse); - sf = shardActionResult.shardResponse.getShardInfo(); - } - total += sf.getTotal(); - successful += sf.getSuccessful(); - failureList.addAll(Arrays.asList(sf.getFailures())); - } - assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get(); - - final Failure[] failures; - if (failureList.isEmpty()) { - failures = ActionWriteResponse.EMPTY; - } else { - failures = failureList.toArray(new Failure[failureList.size()]); - } - listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, failures))); - } - } - - private int numShardGroupFailures(List failures) { - int numShardGroupFailures = 0; - for (Failure failure : failures) { - if (failure.primary()) { - numShardGroupFailures++; - } - } - return numShardGroupFailures; - } - }); - - } - } - - protected abstract Response newResponseInstance(Request request, List shardResponses, ActionWriteResponse.ShardInfo shardInfo); - - protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException; - - protected abstract ShardRequest newShardRequestInstance(Request request, int shardId); - - protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); - } - - protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) { - return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index()); - } - - private class ShardActionResult { - - private final ShardResponse shardResponse; - private final ActionWriteResponse.ShardInfo shardInfoOnFailure; - - private ShardActionResult(ShardResponse shardResponse) { - assert shardResponse != null; - this.shardResponse = shardResponse; - this.shardInfoOnFailure = null; - } - - private ShardActionResult(ActionWriteResponse.ShardInfo shardInfoOnFailure) { - assert shardInfoOnFailure != null; - this.shardInfoOnFailure = shardInfoOnFailure; - this.shardResponse = null; - } - - boolean isFailure() { - return shardInfoOnFailure != null; - } - } -} diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java deleted file mode 100644 index 0c76ebf8f58..00000000000 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support.replication; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.ActionWriteResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; - -/** - */ -public abstract class TransportIndicesReplicationOperationAction - extends HandledTransportAction { - - protected final ClusterService clusterService; - - protected final TransportIndexReplicationOperationAction indexAction; - - protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - TransportIndexReplicationOperationAction indexAction, ActionFilters actionFilters, - Class request) { - super(settings, actionName, threadPool, transportService, actionFilters, request); - this.clusterService = clusterService; - this.indexAction = indexAction; - } - - - protected abstract Map> resolveRouting(ClusterState clusterState, Request request) throws ElasticsearchException; - - @Override - protected void doExecute(final Request request, final ActionListener listener) { - ClusterState clusterState = clusterService.state(); - ClusterBlockException blockException = checkGlobalBlock(clusterState, request); - if (blockException != null) { - throw blockException; - } - // get actual indices - String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices()); - blockException = checkRequestBlock(clusterState, request, concreteIndices); - if (blockException != null) { - throw blockException; - } - - final AtomicInteger indexCounter = new AtomicInteger(); - final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length); - final AtomicReferenceArray indexResponses = new AtomicReferenceArray<>(concreteIndices.length); - final long startTimeInMillis = System.currentTimeMillis(); - - Map> routingMap = resolveRouting(clusterState, request); - if (concreteIndices.length == 0) { - listener.onResponse(newResponseInstance(request, indexResponses)); - } else { - for (final String index : concreteIndices) { - Set routing = null; - if (routingMap != null) { - routing = routingMap.get(index); - } - IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis); - // no threading needed, all is done on the index replication one - indexRequest.listenerThreaded(false); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse result) { - indexResponses.set(indexCounter.getAndIncrement(), result); - if (completionCounter.decrementAndGet() == 0) { - listener.onResponse(newResponseInstance(request, indexResponses)); - } - } - - @Override - public void onFailure(Throwable e) { - int index = indexCounter.getAndIncrement(); - if (accumulateExceptions()) { - indexResponses.set(index, e); - } - if (completionCounter.decrementAndGet() == 0) { - listener.onResponse(newResponseInstance(request, indexResponses)); - } - } - }); - } - } - } - - protected abstract Response newResponseInstance(Request request, AtomicReferenceArray indexResponses); - - protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set routing, long startTimeInMillis); - - protected abstract boolean accumulateExceptions(); - - protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request); - - protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices); -}