Merge pull request #13068 from brwe/broadcast_replication

Make refresh a replicated action
This commit is contained in:
Britta Weber 2015-09-01 09:21:54 +02:00
commit 333831c126
30 changed files with 1049 additions and 458 deletions

View File

@ -39,7 +39,7 @@ import java.util.Collections;
/** /**
* Base class for write action responses. * Base class for write action responses.
*/ */
public abstract class ActionWriteResponse extends ActionResponse { public class ActionWriteResponse extends ActionResponse {
public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0]; public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];

View File

@ -21,10 +21,7 @@ package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
@ -42,13 +39,4 @@ public class FlushResponse extends BroadcastResponse {
super(totalShards, successfulShards, failedShards, shardFailures); super(totalShards, successfulShards, failedShards, shardFailures);
} }
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
} }

View File

@ -19,27 +19,27 @@
package org.elasticsearch.action.admin.indices.flush; package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.support.broadcast.BroadcastShardRequest; import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException; import java.io.IOException;
/** public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
*
*/
class ShardFlushRequest extends BroadcastShardRequest {
private FlushRequest request = new FlushRequest(); private FlushRequest request = new FlushRequest();
ShardFlushRequest() { public ShardFlushRequest(FlushRequest request) {
} super(request);
ShardFlushRequest(ShardId shardId, FlushRequest request) {
super(shardId, request);
this.request = request; this.request = request;
} }
public ShardFlushRequest() {
}
FlushRequest getRequest() {
return request;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
@ -53,7 +53,5 @@ class ShardFlushRequest extends BroadcastShardRequest {
request.writeTo(out); request.writeTo(out);
} }
FlushRequest getRequest() {
return request;
}
} }

View File

@ -1,37 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.index.shard.ShardId;
/**
*
*/
class ShardFlushResponse extends BroadcastShardResponse {
ShardFlushResponse() {
}
ShardFlushResponse(ShardId shardId) {
super(shardId);
}
}

View File

@ -19,99 +19,45 @@
package org.elasticsearch.action.admin.indices.flush; package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
* Flush Action. * Flush Action.
*/ */
public class TransportFlushAction extends TransportBroadcastAction<FlushRequest, FlushResponse, ShardFlushRequest, ShardFlushResponse> { public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ActionWriteResponse> {
private final IndicesService indicesService;
@Inject @Inject
public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, TransportService transportService, ActionFilters actionFilters,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver,
super(settings, FlushAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, TransportShardFlushAction replicatedFlushAction) {
FlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH); super(FlushAction.NAME, FlushRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedFlushAction);
this.indicesService = indicesService;
} }
@Override @Override
protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { protected ActionWriteResponse newShardResponse() {
int successfulShards = 0; return new ActionWriteResponse();
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
} }
@Override @Override
protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) { protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
return new ShardFlushRequest(shard.shardId(), request); return new ShardFlushRequest(request).setShardId(shardId).timeout("0ms");
} }
@Override @Override
protected ShardFlushResponse newShardResponse() { protected FlushResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List<ShardOperationFailedException> shardFailures) {
return new ShardFlushResponse(); return new FlushResponse(totalNumCopies, successfulShards, failedShards, shardFailures);
}
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.flush(request.getRequest());
return new ShardFlushResponse(request.shardId());
}
/**
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, FlushRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, FlushRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices);
} }
} }

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ActionWriteResponse> {
public static final String NAME = "indices:data/write/flush";
@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters, indexNameExpressionResolver, ShardFlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH);
}
@Override
protected ActionWriteResponse newResponseInstance() {
return new ActionWriteResponse();
}
@Override
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
indexShard.flush(shardRequest.request.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId());
return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
}
@Override
protected void shardOperationOnReplica(ShardId shardId, ShardFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.flush(request.getRequest());
logger.trace("{} flush request executed on replica", indexShard.shardId());
}
@Override
protected boolean checkWriteConsistency() {
return false;
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
}
@Override
protected boolean shouldExecuteReplication(Settings settings) {
return true;
}
}

View File

@ -22,8 +22,6 @@ package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
/**
*/
public class RefreshAction extends Action<RefreshRequest, RefreshResponse, RefreshRequestBuilder> { public class RefreshAction extends Action<RefreshRequest, RefreshResponse, RefreshRequestBuilder> {
public static final RefreshAction INSTANCE = new RefreshAction(); public static final RefreshAction INSTANCE = new RefreshAction();

View File

@ -33,7 +33,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastRequest;
*/ */
public class RefreshRequest extends BroadcastRequest<RefreshRequest> { public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
RefreshRequest() { RefreshRequest() {
} }
@ -48,5 +47,4 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest(String... indices) { public RefreshRequest(String... indices) {
super(indices); super(indices);
} }
} }

View File

@ -21,34 +21,18 @@ package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
* The response of a refresh action. * The response of a refresh action.
*
*
*/ */
public class RefreshResponse extends BroadcastResponse { public class RefreshResponse extends BroadcastResponse {
RefreshResponse() { RefreshResponse() {
} }
RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) { RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures); super(totalShards, successfulShards, failedShards, shardFailures);
} }
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
} }

View File

@ -1,37 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
import org.elasticsearch.index.shard.ShardId;
/**
*
*/
class ShardRefreshRequest extends BroadcastShardRequest {
ShardRefreshRequest() {
}
ShardRefreshRequest(ShardId shardId, RefreshRequest request) {
super(shardId, request);
}
}

View File

@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
import org.elasticsearch.index.shard.ShardId;
/**
*
*/
class ShardRefreshResponse extends BroadcastShardResponse {
ShardRefreshResponse() {
}
ShardRefreshResponse(ShardId shardId) {
super(shardId);
}
}

View File

@ -19,100 +19,46 @@
package org.elasticsearch.action.admin.indices.refresh; package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
* Refresh action. * Refresh action.
*/ */
public class TransportRefreshAction extends TransportBroadcastAction<RefreshRequest, RefreshResponse, ShardRefreshRequest, ShardRefreshResponse> { public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest, RefreshResponse, ReplicationRequest, ActionWriteResponse> {
private final IndicesService indicesService;
@Inject @Inject
public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, TransportService transportService, ActionFilters actionFilters,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver,
super(settings, RefreshAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, TransportShardRefreshAction shardRefreshAction) {
RefreshRequest.class, ShardRefreshRequest.class, ThreadPool.Names.REFRESH); super(RefreshAction.NAME, RefreshRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, shardRefreshAction);
this.indicesService = indicesService;
} }
@Override @Override
protected RefreshResponse newResponse(RefreshRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { protected ActionWriteResponse newShardResponse() {
int successfulShards = 0; return new ActionWriteResponse();
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new RefreshResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
} }
@Override @Override
protected ShardRefreshRequest newShardRequest(int numShards, ShardRouting shard, RefreshRequest request) { protected ReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
return new ShardRefreshRequest(shard.shardId(), request); return new ReplicationRequest(request).setShardId(shardId).timeout("0ms");
} }
@Override @Override
protected ShardRefreshResponse newShardResponse() { protected RefreshResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List<ShardOperationFailedException> shardFailures) {
return new ShardRefreshResponse(); return new RefreshResponse(totalNumCopies, successfulShards, failedShards, shardFailures);
}
@Override
protected ShardRefreshResponse shardOperation(ShardRefreshRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.refresh("api");
logger.trace("{} refresh request executed", indexShard.shardId());
return new ShardRefreshResponse(request.shardId());
}
/**
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, RefreshRequest request, String[] concreteIndices) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true, true);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RefreshRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RefreshRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices);
} }
} }

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardRefreshAction extends TransportReplicationAction<ReplicationRequest, ReplicationRequest, ActionWriteResponse> {
public static final String NAME = "indices:data/write/refresh";
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters, indexNameExpressionResolver, ReplicationRequest.class, ReplicationRequest.class, ThreadPool.Names.REFRESH);
}
@Override
protected ActionWriteResponse newResponseInstance() {
return new ActionWriteResponse();
}
@Override
protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on primary", indexShard.shardId());
return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
}
@Override
protected void shardOperationOnReplica(ShardId shardId, ReplicationRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).shardSafe(shardId.id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on replica", indexShard.shardId());
}
@Override
protected boolean checkWriteConsistency() {
return false;
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
}
@Override
protected boolean shouldExecuteReplication(Settings settings) {
return true;
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -32,8 +33,6 @@ import java.util.List;
*/ */
public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> { public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
private int shardId;
private BulkItemRequest[] items; private BulkItemRequest[] items;
private boolean refresh; private boolean refresh;
@ -44,7 +43,7 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) { BulkShardRequest(BulkRequest bulkRequest, String index, int shardId, boolean refresh, BulkItemRequest[] items) {
super(bulkRequest); super(bulkRequest);
this.index = index; this.index = index;
this.shardId = shardId; this.setShardId(new ShardId(index, shardId));
this.items = items; this.items = items;
this.refresh = refresh; this.refresh = refresh;
} }
@ -53,10 +52,6 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
return this.refresh; return this.refresh;
} }
int shardId() {
return shardId;
}
BulkItemRequest[] items() { BulkItemRequest[] items() {
return items; return items;
} }
@ -75,7 +70,6 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeVInt(shardId);
out.writeVInt(items.length); out.writeVInt(items.length);
for (BulkItemRequest item : items) { for (BulkItemRequest item : items) {
if (item != null) { if (item != null) {
@ -91,7 +85,6 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardId = in.readVInt();
items = new BulkItemRequest[in.readVInt()]; items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) { for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) { if (in.readBoolean()) {

View File

@ -109,7 +109,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
@Override @Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) { protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterState.routingTable().index(request.concreteIndex()).shard(request.request().shardId()).shardsIt(); return clusterState.routingTable().index(request.concreteIndex()).shard(request.request().shardId().id()).shardsIt();
} }
@Override @Override

View File

@ -25,18 +25,20 @@ import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
/** /**
* *
*/ */
public abstract class BroadcastRequest<T extends BroadcastRequest> extends ActionRequest<T> implements IndicesRequest.Replaceable { public class BroadcastRequest<T extends BroadcastRequest> extends ActionRequest<T> implements IndicesRequest.Replaceable {
protected String[] indices; protected String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed(); private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
protected BroadcastRequest() { public BroadcastRequest() {
} }

View File

@ -32,17 +32,17 @@ import static org.elasticsearch.action.support.DefaultShardOperationFailedExcept
/** /**
* Base class for all broadcast operation based responses. * Base class for all broadcast operation based responses.
*/ */
public abstract class BroadcastResponse extends ActionResponse { public class BroadcastResponse extends ActionResponse {
private static final ShardOperationFailedException[] EMPTY = new ShardOperationFailedException[0]; private static final ShardOperationFailedException[] EMPTY = new ShardOperationFailedException[0];
private int totalShards; private int totalShards;
private int successfulShards; private int successfulShards;
private int failedShards; private int failedShards;
private ShardOperationFailedException[] shardFailures = EMPTY; private ShardOperationFailedException[] shardFailures = EMPTY;
protected BroadcastResponse() { public BroadcastResponse() {
} }
protected BroadcastResponse(int totalShards, int successfulShards, int failedShards, List<? extends ShardOperationFailedException> shardFailures) { public BroadcastResponse(int totalShards, int successfulShards, int failedShards, List<? extends ShardOperationFailedException> shardFailures) {
this.totalShards = totalShards; this.totalShards = totalShards;
this.successfulShards = successfulShards; this.successfulShards = successfulShards;
this.failedShards = failedShards; this.failedShards = failedShards;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -37,7 +38,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/** /**
* *
*/ */
public abstract class ReplicationRequest<T extends ReplicationRequest> extends ActionRequest<T> implements IndicesRequest { public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequest<T> implements IndicesRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
@ -49,14 +50,14 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private volatile boolean canHaveDuplicates = false; private volatile boolean canHaveDuplicates = false;
protected ReplicationRequest() { public ReplicationRequest() {
} }
/** /**
* Creates a new request that inherits headers and context from the request provided as argument. * Creates a new request that inherits headers and context from the request provided as argument.
*/ */
protected ReplicationRequest(ActionRequest request) { public ReplicationRequest(ActionRequest request) {
super(request); super(request);
} }
@ -133,6 +134,16 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
return this.consistencyLevel; return this.consistencyLevel;
} }
/**
* @return the shardId of the shard where this operation should be executed on.
* can be null in case the shardId is determined by a single document (index, type, id) for example for index or delete request.
*/
public
@Nullable
ShardId shardId() {
return internalShardId;
}
/** /**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/ */
@ -173,4 +184,10 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A
out.writeString(index); out.writeString(index);
out.writeBoolean(canHaveDuplicates); out.writeBoolean(canHaveDuplicates);
} }
public T setShardId(ShardId shardId) {
this.internalShardId = shardId;
this.index = shardId.getIndex();
return (T) this;
}
} }

View File

@ -0,0 +1,162 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Base class for requests that should be executed on all shards of an index or several indices.
* This action sends shard requests to all primary shards of the indices and they are then replicated like write requests
*/
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, ShardResponse extends ActionWriteResponse> extends HandledTransportAction<Request, Response> {
private final TransportReplicationAction replicatedBroadcastShardAction;
private final ClusterService clusterService;
public TransportBroadcastReplicationAction(String name, Class<Request> request, Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
super(settings, name, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this.replicatedBroadcastShardAction = replicatedBroadcastShardAction;
this.clusterService = clusterService;
}
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
final ClusterState clusterState = clusterService.state();
List<ShardId> shards = shards(request, clusterState);
final CopyOnWriteArrayList<ShardResponse> shardsResponses = new CopyOnWriteArrayList();
if (shards.size() == 0) {
finishAndNotifyListener(listener, shardsResponses);
}
final CountDown responsesCountDown = new CountDown(shards.size());
for (final ShardId shardId : shards) {
ActionListener<ShardResponse> shardActionListener = new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse shardResponse) {
shardsResponses.add(shardResponse);
logger.trace("{}: got response from {}", actionName, shardId);
if (responsesCountDown.countDown()) {
finishAndNotifyListener(listener, shardsResponses);
}
}
@Override
public void onFailure(Throwable e) {
logger.trace("{}: got failure from {}", actionName, shardId);
int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();
ActionWriteResponse.ShardInfo.Failure[] failures;
if (ExceptionsHelper.unwrap(e, UnavailableShardsException.class) != null) {
failures = new ActionWriteResponse.ShardInfo.Failure[0];
} else {
ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
failures = new ActionWriteResponse.ShardInfo.Failure[totalNumCopies];
Arrays.fill(failures, failure);
}
shardResponse.setShardInfo(new ActionWriteResponse.ShardInfo(totalNumCopies, 0, failures));
shardsResponses.add(shardResponse);
if (responsesCountDown.countDown()) {
finishAndNotifyListener(listener, shardsResponses);
}
}
};
shardExecute(request, shardId, shardActionListener);
}
}
protected void shardExecute(Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
replicatedBroadcastShardAction.execute(newShardRequest(request, shardId), shardActionListener);
}
/**
* @return all shard ids the request should run on
*/
protected List<ShardId> shards(Request request, ClusterState clusterState) {
List<ShardId> shardIds = new ArrayList<>();
String[] concreteIndices = indexNameExpressionResolver.concreteIndices(clusterState, request);
for (String index : concreteIndices) {
IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(index);
if (indexMetaData != null) {
for (IntObjectCursor<IndexShardRoutingTable> shardRouting : clusterState.getRoutingTable().indicesRouting().get(index).getShards()) {
shardIds.add(shardRouting.value.shardId());
}
}
}
return shardIds;
}
protected abstract ShardResponse newShardResponse();
protected abstract ShardRequest newShardRequest(Request request, ShardId shardId);
private void finishAndNotifyListener(ActionListener listener, CopyOnWriteArrayList<ShardResponse> shardsResponses) {
logger.trace("{}: got all shard responses", actionName);
int successfulShards = 0;
int failedShards = 0;
int totalNumCopies = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.size(); i++) {
ActionWriteResponse shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// non active shard, ignore
} else {
failedShards += shardResponse.getShardInfo().getFailed();
successfulShards += shardResponse.getShardInfo().getSuccessful();
totalNumCopies += shardResponse.getShardInfo().getTotal();
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause())));
}
}
}
listener.onResponse(newResponse(successfulShards, failedShards, totalNumCopies, shardFailures));
}
protected abstract BroadcastResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List<ShardOperationFailedException> shardFailures);
}

View File

@ -362,6 +362,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
finishWithUnexpectedFailure(e); finishWithUnexpectedFailure(e);
} }
@Override
protected void doRun() { protected void doRun() {
if (checkBlocks() == false) { if (checkBlocks() == false) {
return; return;
@ -727,7 +728,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// new primary shard as well... // new primary shard as well...
ClusterState newState = clusterService.state(); ClusterState newState = clusterService.state();
int numberOfUnassignedOrShadowReplicas = 0; int numberOfUnassignedOrIgnoredReplicas = 0;
int numberOfPendingShardInstances = 0; int numberOfPendingShardInstances = 0;
if (observer.observedState() != newState) { if (observer.observedState() != newState) {
observer.reset(newState); observer.reset(newState);
@ -741,7 +742,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (shard.relocating()) { if (shard.relocating()) {
numberOfPendingShardInstances++; numberOfPendingShardInstances++;
} }
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { } else if (shouldExecuteReplication(indexMetaData.settings()) == false) {
// If the replicas use shadow replicas, there is no reason to // If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and // perform the action on the replica, so skip it and
// immediately return // immediately return
@ -750,9 +751,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// to wait until they get the new mapping through the cluster // to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for // state, which is why we recommend pre-defined mappings for
// indices using shadow replicas // indices using shadow replicas
numberOfUnassignedOrShadowReplicas++; numberOfUnassignedOrIgnoredReplicas++;
} else if (shard.unassigned()) { } else if (shard.unassigned()) {
numberOfUnassignedOrShadowReplicas++; numberOfUnassignedOrIgnoredReplicas++;
} else if (shard.relocating()) { } else if (shard.relocating()) {
// we need to send to two copies // we need to send to two copies
numberOfPendingShardInstances += 2; numberOfPendingShardInstances += 2;
@ -769,13 +770,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
replicaRequest.setCanHaveDuplicates(); replicaRequest.setCanHaveDuplicates();
} }
if (shard.unassigned()) { if (shard.unassigned()) {
numberOfUnassignedOrShadowReplicas++; numberOfUnassignedOrIgnoredReplicas++;
} else if (shard.primary()) { } else if (shard.primary()) {
if (shard.relocating()) { if (shard.relocating()) {
// we have to replicate to the other copy // we have to replicate to the other copy
numberOfPendingShardInstances += 1; numberOfPendingShardInstances += 1;
} }
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { } else if (shouldExecuteReplication(indexMetaData.settings()) == false) {
// If the replicas use shadow replicas, there is no reason to // If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and // perform the action on the replica, so skip it and
// immediately return // immediately return
@ -784,7 +785,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
// to wait until they get the new mapping through the cluster // to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for // state, which is why we recommend pre-defined mappings for
// indices using shadow replicas // indices using shadow replicas
numberOfUnassignedOrShadowReplicas++; numberOfUnassignedOrIgnoredReplicas++;
} else if (shard.relocating()) { } else if (shard.relocating()) {
// we need to send to two copies // we need to send to two copies
numberOfPendingShardInstances += 2; numberOfPendingShardInstances += 2;
@ -795,7 +796,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} }
// one for the primary already done // one for the primary already done
this.totalShards = 1 + numberOfPendingShardInstances + numberOfUnassignedOrShadowReplicas; this.totalShards = 1 + numberOfPendingShardInstances + numberOfUnassignedOrIgnoredReplicas;
this.pending = new AtomicInteger(numberOfPendingShardInstances); this.pending = new AtomicInteger(numberOfPendingShardInstances);
} }
@ -854,7 +855,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (shard.relocating()) { if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId()); performOnReplica(shard, shard.relocatingNodeId());
} }
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) { } else if (shouldExecuteReplication(indexMetaData.settings())) {
performOnReplica(shard, shard.currentNodeId()); performOnReplica(shard, shard.currentNodeId());
if (shard.relocating()) { if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId()); performOnReplica(shard, shard.relocatingNodeId());
@ -985,6 +986,14 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
} }
/**
* Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase will be skipped.
* For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
*/
protected boolean shouldExecuteReplication(Settings settings) {
return IndexMetaData.isIndexUsingShadowReplicas(settings) == false;
}
/** /**
* Internal request class that gets built on each node. Holds the original request plus additional info. * Internal request class that gets built on each node. Holds the original request plus additional info.
*/ */

View File

@ -28,8 +28,8 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.TransportShardFlushAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
@ -42,8 +42,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeAction;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction; import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@ -85,6 +85,7 @@ import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -95,35 +96,18 @@ import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
@ClusterScope(scope = Scope.SUITE, numClientNodes = 1, minNumDataNodes = 2) @ClusterScope(scope = Scope.SUITE, numClientNodes = 1, minNumDataNodes = 2)
public class IndicesRequestIT extends ESIntegTestCase { public class IndicesRequestIT extends ESIntegTestCase {
@ -390,14 +374,15 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test @Test
public void testFlush() { public void testFlush() {
String flushShardAction = FlushAction.NAME + "[s]"; String[] indexShardActions = new String[]{TransportShardFlushAction.NAME + "[r]", TransportShardFlushAction.NAME};
interceptTransportActions(flushShardAction); interceptTransportActions(indexShardActions);
FlushRequest flushRequest = new FlushRequest(randomIndicesOrAliases()); FlushRequest flushRequest = new FlushRequest(randomIndicesOrAliases());
internalCluster().clientNodeClient().admin().indices().flush(flushRequest).actionGet(); internalCluster().clientNodeClient().admin().indices().flush(flushRequest).actionGet();
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(flushRequest, flushShardAction); String[] indices = new IndexNameExpressionResolver(Settings.EMPTY).concreteIndices(client().admin().cluster().prepareState().get().getState(), flushRequest);
assertIndicesSubset(Arrays.asList(indices), indexShardActions);
} }
@Test @Test
@ -414,14 +399,15 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Test @Test
public void testRefresh() { public void testRefresh() {
String refreshShardAction = RefreshAction.NAME + "[s]"; String[] indexShardActions = new String[]{TransportShardRefreshAction.NAME + "[r]", TransportShardRefreshAction.NAME};
interceptTransportActions(refreshShardAction); interceptTransportActions(indexShardActions);
RefreshRequest refreshRequest = new RefreshRequest(randomIndicesOrAliases()); RefreshRequest refreshRequest = new RefreshRequest(randomIndicesOrAliases());
internalCluster().clientNodeClient().admin().indices().refresh(refreshRequest).actionGet(); internalCluster().clientNodeClient().admin().indices().refresh(refreshRequest).actionGet();
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(refreshRequest, refreshShardAction); String[] indices = new IndexNameExpressionResolver(Settings.EMPTY).concreteIndices(client().admin().cluster().prepareState().get().getState(), refreshRequest);
assertIndicesSubset(Arrays.asList(indices), indexShardActions);
} }
@Test @Test

View File

@ -61,7 +61,8 @@ public class FlushBlocksIT extends ESIntegTestCase {
for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) { for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try { try {
enableIndexBlock("test", blockSetting); enableIndexBlock("test", blockSetting);
assertBlocked(client().admin().indices().prepareFlush("test")); FlushResponse flushResponse = client().admin().indices().prepareFlush("test").get();
assertBlocked(flushResponse);
} finally { } finally {
disableIndexBlock("test", blockSetting); disableIndexBlock("test", blockSetting);
} }
@ -74,7 +75,7 @@ public class FlushBlocksIT extends ESIntegTestCase {
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards)); assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
setClusterReadOnly(true); setClusterReadOnly(true);
assertBlocked(client().admin().indices().prepareFlush()); assertBlocked(client().admin().indices().prepareFlush().get());
} finally { } finally {
setClusterReadOnly(false); setClusterReadOnly(false);
} }

View File

@ -74,7 +74,7 @@ public class OptimizeBlocksIT extends ESIntegTestCase {
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards)); assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
setClusterReadOnly(true); setClusterReadOnly(true);
assertBlocked(client().admin().indices().prepareFlush()); assertBlocked(client().admin().indices().prepareOptimize());
} finally { } finally {
setClusterReadOnly(false); setClusterReadOnly(false);
} }

View File

@ -57,7 +57,7 @@ public class RefreshBlocksIT extends ESIntegTestCase {
for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) { for (String blockSetting : Arrays.asList(SETTING_READ_ONLY, SETTING_BLOCKS_METADATA)) {
try { try {
enableIndexBlock("test", blockSetting); enableIndexBlock("test", blockSetting);
assertBlocked(client().admin().indices().prepareRefresh("test")); assertBlocked(client().admin().indices().prepareRefresh("test").get());
} finally { } finally {
disableIndexBlock("test", blockSetting); disableIndexBlock("test", blockSetting);
} }
@ -70,7 +70,7 @@ public class RefreshBlocksIT extends ESIntegTestCase {
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards)); assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
setClusterReadOnly(true); setClusterReadOnly(true);
assertBlocked(client().admin().indices().prepareRefresh()); assertBlocked(client().admin().indices().prepareRefresh().get());
} finally { } finally {
setClusterReadOnly(false); setClusterReadOnly(false);
} }

View File

@ -0,0 +1,315 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
import org.elasticsearch.action.admin.indices.flush.TransportShardFlushAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.*;
import static org.hamcrest.Matchers.*;
public class BroadcastReplicationTests extends ESTestCase {
private static ThreadPool threadPool;
private TestClusterService clusterService;
private TransportService transportService;
private LocalTransport transport;
private TestBroadcastReplicationAction broadcastReplicationAction;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("BroadcastReplicationTests");
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry());
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), null);
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@Test
public void testNotStartedPrimary() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
shardRequests.v2().onFailure(new UnavailableShardsException(shardId, "test exception expected"));
}
response.get();
logger.info("total shards: {}, ", response.get().getTotalShards());
// we expect no failures here because UnavailableShardsException does not count as failed
assertBroadcastResponse(2, 0, 0, response.get(), null);
}
@Test
public void testStartedPrimary() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
clusterService.setState(state(index, randomBoolean(),
ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0]));
shardRequests.v2().onResponse(actionWriteResponse);
}
logger.info("total shards: {}, ", response.get().getTotalShards());
assertBroadcastResponse(1, 1, 0, response.get(), null);
}
@Test
public void testResultCombine() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
int numShards = randomInt(3);
clusterService.setState(stateWithAssignedPrimariesAndOneReplica(index, numShards));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
int succeeded = 0;
int failed = 0;
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
if (randomBoolean()) {
ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0];
int shardsSucceeded = randomInt(1) + 1;
succeeded += shardsSucceeded;
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
if (shardsSucceeded == 1 && randomBoolean()) {
//sometimes add failure (no failure means shard unavailable)
failures = new ActionWriteResponse.ShardInfo.Failure[1];
failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
failed++;
}
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures));
shardRequests.v2().onResponse(actionWriteResponse);
} else {
// sometimes fail
failed += 2;
// just add a general exception and see if failed shards will be incremented by 2
shardRequests.v2().onFailure(new Exception("pretend shard failed"));
}
}
assertBroadcastResponse(2 * numShards, succeeded, failed, response.get(), Exception.class);
}
@Test
public void testNoShards() throws InterruptedException, ExecutionException, IOException {
clusterService.setState(stateWithNoShard());
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
BroadcastResponse response = executeAndAssertImmediateResponse(broadcastReplicationAction, new BroadcastRequest());
assertBroadcastResponse(0, 0, 0, response, null);
}
@Test
public void testShardsList() throws InterruptedException, ExecutionException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
ClusterState clusterState = state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED);
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
List<ShardId> shards = broadcastReplicationAction.shards(new BroadcastRequest().indices(shardId.index().name()), clusterState);
assertThat(shards.size(), equalTo(1));
assertThat(shards.get(0), equalTo(shardId));
}
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<BroadcastRequest, BroadcastResponse, ReplicationRequest, ActionWriteResponse> {
protected final Set<Tuple<ShardId, ActionListener<ActionWriteResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
super("test-broadcast-replication-action", BroadcastRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction);
}
@Override
protected ActionWriteResponse newShardResponse() {
return new ActionWriteResponse();
}
@Override
protected ReplicationRequest newShardRequest(BroadcastRequest request, ShardId shardId) {
return new ReplicationRequest().setShardId(shardId);
}
@Override
protected BroadcastResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List shardFailures) {
return new BroadcastResponse(totalNumCopies, successfulShards, failedShards, shardFailures);
}
@Override
protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener<ActionWriteResponse> shardActionListener) {
capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
}
protected void clearCapturedRequests() {
capturedShardRequests.clear();
}
}
public FlushResponse assertImmediateResponse(String index, TransportFlushAction flushAction) throws InterruptedException, ExecutionException {
Date beginDate = new Date();
FlushResponse flushResponse = flushAction.execute(new FlushRequest(index)).get();
Date endDate = new Date();
long maxTime = 500;
assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime));
return flushResponse;
}
@Test
public void testTimeoutFlush() throws ExecutionException, InterruptedException {
final String index = "test";
clusterService.setState(state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
TransportShardFlushAction shardFlushAction = new TransportShardFlushAction(Settings.EMPTY, transportService, clusterService,
null, threadPool, null,
null, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY));
TransportFlushAction flushAction = new TransportFlushAction(Settings.EMPTY, threadPool, clusterService,
transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY),
shardFlushAction);
FlushResponse flushResponse = (FlushResponse) executeAndAssertImmediateResponse(flushAction, new FlushRequest(index));
logger.info("total shards: {}, ", flushResponse.getTotalShards());
assertBroadcastResponse(2, 0, 0, flushResponse, UnavailableShardsException.class);
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class);
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class);
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(flushAction, new FlushRequest(index)), ClusterBlockException.class);
}
void assertFailure(String msg, BroadcastResponse broadcastResponse, Class<?> klass) throws InterruptedException {
assertThat(broadcastResponse.getSuccessfulShards(), equalTo(0));
assertThat(broadcastResponse.getTotalShards(), equalTo(broadcastResponse.getFailedShards()));
for (int i = 0; i < broadcastResponse.getFailedShards(); i++) {
assertThat(msg, broadcastResponse.getShardFailures()[i].getCause().getCause(), instanceOf(klass));
}
}
@Test
public void testTimeoutRefresh() throws ExecutionException, InterruptedException {
final String index = "test";
clusterService.setState(state(index, randomBoolean(),
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
TransportShardRefreshAction shardrefreshAction = new TransportShardRefreshAction(Settings.EMPTY, transportService, clusterService,
null, threadPool, null,
null, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY));
TransportRefreshAction refreshAction = new TransportRefreshAction(Settings.EMPTY, threadPool, clusterService,
transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY),
shardrefreshAction);
RefreshResponse refreshResponse = (RefreshResponse) executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index));
assertBroadcastResponse(2, 0, 0, refreshResponse, UnavailableShardsException.class);
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class);
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class);
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertFailure("all shards should fail with cluster block", executeAndAssertImmediateResponse(refreshAction, new RefreshRequest(index)), ClusterBlockException.class);
}
public BroadcastResponse executeAndAssertImmediateResponse(TransportBroadcastReplicationAction broadcastAction, BroadcastRequest request) throws InterruptedException, ExecutionException {
return (BroadcastResponse) broadcastAction.execute(request).actionGet("5s");
}
private void assertBroadcastResponse(int total, int successful, int failed, BroadcastResponse response, Class exceptionClass) {
assertThat(response.getSuccessfulShards(), equalTo(successful));
assertThat(response.getTotalShards(), equalTo(total));
assertThat(response.getFailedShards(), equalTo(failed));
for (int i = 0; i < failed; i++) {
assertThat(response.getShardFailures()[0].getCause().getCause(), instanceOf(exceptionClass));
}
}
}

View File

@ -0,0 +1,230 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
/**
* Helper methods for generating cluster states
*/
public class ClusterStateCreationUtils {
/**
* Creates cluster state with and index that has one shard and #(replicaStates) replicas
*
* @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state
* @param primaryState state of primary
* @param replicaStates states of the replicas. length of this array determines also the number of replicas
*/
public static ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) {
final int numberOfReplicas = replicaStates.length;
int numberOfNodes = numberOfReplicas + 1;
if (primaryState == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
for (ShardRoutingState state : replicaStates) {
if (state == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
}
numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures
final ShardId shardId = new ShardId(index, 0);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> unassignedNodes = new HashSet<>();
for (int i = 0; i < numberOfNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.put(node);
unassignedNodes.add(node.id());
}
discoBuilder.localNodeId(newNode(0).id());
discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
String primaryNode = null;
String relocatingNode = null;
UnassignedInfo unassignedInfo = null;
if (primaryState != ShardRoutingState.UNASSIGNED) {
if (primaryLocal) {
primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode);
} else {
primaryNode = selectAndRemove(unassignedNodes);
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, 0, unassignedInfo));
for (ShardRoutingState replicaState : replicaStates) {
String replicaNode = null;
relocatingNode = null;
unassignedInfo = null;
if (replicaState != ShardRoutingState.UNASSIGNED) {
assert primaryNode != null : "a replica is assigned but the primary isn't";
replicaNode = selectAndRemove(unassignedNodes);
if (replicaState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, null, false, replicaState, 0, unassignedInfo));
}
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build())));
return state.build();
}
/**
* Creates cluster state with several shards and one replica and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, int numberOfShards) {
int numberOfNodes = 2; // we need a non-local master to test shard failures
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.put(node);
}
discoBuilder.localNodeId(newNode(0).id());
discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
for (int i = 0; i < numberOfShards; i++) {
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
final ShardId shardId = new ShardId(index, i);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true, ShardRoutingState.STARTED, 0, null));
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, ShardRoutingState.STARTED, 0, null));
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
}
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder));
return state.build();
}
/**
* Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas.
* Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING.
*
* @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state
* @param numberOfReplicas number of replicas
*/
public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) {
int assignedReplicas = randomIntBetween(0, numberOfReplicas);
return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas);
}
/**
* Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas.
* Primary will be STARTED in cluster state. Some (unassignedReplicas) will be UNASSIGNED and
* some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING.
*
* @param index name of the index
* @param primaryLocal if primary should coincide with the local node in the cluster state
* @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state
* @param unassignedReplicas number of replicas that should be unassigned
*/
public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) {
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
// no point in randomizing - node assignment later on does it too.
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
}
for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates);
}
/**
* Creates a cluster state with no index
*/
public static ClusterState stateWithNoShard() {
int numberOfNodes = 2;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> unassignedNodes = new HashSet<>();
for (int i = 0; i < numberOfNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.put(node);
unassignedNodes.add(node.id());
}
discoBuilder.localNodeId(newNode(0).id());
discoBuilder.masterNodeId(newNode(1).id());
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder());
return state.build();
}
private static DiscoveryNode newNode(int nodeId) {
return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
static private String selectAndRemove(Set<String> strings) {
String selection = randomFrom(strings.toArray(new String[strings.size()]));
strings.remove(selection);
return selection;
}
}

View File

@ -77,6 +77,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -98,6 +100,7 @@ public class ShardReplicationTests extends ESTestCase {
threadPool = new ThreadPool("ShardReplicationTests"); threadPool = new ThreadPool("ShardReplicationTests");
} }
@Override
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
@ -161,103 +164,6 @@ public class ShardReplicationTests extends ESTestCase {
assertEquals(1, count.get()); assertEquals(1, count.get());
} }
ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) {
int assignedReplicas = randomIntBetween(0, numberOfReplicas);
return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas);
}
ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) {
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
// no point in randomizing - node assignment later on does it too.
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
}
for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates);
}
ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) {
final int numberOfReplicas = replicaStates.length;
int numberOfNodes = numberOfReplicas + 1;
if (primaryState == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
for (ShardRoutingState state : replicaStates) {
if (state == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
}
numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures
final ShardId shardId = new ShardId(index, 0);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> unassignedNodes = new HashSet<>();
for (int i = 0; i < numberOfNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.put(node);
unassignedNodes.add(node.id());
}
discoBuilder.localNodeId(newNode(0).id());
discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
String primaryNode = null;
String relocatingNode = null;
UnassignedInfo unassignedInfo = null;
if (primaryState != ShardRoutingState.UNASSIGNED) {
if (primaryLocal) {
primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode);
} else {
primaryNode = selectAndRemove(unassignedNodes);
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, 0, unassignedInfo));
for (ShardRoutingState replicaState : replicaStates) {
String replicaNode = null;
relocatingNode = null;
unassignedInfo = null;
if (replicaState != ShardRoutingState.UNASSIGNED) {
assert primaryNode != null : "a replica is assigned but the primary isn't";
replicaNode = selectAndRemove(unassignedNodes);
if (replicaState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, null, false, replicaState, 0, unassignedInfo));
}
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build())));
return state.build();
}
private String selectAndRemove(Set<String> strings) {
String selection = randomFrom(strings.toArray(new String[strings.size()]));
strings.remove(selection);
return selection;
}
@Test @Test
public void testNotStartedPrimary() throws InterruptedException, ExecutionException { public void testNotStartedPrimary() throws InterruptedException, ExecutionException {
final String index = "test"; final String index = "test";
@ -527,6 +433,7 @@ public class ShardReplicationTests extends ESTestCase {
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); final TransportReplicationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
Thread t = new Thread() { Thread t = new Thread() {
@Override
public void run() { public void run() {
primaryPhase.run(); primaryPhase.run();
} }
@ -587,6 +494,7 @@ public class ShardReplicationTests extends ESTestCase {
action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
Thread t = new Thread() { Thread t = new Thread() {
@Override
public void run() { public void run() {
try { try {
replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel()); replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel());
@ -746,10 +654,6 @@ public class ShardReplicationTests extends ESTestCase {
} }
} }
static DiscoveryNode newNode(int nodeId) {
return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
/* /*
* Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails. * Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails.
* */ * */

View File

@ -432,12 +432,12 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
* state processing when a recover starts and only unblocking it shortly after the node receives * state processing when a recover starts and only unblocking it shortly after the node receives
* the ShardActiveRequest. * the ShardActiveRequest.
*/ */
static class ReclocationStartEndTracer extends MockTransportService.Tracer { public static class ReclocationStartEndTracer extends MockTransportService.Tracer {
private final ESLogger logger; private final ESLogger logger;
private final CountDownLatch beginRelocationLatch; private final CountDownLatch beginRelocationLatch;
private final CountDownLatch receivedShardExistsRequestLatch; private final CountDownLatch receivedShardExistsRequestLatch;
ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) { public ReclocationStartEndTracer(ESLogger logger, CountDownLatch beginRelocationLatch, CountDownLatch receivedShardExistsRequestLatch) {
this.logger = logger; this.logger = logger;
this.beginRelocationLatch = beginRelocationLatch; this.beginRelocationLatch = beginRelocationLatch;
this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch; this.receivedShardExistsRequestLatch = receivedShardExistsRequestLatch;

View File

@ -25,6 +25,7 @@ import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -55,6 +56,7 @@ public class TestClusterService implements ClusterService {
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue(); private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY); private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY);
private final OperationRouting operationRouting = new OperationRouting(Settings.Builder.EMPTY_SETTINGS, new AwarenessAllocationDecider());
public TestClusterService() { public TestClusterService() {
this(ClusterState.builder(new ClusterName("test")).build()); this(ClusterState.builder(new ClusterName("test")).build());
@ -129,7 +131,7 @@ public class TestClusterService implements ClusterService {
@Override @Override
public OperationRouting operationRouting() { public OperationRouting operationRouting() {
return null; return operationRouting;
} }
@Override @Override

View File

@ -67,6 +67,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.rest.client.http.HttpResponse; import org.elasticsearch.test.rest.client.http.HttpResponse;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
@ -126,6 +127,22 @@ public class ElasticsearchAssertions {
assertBlocked(builder, null); assertBlocked(builder, null);
} }
/**
* Checks that all shard requests of a replicated brodcast request failed due to a cluster block
*
* @param replicatedBroadcastResponse the response that should only contain failed shard responses
*
* */
public static void assertBlocked(BroadcastResponse replicatedBroadcastResponse) {
assertThat("all shard requests should have failed", replicatedBroadcastResponse.getFailedShards(), Matchers.equalTo(replicatedBroadcastResponse.getTotalShards()));
for (ShardOperationFailedException exception : replicatedBroadcastResponse.getShardFailures()) {
ClusterBlockException clusterBlockException = (ClusterBlockException) ExceptionsHelper.unwrap(exception.getCause(), ClusterBlockException.class);
assertNotNull("expected the cause of failure to be a ClusterBlockException but got " + exception.getCause().getMessage(), clusterBlockException);
assertThat(clusterBlockException.blocks().size(), greaterThan(0));
assertThat(clusterBlockException.status(), CoreMatchers.equalTo(RestStatus.FORBIDDEN));
}
}
/** /**
* Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}.
* *