Merge pull request #10861 from kimchy/remove_indices_replication

Remove index/indices replication infra code
This commit is contained in:
Shay Banon 2015-04-28 22:54:50 +02:00
commit 3daf460acb
5 changed files with 0 additions and 627 deletions

View File

@ -1,87 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/**
* Request used within {@link org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction}.
* Since the corresponding action is internal that gets always executed locally, this request never gets sent over the transport.
* The specified index is expected to be a concrete index. Relies on input validation done by the caller actions.
*/
public abstract class IndexReplicationOperationRequest<T extends IndexReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
private final TimeValue timeout;
private final String index;
private final WriteConsistencyLevel consistencyLevel;
private final OriginalIndices originalIndices;
protected IndexReplicationOperationRequest(String index, TimeValue timeout, WriteConsistencyLevel consistencyLevel,
String[] originalIndices, IndicesOptions originalIndicesOptions, ActionRequest request) {
super(request);
this.index = index;
this.timeout = timeout;
this.consistencyLevel = consistencyLevel;
this.originalIndices = new OriginalIndices(originalIndices, originalIndicesOptions);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public TimeValue timeout() {
return timeout;
}
public String index() {
return this.index;
}
@Override
public String[] indices() {
return originalIndices.indices();
}
@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
@Override
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("IndexReplicationOperationRequest is not supposed to be sent over the transport");
}
}

View File

@ -1,140 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/**
*
*/
public abstract class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest.Replaceable {
protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
protected String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false);
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
public TimeValue timeout() {
return timeout;
}
protected IndicesReplicationOperationRequest() {
}
protected IndicesReplicationOperationRequest(ActionRequest actionRequest) {
super(actionRequest);
}
/**
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final T timeout(TimeValue timeout) {
this.timeout = timeout;
return (T) this;
}
/**
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public T timeout(String timeout) {
this.timeout = TimeValue.parseTimeValue(timeout, null);
return (T) this;
}
@Override
public String[] indices() {
return this.indices;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
@SuppressWarnings("unchecked")
public T indicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions == null) {
throw new IllegalArgumentException("IndicesOptions must not be null");
}
this.indicesOptions = indicesOptions;
return (T) this;
}
/**
* The indices the request will execute against.
*/
@SuppressWarnings("unchecked")
@Override
public final T indices(String[] indices) {
this.indices = indices;
return (T) this;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
@SuppressWarnings("unchecked")
public final T consistencyLevel(WriteConsistencyLevel consistencyLevel) {
if (consistencyLevel == null) {
throw new IllegalArgumentException("WriteConsistencyLevel must not be null");
}
this.consistencyLevel = consistencyLevel;
return (T) this;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeStringArrayNullable(indices);
indicesOptions.writeIndicesOptions(out);
}
}

View File

@ -1,80 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
/**
*/
public abstract class IndicesReplicationOperationRequestBuilder<Request extends IndicesReplicationOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends IndicesReplicationOperationRequestBuilder<Request, Response, RequestBuilder>>
extends ActionRequestBuilder<Request, Response, RequestBuilder, Client> {
protected IndicesReplicationOperationRequestBuilder(Client client, Request request) {
super(client, request);
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return (RequestBuilder) this;
}
@SuppressWarnings("unchecked")
public final RequestBuilder setIndices(String... indices) {
request.indices(indices);
return (RequestBuilder) this;
}
/**
* Specifies what type of requested indices to ignore and how to deal with wildcard indices expressions.
* For example indices that don't exist.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request().indicesOptions(indicesOptions);
return (RequestBuilder) this;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
@SuppressWarnings("unchecked")
public RequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
return (RequestBuilder) this;
}
}

View File

@ -1,194 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* Internal transport action that executes on multiple shards, doesn't register any transport handler as it is always executed locally.
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction;
protected TransportIndexReplicationOperationAction(Settings settings, String actionName, ClusterService clusterService,
ThreadPool threadPool, TransportShardReplicationOperationAction<ShardRequest, ShardRequest, ShardResponse> shardAction, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
this.clusterService = clusterService;
this.shardAction = shardAction;
}
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
final GroupShardsIterator groups;
try {
groups = shards(request);
} catch (Throwable e) {
listener.onFailure(e);
return;
}
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger failureCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(groups.size());
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<>(groups.size());
for (final ShardIterator shardIt : groups) {
final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
shardRequest.operationThreaded(true);
// no need for threaded listener, we will fork when its done based on the index request
shardRequest.listenerThreaded(false);
shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse result) {
shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result));
returnIfNeeded();
}
@Override
public void onFailure(Throwable e) {
failureCounter.getAndIncrement();
int index = indexCounter.getAndIncrement();
// this is a failure for an entire shard group, constructs shard info accordingly
final RestStatus status = ExceptionsHelper.status(e);
Failure failure = new Failure(request.index(), shardIt.shardId().id(), null, e, status, true);
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, failure)));
returnIfNeeded();
}
private void returnIfNeeded() {
if (completionCounter.decrementAndGet() == 0) {
List<ShardResponse> responses = new ArrayList<>();
List<Failure> failureList = new ArrayList<>();
int total = 0;
int successful = 0;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i);
final ActionWriteResponse.ShardInfo sf;
if (shardActionResult.isFailure()) {
assert shardActionResult.shardInfoOnFailure != null;
sf = shardActionResult.shardInfoOnFailure;
} else {
responses.add(shardActionResult.shardResponse);
sf = shardActionResult.shardResponse.getShardInfo();
}
total += sf.getTotal();
successful += sf.getSuccessful();
failureList.addAll(Arrays.asList(sf.getFailures()));
}
assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get();
final Failure[] failures;
if (failureList.isEmpty()) {
failures = ActionWriteResponse.EMPTY;
} else {
failures = failureList.toArray(new Failure[failureList.size()]);
}
listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, failures)));
}
}
private int numShardGroupFailures(List<Failure> failures) {
int numShardGroupFailures = 0;
for (Failure failure : failures) {
if (failure.primary()) {
numShardGroupFailures++;
}
}
return numShardGroupFailures;
}
});
}
}
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, ActionWriteResponse.ShardInfo shardInfo);
protected abstract GroupShardsIterator shards(Request request) throws ElasticsearchException;
protected abstract ShardRequest newShardRequestInstance(Request request, int shardId);
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.WRITE, request.index());
}
private class ShardActionResult {
private final ShardResponse shardResponse;
private final ActionWriteResponse.ShardInfo shardInfoOnFailure;
private ShardActionResult(ShardResponse shardResponse) {
assert shardResponse != null;
this.shardResponse = shardResponse;
this.shardInfoOnFailure = null;
}
private ShardActionResult(ActionWriteResponse.ShardInfo shardInfoOnFailure) {
assert shardInfoOnFailure != null;
this.shardInfoOnFailure = shardInfoOnFailure;
this.shardResponse = null;
}
boolean isFailure() {
return shardInfoOnFailure != null;
}
}
}

View File

@ -1,126 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected TransportIndicesReplicationOperationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction, ActionFilters actionFilters,
Class<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.indexAction = indexAction;
}
protected abstract Map<String, Set<String>> resolveRouting(ClusterState clusterState, Request request) throws ElasticsearchException;
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}
// get actual indices
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
blockException = checkRequestBlock(clusterState, request, concreteIndices);
if (blockException != null) {
throw blockException;
}
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<>(concreteIndices.length);
final long startTimeInMillis = System.currentTimeMillis();
Map<String, Set<String>> routingMap = resolveRouting(clusterState, request);
if (concreteIndices.length == 0) {
listener.onResponse(newResponseInstance(request, indexResponses));
} else {
for (final String index : concreteIndices) {
Set<String> routing = null;
if (routingMap != null) {
routing = routingMap.get(index);
}
IndexRequest indexRequest = newIndexRequestInstance(request, index, routing, startTimeInMillis);
// no threading needed, all is done on the index replication one
indexRequest.listenerThreaded(false);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse result) {
indexResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, indexResponses));
}
}
@Override
public void onFailure(Throwable e) {
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
indexResponses.set(index, e);
}
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, indexResponses));
}
}
});
}
}
}
protected abstract Response newResponseInstance(Request request, AtomicReferenceArray indexResponses);
protected abstract IndexRequest newIndexRequestInstance(Request request, String index, Set<String> routing, long startTimeInMillis);
protected abstract boolean accumulateExceptions();
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
}