Remove index/indices replication infra code
now that delete by query is out, we don't need this infrastructure code. The delete by query will be implenented as a plugin, with scan scroll + bulk delete, so it will not need this infra anyhow
This commit is contained in:
parent
cf2fb4ed0f
commit
0a61d03ea2
|
@ -1,87 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.action.*;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Request used within {@link org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction}.
|
||||
* Since the corresponding action is internal that gets always executed locally, this request never gets sent over the transport.
|
||||
* The specified index is expected to be a concrete index. Relies on input validation done by the caller actions.
|
||||
*/
|
||||
public abstract class IndexReplicationOperationRequest<T extends IndexReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
|
||||
|
||||
private final TimeValue timeout;
|
||||
private final String index;
|
||||
private final WriteConsistencyLevel consistencyLevel;
|
||||
private final OriginalIndices originalIndices;
|
||||
|
||||
protected IndexReplicationOperationRequest(String index, TimeValue timeout, WriteConsistencyLevel consistencyLevel,
|
||||
String[] originalIndices, IndicesOptions originalIndicesOptions, ActionRequest request) {
|
||||
super(request);
|
||||
this.index = index;
|
||||
this.timeout = timeout;
|
||||
this.consistencyLevel = consistencyLevel;
|
||||
this.originalIndices = new OriginalIndices(originalIndices, originalIndicesOptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return originalIndices.indices();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return originalIndices.indicesOptions();
|
||||
}
|
||||
|
||||
public WriteConsistencyLevel consistencyLevel() {
|
||||
return this.consistencyLevel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void writeTo(StreamOutput out) throws IOException {
|
||||
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
|
||||
}
|
||||
}
|
|
@ -1,140 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest.Replaceable {
|
||||
|
||||
protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
|
||||
protected String[] indices;
|
||||
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false);
|
||||
|
||||
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
|
||||
|
||||
public TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
protected IndicesReplicationOperationRequest() {
|
||||
}
|
||||
|
||||
protected IndicesReplicationOperationRequest(ActionRequest actionRequest) {
|
||||
super(actionRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final T timeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public T timeout(String timeout) {
|
||||
this.timeout = TimeValue.parseTimeValue(timeout, null);
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return this.indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return indicesOptions;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public T indicesOptions(IndicesOptions indicesOptions) {
|
||||
if (indicesOptions == null) {
|
||||
throw new IllegalArgumentException("IndicesOptions must not be null");
|
||||
}
|
||||
this.indicesOptions = indicesOptions;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The indices the request will execute against.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public final T indices(String[] indices) {
|
||||
this.indices = indices;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
public WriteConsistencyLevel consistencyLevel() {
|
||||
return this.consistencyLevel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final T consistencyLevel(WriteConsistencyLevel consistencyLevel) {
|
||||
if (consistencyLevel == null) {
|
||||
throw new IllegalArgumentException("WriteConsistencyLevel must not be null");
|
||||
}
|
||||
this.consistencyLevel = consistencyLevel;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
||||
timeout = TimeValue.readTimeValue(in);
|
||||
indices = in.readStringArray();
|
||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeByte(consistencyLevel.id());
|
||||
timeout.writeTo(out);
|
||||
out.writeStringArrayNullable(indices);
|
||||
indicesOptions.writeIndicesOptions(out);
|
||||
}
|
||||
}
|
|
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class IndicesReplicationOperationRequestBuilder<Request extends IndicesReplicationOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends IndicesReplicationOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response, RequestBuilder, Client> {
|
||||
|
||||
protected IndicesReplicationOperationRequestBuilder(Client client, Request request) {
|
||||
super(client, request);
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final RequestBuilder setIndices(String... indices) {
|
||||
request.indices(indices);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore and how to deal with wildcard indices expressions.
|
||||
* For example indices that don't exist.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
|
||||
request().indicesOptions(indicesOptions);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public RequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
|
||||
request.consistencyLevel(consistencyLevel);
|
||||
return (RequestBuilder) this;
|
||||
}
|
||||
}
|
|
@ -1,194 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
* Internal transport action that executes on multiple shards, doesn't register any transport handler as it is always executed locally.
|
||||
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
|
||||
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
|
||||
*/
|
||||
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
|
||||
extends TransportAction<Request, Response> {
|
||||
|
||||
protected final ClusterService clusterService;
|
||||
|
||||
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction;
|
||||
|
||||
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
|
||||
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
|
||||
super(settings, actionName, threadPool, actionFilters);
|
||||
this.clusterService = clusterService;
|
||||
this.shardAction = shardAction;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, final ActionListener<Response> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
|
||||
if (blockException != null) {
|
||||
throw blockException;
|
||||
}
|
||||
blockException = checkRequestBlock(clusterState, request);
|
||||
if (blockException != null) {
|
||||
throw blockException;
|
||||
}
|
||||
|
||||
final GroupShardsIterator groups;
|
||||
try {
|
||||
groups = shards(request);
|
||||
} catch (Throwable e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
final AtomicInteger indexCounter = new AtomicInteger();
|
||||
final AtomicInteger failureCounter = new AtomicInteger();
|
||||
final AtomicInteger completionCounter = new AtomicInteger(groups.size());
|
||||
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<>(groups.size());
|
||||
|
||||
for (final ShardIterator shardIt : groups) {
|
||||
final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
|
||||
shardRequest.operationThreaded(true);
|
||||
// no need for threaded listener, we will fork when its done based on the index request
|
||||
shardRequest.listenerThreaded(false);
|
||||
shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {
|
||||
@Override
|
||||
public void onResponse(ShardResponse result) {
|
||||
shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result));
|
||||
returnIfNeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
failureCounter.getAndIncrement();
|
||||
int index = indexCounter.getAndIncrement();
|
||||
// this is a failure for an entire shard group, constructs shard info accordingly
|
||||
final RestStatus status = ExceptionsHelper.status(e);
|
||||
Failure failure = new Failure(request.index(), shardIt.shardId().id(), null, e, status, true);
|
||||
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, failure)));
|
||||
returnIfNeeded();
|
||||
}
|
||||
|
||||
private void returnIfNeeded() {
|
||||
if (completionCounter.decrementAndGet() == 0) {
|
||||
List<ShardResponse> responses = new ArrayList<>();
|
||||
List<Failure> failureList = new ArrayList<>();
|
||||
|
||||
int total = 0;
|
||||
int successful = 0;
|
||||
for (int i = 0; i < shardsResponses.length(); i++) {
|
||||
ShardActionResult shardActionResult = shardsResponses.get(i);
|
||||
final ActionWriteResponse.ShardInfo sf;
|
||||
if (shardActionResult.isFailure()) {
|
||||
assert shardActionResult.shardInfoOnFailure != null;
|
||||
sf = shardActionResult.shardInfoOnFailure;
|
||||
} else {
|
||||
responses.add(shardActionResult.shardResponse);
|
||||
sf = shardActionResult.shardResponse.getShardInfo();
|
||||
}
|
||||
total += sf.getTotal();
|
||||
successful += sf.getSuccessful();
|
||||
failureList.addAll(Arrays.asList(sf.getFailures()));
|
||||
}
|
||||
assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get();
|
||||
|
||||
final Failure[] failures;
|
||||
if (failureList.isEmpty()) {
|
||||
failures = ActionWriteResponse.EMPTY;
|
||||
} else {
|
||||
failures = failureList.toArray(new Failure[failureList.size()]);
|
||||
}
|
||||
listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, failures)));
|
||||
}
|
||||
}
|
||||
|
||||
private int numShardGroupFailures(List<Failure> failures) {
|
||||
int numShardGroupFailures = 0;
|
||||
for (Failure failure : failures) {
|
||||
if (failure.primary()) {
|
||||
numShardGroupFailures++;
|
||||
}
|
||||
}
|
||||
return numShardGroupFailures;
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, ActionWriteResponse.ShardInfo shardInfo);
|
||||
|
||||
protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException;
|
||||
|
||||
protected abstract ShardRequest newShardRequestInstance(Request request, int shardId);
|
||||
|
||||
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
|
||||
}
|
||||
|
||||
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
|
||||
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
|
||||
}
|
||||
|
||||
private class ShardActionResult {
|
||||
|
||||
private final ShardResponse shardResponse;
|
||||
private final ActionWriteResponse.ShardInfo shardInfoOnFailure;
|
||||
|
||||
private ShardActionResult(ShardResponse shardResponse) {
|
||||
assert shardResponse != null;
|
||||
this.shardResponse = shardResponse;
|
||||
this.shardInfoOnFailure = null;
|
||||
}
|
||||
|
||||
private ShardActionResult(ActionWriteResponse.ShardInfo shardInfoOnFailure) {
|
||||
assert shardInfoOnFailure != null;
|
||||
this.shardInfoOnFailure = shardInfoOnFailure;
|
||||
this.shardResponse = null;
|
||||
}
|
||||
|
||||
boolean isFailure() {
|
||||
return shardInfoOnFailure != null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
|
||||
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
|
||||
extends HandledTransportAction<Request, Response> {
|
||||
|
||||
protected final ClusterService clusterService;
|
||||
|
||||
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
|
||||
|
||||
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters,
|
||||
Class<Request> request) {
|
||||
super(settings, actionName, threadPool, transportService, actionFilters, request);
|
||||
this.clusterService = clusterService;
|
||||
this.indexAction = indexAction;
|
||||
}
|
||||
|
||||
|
||||
protected abstract Map<String, Set<String>> resolveRouting(ClusterState clusterState, Request request) throws ElasticsearchException;
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, final ActionListener<Response> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
|
||||
if (blockException != null) {
|
||||
throw blockException;
|
||||
}
|
||||
// get actual indices
|
||||
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
|
||||
blockException = checkRequestBlock(clusterState, request, concreteIndices);
|
||||
if (blockException != null) {
|
||||
throw blockException;
|
||||
}
|
||||
|
||||
final AtomicInteger indexCounter = new AtomicInteger();
|
||||
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
|
||||
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<>(concreteIndices.length);
|
||||
final long startTimeInMillis = System.currentTimeMillis();
|
||||
|
||||
Map<String, Set<String>> routingMap = resolveRouting(clusterState, request);
|
||||
if (concreteIndices.length == 0) {
|
||||
listener.onResponse(newResponseInstance(request, indexResponses));
|
||||
} else {
|
||||
for (final String index : concreteIndices) {
|
||||
Set<String> routing = null;
|
||||
if (routingMap != null) {
|
||||
routing = routingMap.get(index);
|
||||
}
|
||||
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis);
|
||||
// no threading needed, all is done on the index replication one
|
||||
indexRequest.listenerThreaded(false);
|
||||
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse result) {
|
||||
indexResponses.set(indexCounter.getAndIncrement(), result);
|
||||
if (completionCounter.decrementAndGet() == 0) {
|
||||
listener.onResponse(newResponseInstance(request, indexResponses));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
int index = indexCounter.getAndIncrement();
|
||||
if (accumulateExceptions()) {
|
||||
indexResponses.set(index, e);
|
||||
}
|
||||
if (completionCounter.decrementAndGet() == 0) {
|
||||
listener.onResponse(newResponseInstance(request, indexResponses));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Response newResponseInstance(Request request, AtomicReferenceArray indexResponses);
|
||||
|
||||
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);
|
||||
|
||||
protected abstract boolean accumulateExceptions();
|
||||
|
||||
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
|
||||
|
||||
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
|
||||
}
|
Loading…
Reference in New Issue