Master based operations (create index, delete index) to automatically retry on retryable cluster blocks (like recovery from gateway)

This commit is contained in:
kimchy 2010-12-31 14:09:45 +02:00
parent a92dbc537a
commit 7c959e7ec3
17 changed files with 142 additions and 43 deletions

View File

@ -120,6 +120,7 @@
<w>reparse</w>
<w>reparsed</w>
<w>retrans</w>
<w>retryable</w>
<w>retval</w>
<w>routings</w>
<w>rsts</w>

View File

@ -24,14 +24,17 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@ -60,10 +63,12 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
return new IndicesAliasesResponse();
}
@Override protected void checkBlock(IndicesAliasesRequest request, ClusterState state) {
@Override protected ClusterBlockException checkBlock(IndicesAliasesRequest request, ClusterState state) {
Set<String> indices = Sets.newHashSet();
for (AliasAction aliasAction : request.aliasActions()) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, aliasAction.index());
indices.add(aliasAction.index());
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, indices.toArray(new String[indices.size()]));
}
@Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
return new CloseIndexResponse();
}
@Override protected void checkBlock(CloseIndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
@Override protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, request.index());
}
@Override protected CloseIndexResponse masterOperation(CloseIndexRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
return new CreateIndexResponse();
}
@Override protected void checkBlock(CreateIndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
@Override protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, request.index());
}
@Override protected CreateIndexResponse masterOperation(CreateIndexRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
return new DeleteIndexResponse();
}
@Override protected void checkBlock(DeleteIndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
@Override protected ClusterBlockException checkBlock(DeleteIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, request.index());
}
@Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.inject.Inject;
@ -78,13 +79,14 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
return new DeleteMappingResponse();
}
@Override protected void checkBlock(DeleteMappingRequest request, ClusterState state) {
@Override protected void doExecute(DeleteMappingRequest request, ActionListener<DeleteMappingResponse> listener) {
// update to concrete indices
request.indices(state.metaData().concreteIndices(request.indices()));
request.indices(clusterService.state().metaData().concreteIndices(request.indices()));
super.doExecute(request, listener);
}
for (String index : request.indices()) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, index);
}
@Override protected ClusterBlockException checkBlock(DeleteMappingRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
}
@Override protected DeleteMappingResponse masterOperation(final DeleteMappingRequest request, final ClusterState state) throws ElasticSearchException {

View File

@ -20,10 +20,12 @@
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.inject.Inject;
@ -62,13 +64,13 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
return new PutMappingResponse();
}
@Override protected void checkBlock(PutMappingRequest request, ClusterState state) {
// update to concrete indices
request.indices(state.metaData().concreteIndices(request.indices()));
@Override protected void doExecute(PutMappingRequest request, ActionListener<PutMappingResponse> listener) {
request.indices(clusterService.state().metaData().concreteIndices(request.indices()));
super.doExecute(request, listener);
}
for (String index : request.indices()) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, index);
}
@Override protected ClusterBlockException checkBlock(PutMappingRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, request.indices());
}
@Override protected PutMappingResponse masterOperation(PutMappingRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
return new OpenIndexResponse();
}
@Override protected void checkBlock(OpenIndexRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
@Override protected ClusterBlockException checkBlock(OpenIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, request.index());
}
@Override protected OpenIndexResponse masterOperation(OpenIndexRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera
return new DeleteIndexTemplateResponse();
}
@Override protected void checkBlock(DeleteIndexTemplateRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, "");
@Override protected ClusterBlockException checkBlock(DeleteIndexTemplateRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, "");
}
@Override protected DeleteIndexTemplateResponse masterOperation(DeleteIndexTemplateRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.common.inject.Inject;
@ -61,8 +62,8 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio
return new PutIndexTemplateResponse();
}
@Override protected void checkBlock(PutIndexTemplateRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, "");
@Override protected ClusterBlockException checkBlock(PutIndexTemplateRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA, "");
}
@Override protected PutIndexTemplateResponse masterOperation(PutIndexTemplateRequest request, ClusterState state) throws ElasticSearchException {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -69,8 +70,8 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
return false;
}
protected void checkBlock(Request request, ClusterState state) {
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return null;
}
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
@ -85,17 +86,52 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
final ClusterState clusterState = clusterService.state();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster() || localExecute(request)) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
checkBlock(request, clusterState);
Response response = masterOperation(request, clusterState);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
return;
}
});
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {
ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
@Override public void onClose() {
clusterService.remove(this);
listener.onFailure(blockException);
}
@Override public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(blockException);
}
@Override public void clusterChanged(ClusterChangedEvent event) {
ClusterBlockException blockException = checkBlock(request, event.state());
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
});
} else {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
Response response = masterOperation(request, clusterState);
listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
}
}
});
}
} else {
if (nodes.masterNode() == null) {
if (retrying) {

View File

@ -39,12 +39,15 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
private ClusterBlockLevel[] levels;
private boolean retryable;
private ClusterBlock() {
}
public ClusterBlock(int id, String description, ClusterBlockLevel... levels) {
public ClusterBlock(int id, String description, boolean retryable, ClusterBlockLevel... levels) {
this.id = id;
this.description = description;
this.retryable = retryable;
this.levels = levels;
}
@ -69,9 +72,14 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
return false;
}
public boolean retryable() {
return this.retryable;
}
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
builder.field("description", description);
builder.field("retryable", retryable);
builder.startArray("levels");
for (ClusterBlockLevel level : levels) {
builder.value(level.name().toLowerCase());
@ -93,6 +101,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
for (int i = 0; i < levels.length; i++) {
levels[i] = ClusterBlockLevel.fromId(in.readVInt());
}
retryable = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -102,6 +111,7 @@ public class ClusterBlock implements Serializable, Streamable, ToXContent {
for (ClusterBlockLevel level : levels) {
out.writeVInt(level.id());
}
out.writeBoolean(retryable);
}
public String toString() {

View File

@ -34,6 +34,15 @@ public class ClusterBlockException extends ElasticSearchException {
this.blocks = blocks;
}
public boolean retryable() {
for (ClusterBlock block : blocks) {
if (!block.retryable()) {
return false;
}
}
return true;
}
public ImmutableSet<ClusterBlock> blocks() {
return blocks;
}

View File

@ -100,8 +100,15 @@ public class ClusterBlocks {
}
public void indexBlockedRaiseException(ClusterBlockLevel level, String index) throws ClusterBlockException {
ClusterBlockException blockException = indexBlockedException(level, index);
if (blockException != null) {
throw blockException;
}
}
public ClusterBlockException indexBlockedException(ClusterBlockLevel level, String index) {
if (!indexBlocked(level, index)) {
return;
return null;
}
ImmutableSet.Builder<ClusterBlock> builder = ImmutableSet.builder();
builder.addAll(global(level));
@ -109,7 +116,7 @@ public class ClusterBlocks {
if (indexBlocks != null) {
builder.addAll(indexBlocks);
}
throw new ClusterBlockException(builder.build());
return new ClusterBlockException(builder.build());
}
public boolean indexBlocked(ClusterBlockLevel level, String index) {
@ -123,6 +130,27 @@ public class ClusterBlocks {
return false;
}
public ClusterBlockException indicesBlockedException(ClusterBlockLevel level, String[] indices) {
boolean indexIsBlocked = false;
for (String index : indices) {
if (indexBlocked(level, index)) {
indexIsBlocked = true;
}
}
if (!indexIsBlocked) {
return null;
}
ImmutableSet.Builder<ClusterBlock> builder = ImmutableSet.builder();
builder.addAll(global(level));
for (String index : indices) {
ImmutableSet<ClusterBlock> indexBlocks = indices(level).get(index);
if (indexBlocks != null) {
builder.addAll(indexBlocks);
}
}
return new ClusterBlockException(builder.build());
}
static class ImmutableLevelHolder {
static final ImmutableLevelHolder EMPTY = new ImmutableLevelHolder(ImmutableSet.<ClusterBlock>of(), ImmutableMap.<String, ImmutableSet<ClusterBlock>>of());

View File

@ -39,7 +39,7 @@ import org.elasticsearch.indices.IndexMissingException;
*/
public class MetaDataStateIndexService extends AbstractComponent {
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", ClusterBlockLevel.READ_WRITE);
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, ClusterBlockLevel.READ_WRITE);
private final ClusterService clusterService;

View File

@ -34,7 +34,7 @@ import org.elasticsearch.common.component.LifecycleComponent;
*/
public interface Discovery extends LifecycleComponent<Discovery> {
final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", ClusterBlockLevel.ALL);
final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, ClusterBlockLevel.ALL);
DiscoveryNode localNode();

View File

@ -47,7 +47,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
*/
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL);
public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", true, ClusterBlockLevel.ALL);
private final Gateway gateway;