Add missing cluster blocks handling for master operations

Master node related operations were missing proper handling of cluster blocks, allowing for example to perform cluster level update settings even before the state was fully restored on initial cluster startup

Note, the change allows to change read only related settings without checking for blocks on update settings, as without it, it means one can't re-enable metadata/write. Also, it doesn't check for blocks on cluster state and health API, as those are allowed to be used even when blocked to figure out what causes the block.
closes #7763
closes #7740
This commit is contained in:
Shay Banon 2014-09-17 10:17:25 +02:00
parent a2d07058e8
commit b75d1d885a
25 changed files with 292 additions and 118 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -59,6 +60,11 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadOperati
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(ClusterHealthRequest request, ClusterState state) {
return null; // we want users to be able to call this even when there are global blocks, just to check the health (are there blocks?)
}
@Override
protected ClusterHealthRequest newRequest() {
return new ClusterHealthRequest();

View File

@ -29,6 +29,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -71,6 +73,11 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(NodesShutdownRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
@ -54,6 +56,11 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(ClusterRerouteRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterRerouteRequest newRequest() {
return new ClusterRerouteRequest();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -67,6 +69,17 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) {
// allow for dedicated changes to the metadata blocks, so we don't block those to allow to "re-enable" it
if ((request.transientSettings().getAsMap().isEmpty() && request.persistentSettings().getAsMap().size() == 1 && request.persistentSettings().get(MetaData.SETTING_READ_ONLY) != null) ||
request.persistentSettings().getAsMap().isEmpty() && request.transientSettings().getAsMap().size() == 1 && request.transientSettings().get(MetaData.SETTING_READ_ONLY) != null) {
return null;
}
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected ClusterUpdateSettingsRequest newRequest() {
return new ClusterUpdateSettingsRequest();

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -54,6 +56,11 @@ public class TransportClusterSearchShardsAction extends TransportMasterNodeReadO
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected ClusterSearchShardsRequest newRequest() {
return new ClusterSearchShardsRequest();

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.common.Strings;
@ -66,6 +68,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(SnapshotsStatusRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected SnapshotsStatusRequest newRequest() {
return new SnapshotsStatusRequest();
@ -105,22 +112,22 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeOperation
transportNodesSnapshotsStatus.status(nodesIds.toArray(new String[nodesIds.size()]),
snapshotIds, request.masterNodeTimeout(), new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), request.snapshots());
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
listener.onFailure(e);
}
}
@Override
public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) {
try {
ImmutableList<SnapshotMetaData.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), request.snapshots());
listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses));
} catch (Throwable e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} else {
// We don't have any in-progress shards, just return current stats
listener.onResponse(buildResponse(request, currentSnapshots, null));

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationA
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -54,6 +56,15 @@ public class TransportClusterStateAction extends TransportMasterNodeReadOperatio
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(ClusterStateRequest request, ClusterState state) {
// cluster state calls are done also on a fully blocked cluster to figure out what is going
// on in the cluster. For example, which nodes have joined yet the recovery has not yet kicked
// in, we need to make sure we allow those calls
// return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
return null;
}
@Override
protected ClusterStateRequest newRequest() {
return new ClusterStateRequest();

View File

@ -21,10 +21,13 @@ package org.elasticsearch.action.admin.cluster.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -48,6 +51,11 @@ public class TransportPendingClusterTasksAction extends TransportMasterNodeReadO
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(PendingClusterTasksRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected PendingClusterTasksRequest newRequest() {
return new PendingClusterTasksRequest();

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -45,6 +47,11 @@ public class TransportAliasesExistAction extends TransportMasterNodeReadOperatio
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetAliasesRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
@ -48,6 +50,11 @@ public class TransportGetAliasesAction extends TransportMasterNodeReadOperationA
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetAliasesRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetAliasesRequest newRequest() {
return new GetAliasesRequest();
@ -62,7 +69,7 @@ public class TransportGetAliasesAction extends TransportMasterNodeReadOperationA
protected void masterOperation(GetAliasesRequest request, ClusterState state, ActionListener<GetAliasesResponse> listener) throws ElasticsearchException {
String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
@SuppressWarnings("unchecked") // ImmutableList to List results incompatible type
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices);
ImmutableOpenMap<String, List<AliasMetaData>> result = (ImmutableOpenMap) state.metaData().findAliases(request.aliases(), concreteIndices);
listener.onResponse(new GetAliasesResponse(result));
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Strings;
@ -44,10 +46,21 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
@Inject
public TransportGetIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
ThreadPool threadPool, ActionFilters actionFilters) {
super(settings, GetIndexAction.NAME, transportService, clusterService, threadPool, actionFilters);
}
@Override
protected String executor() {
// very lightweight operation, no need to fork
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetIndexRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetIndexRequest newRequest() {
return new GetIndexRequest();
@ -60,7 +73,7 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
@Override
protected void doMasterOperation(final GetIndexRequest request, String[] concreteIndices, final ClusterState state,
final ActionListener<GetIndexResponse> listener) throws ElasticsearchException {
final ActionListener<GetIndexResponse> listener) throws ElasticsearchException {
ImmutableOpenMap<String, ImmutableList<Entry>> warmersResult = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappingsResult = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableList<AliasMetaData>> aliasesResult = ImmutableOpenMap.of();
@ -72,40 +85,40 @@ public class TransportGetIndexAction extends TransportClusterInfoAction<GetIndex
boolean doneWarmers = false;
for (String feature : features) {
switch (feature) {
case "_warmer":
case "_warmers":
if (!doneWarmers) {
warmersResult = state.metaData().findWarmers(concreteIndices, request.types(), Strings.EMPTY_ARRAY);
doneWarmers = true;
}
break;
case "_mapping":
case "_mappings":
if (!doneMappings) {
mappingsResult = state.metaData().findMappings(concreteIndices, request.types());
doneMappings = true;
}
break;
case "_alias":
case "_aliases":
if (!doneAliases) {
aliasesResult = state.metaData().findAliases(Strings.EMPTY_ARRAY, concreteIndices);
doneAliases = true;
}
break;
case "_settings":
if (!doneSettings) {
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
for (String index : concreteIndices) {
settingsMapBuilder.put(index, state.metaData().index(index).getSettings());
case "_warmer":
case "_warmers":
if (!doneWarmers) {
warmersResult = state.metaData().findWarmers(concreteIndices, request.types(), Strings.EMPTY_ARRAY);
doneWarmers = true;
}
settings = settingsMapBuilder.build();
doneSettings = true;
}
break;
break;
case "_mapping":
case "_mappings":
if (!doneMappings) {
mappingsResult = state.metaData().findMappings(concreteIndices, request.types());
doneMappings = true;
}
break;
case "_alias":
case "_aliases":
if (!doneAliases) {
aliasesResult = state.metaData().findAliases(Strings.EMPTY_ARRAY, concreteIndices);
doneAliases = true;
}
break;
case "_settings":
if (!doneSettings) {
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
for (String index : concreteIndices) {
settingsMapBuilder.put(index, state.metaData().index(index).getSettings());
}
settings = settingsMapBuilder.build();
doneSettings = true;
}
break;
default:
throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid");
default:
throw new ElasticsearchIllegalStateException("feature [" + feature + "] is not valid");
}
}
listener.onResponse(new GetIndexResponse(concreteIndices, warmersResult, mappingsResult, aliasesResult, settings));

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
@ -41,6 +43,17 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa
super(settings, GetMappingsAction.NAME, transportService, clusterService, threadPool, actionFilters);
}
@Override
protected String executor() {
// very lightweight operation, no need to fork
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetMappingsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetMappingsRequest newRequest() {
return new GetMappingsRequest();

View File

@ -21,10 +21,13 @@ package org.elasticsearch.action.admin.indices.settings.get;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
@ -57,6 +60,12 @@ public class TransportGetSettingsAction extends TransportMasterNodeReadOperation
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetSettingsRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetSettingsRequest newRequest() {
return new GetSettingsRequest();

View File

@ -26,6 +26,9 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -52,6 +55,19 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(UpdateSettingsRequest request, ClusterState state) {
// allow for dedicated changes to the metadata blocks, so we don't block those to allow to "re-enable" it
ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
if (globalBlock != null) {
return globalBlock;
}
if (request.settings().getAsMap().size() == 1 && (request.settings().get(IndexMetaData.SETTING_BLOCKS_METADATA) != null || request.settings().get(IndexMetaData.SETTING_READ_ONLY) != null )) {
return null;
}
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected UpdateSettingsRequest newRequest() {
return new UpdateSettingsRequest();

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
@ -50,6 +52,11 @@ public class TransportGetIndexTemplatesAction extends TransportMasterNodeReadOpe
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetIndexTemplatesRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected GetIndexTemplatesRequest newRequest() {
return new GetIndexTemplatesRequest();

View File

@ -22,10 +22,13 @@ package org.elasticsearch.action.admin.indices.warmer.get;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -45,6 +48,17 @@ public class TransportGetWarmersAction extends TransportClusterInfoAction<GetWar
super(settings, GetWarmersAction.NAME, transportService, clusterService, threadPool, actionFilters);
}
@Override
protected String executor() {
// very lightweight operation, no need to fork
return ThreadPool.Names.SAME;
}
@Override
protected ClusterBlockException checkBlock(GetWarmersRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA, state.metaData().concreteIndices(request.indicesOptions(), request.indices()));
}
@Override
protected GetWarmersRequest newRequest() {
return new GetWarmersRequest();

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -48,6 +50,11 @@ public class TransportAbortBenchmarkAction extends TransportMasterNodeOperationA
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(AbortBenchmarkRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected AbortBenchmarkRequest newRequest() {
return new AbortBenchmarkRequest();

View File

@ -24,10 +24,12 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportService;
/**
@ -49,6 +51,11 @@ public class TransportBenchmarkAction extends TransportMasterNodeOperationAction
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(BenchmarkRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected BenchmarkRequest newRequest() {
return new BenchmarkRequest();

View File

@ -21,9 +21,12 @@ package org.elasticsearch.action.bench;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -49,6 +52,11 @@ public class TransportBenchmarkStatusAction extends TransportMasterNodeOperation
return ThreadPool.Names.GENERIC;
}
@Override
protected ClusterBlockException checkBlock(BenchmarkStatusRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA);
}
@Override
protected BenchmarkStatusRequest newRequest() {
return new BenchmarkStatusRequest();

View File

@ -70,9 +70,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
return false;
}
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return null;
}
protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -117,6 +118,12 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
masterMappingUpdater.add(new MappingChange(documentMapper, index, indexUUID, listener));
}
@Override
protected ClusterBlockException checkBlock(MappingUpdatedRequest request, ClusterState state) {
// internal call by other nodes, no need to check for blocks
return null;
}
@Override
protected String executor() {
// we go async right away

View File

@ -69,7 +69,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
}
}
for (Map.Entry<String, String> entry : request.params().entrySet()) {
if (entry.getKey().equals("pretty") || entry.getKey().equals("timeout") || entry.getKey().equals("master_timeout")) {
if (entry.getKey().equals("pretty") || entry.getKey().equals("timeout") || entry.getKey().equals("master_timeout") || entry.getKey().equals("index")) {
continue;
}
updateSettings.put(entry.getKey(), entry.getValue());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -225,75 +226,70 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest {
@Test
public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
createIndex("test-ro");
try {
assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
.setSettings(ImmutableSettings.builder().put("index.blocks.read_only", true)));
ensureGreen();
assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
.setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true)));
ensureGreen();
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
int concurrentRequests = randomIntBetween(0, 10);
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
int concurrentRequests = randomIntBetween(0, 10);
int expectedBulkActions = numDocs / bulkActions;
int expectedBulkActions = numDocs / bulkActions;
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
int testDocs = 0;
int testReadOnlyDocs = 0;
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
int testDocs = 0;
int testReadOnlyDocs = 0;
MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
try (BulkProcessor processor = BulkProcessor.builder(client(), listener)
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
//set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
testDocs++;
processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value"));
multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs));
} else {
testReadOnlyDocs++;
processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value"));
}
}
}
closeLatch.await();
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
Set<String> ids = new HashSet<>();
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
assertThat(bulkItemResponse.getType(), equalTo("test"));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
//we do want to check that we don't get duplicate ids back
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
testDocs++;
processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)).source("field", "value"));
multiGetRequestBuilder.add("test", "test", Integer.toString(testDocs));
} else {
assertThat(bulkItemResponse.isFailed(), equalTo(true));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
//we do want to check that we don't get duplicate ids back
assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
testReadOnlyDocs++;
processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)).source("field", "value"));
}
}
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
} finally {
assertAcked(client().admin().indices().prepareUpdateSettings("test-ro")
.setSettings(ImmutableSettings.builder().put("index.blocks.read_only", false)));
}
closeLatch.await();
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
Set<String> ids = new HashSet<>();
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
assertThat(bulkItemResponse.getType(), equalTo("test"));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
//we do want to check that we don't get duplicate ids back
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
} else {
assertThat(bulkItemResponse.isFailed(), equalTo(true));
//with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
//we do want to check that we don't get duplicate ids back
assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
}
}
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
}
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) {

View File

@ -43,13 +43,13 @@ public class BlockClusterStatsTests extends ElasticsearchIntegrationTest {
public void testBlocks() throws Exception {
assertAcked(prepareCreate("foo").addAlias(new Alias("foo-alias")));
try {
assertAcked(client().admin().indices().prepareUpdateSettings("foo").setSettings(
ImmutableSettings.settingsBuilder().put("index.blocks.read_only", true)));
ClusterUpdateSettingsResponse updateSettingsResponse = client().admin().cluster().prepareUpdateSettings().setTransientSettings(
ImmutableSettings.settingsBuilder().put("cluster.blocks.read_only", true).build()).get();
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
assertAcked(client().admin().indices().prepareUpdateSettings("foo").setSettings(
ImmutableSettings.settingsBuilder().put("index.blocks.read_only", true)));
ClusterStateResponse clusterStateResponseUnfiltered = client().admin().cluster().prepareState().clear().setBlocks(true).get();
ClusterStateResponse clusterStateResponseUnfiltered = client().admin().cluster().prepareState().setLocal(true).clear().setBlocks(true).get();
assertThat(clusterStateResponseUnfiltered.getState().blocks().global(), hasSize(1));
assertThat(clusterStateResponseUnfiltered.getState().blocks().indices().size(), is(1));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().get();

View File

@ -63,7 +63,6 @@ import static org.hamcrest.Matchers.*;
public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "Shay is working on this")
public void restorePersistentSettingsTest() throws Exception {
logger.info("--> start node");
internalCluster().startNode(settingsBuilder().put("gateway.type", "local"));