also upgrade cluster state on initial load (MetaStateService.loadGlobalState) and on restore (RestoreService)
parent 68d6427944
commit 062dbee955
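The rule this commit applies everywhere: a setting value that parses as a bare number carries no unit, so a default suffix is appended ("b" for byte sizes, "ms" for times), while values that already have a unit are left alone. A minimal standalone sketch of that rule, in plain Java with illustrative names (no Elasticsearch types):

// Illustration only; mirrors the naked-number check used by the hunks below.
public class DefaultUnits {
    static String addDefaultUnit(String value, String defaultUnit) {
        try {
            Double.parseDouble(value);  // succeeds only for a naked number
        } catch (NumberFormatException nfe) {
            return value;               // already has a unit, e.g. "512mb"
        }
        return value + defaultUnit;     // e.g. "512" -> "512b"
    }

    public static void main(String[] args) {
        System.out.println(addDefaultUnit("512", "b"));    // 512b
        System.out.println(addDefaultUnit("512mb", "b"));  // 512mb (unchanged)
        System.out.println(addDefaultUnit("30", "ms"));    // 30ms
    }
}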
@@ -27,24 +27,31 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
+import org.elasticsearch.cluster.service.InternalClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.loader.SettingsLoader;
 import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.IndexClosedException;
 import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.indices.store.IndicesStore;
+import org.elasticsearch.indices.ttl.IndicesTTLService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.warmer.IndexWarmersMetaData;
@@ -55,9 +62,6 @@ import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Maps.newHashMap;
 import static org.elasticsearch.common.settings.Settings.*;
 
 /**
  *
  */
 public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> {
 
     public static final MetaData PROTO = builder().build();
 
@@ -257,7 +261,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> {
     }
 
     /**
-     * Returns the merges transient and persistent settings.
+     * Returns the merged transient and persistent settings.
     */
    public Settings settings() {
        return this.settings;
@@ -1288,6 +1292,80 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData> {
         return new Builder(metaData);
     }
 
+    /** All known byte-sized cluster settings. */
+    public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = ImmutableSet.of(
+            IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC,
+            RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE,
+            RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE,
+            RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC,
+            RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC);
+
+    /** All known time cluster settings. */
+    public static final Set<String> CLUSTER_TIME_SETTINGS = ImmutableSet.of(
+            IndicesTTLService.INDICES_TTL_INTERVAL,
+            RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC,
+            RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK,
+            RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT,
+            RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT,
+            RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT,
+            DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL,
+            InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL,
+            InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT,
+            DiscoverySettings.PUBLISH_TIMEOUT,
+            InternalClusterService.SETTING_CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD);
+
+    /** As of 2.0 we require units for time and byte-sized settings. This method adds default units to any cluster settings that don't
+     *  specify a unit. */
+    public static MetaData addDefaultUnitsIfNeeded(ESLogger logger, MetaData metaData) {
+        Settings.Builder newPersistentSettings = null;
+        for (Map.Entry<String, String> ent : metaData.persistentSettings().getAsMap().entrySet()) {
+            String settingName = ent.getKey();
+            String settingValue = ent.getValue();
+            if (CLUSTER_BYTES_SIZE_SETTINGS.contains(settingName)) {
+                try {
+                    Double.parseDouble(settingValue);
+                } catch (NumberFormatException nfe) {
+                    continue;
+                }
+                // It's a naked number; add default unit (b for bytes):
+                logger.warn("byte-sized cluster setting [{}] with value [{}] is missing units; now adding default units (b)", settingName, settingValue);
+                if (newPersistentSettings == null) {
+                    newPersistentSettings = Settings.builder();
+                    newPersistentSettings.put(metaData.persistentSettings());
+                }
+                newPersistentSettings.put(settingName, settingValue + "b");
+            }
+            if (CLUSTER_TIME_SETTINGS.contains(settingName)) {
+                try {
+                    Double.parseDouble(settingValue);
+                } catch (NumberFormatException nfe) {
+                    continue;
+                }
+                // It's a naked number; add default unit (ms for msec):
+                logger.warn("time cluster setting [{}] with value [{}] is missing units; now adding default units (ms)", settingName, settingValue);
+                if (newPersistentSettings == null) {
+                    newPersistentSettings = Settings.builder();
+                    newPersistentSettings.put(metaData.persistentSettings());
+                }
+                newPersistentSettings.put(settingName, settingValue + "ms");
+            }
+        }
+
+        if (newPersistentSettings != null) {
+            return new MetaData(metaData.uuid(),
+                                metaData.version(),
+                                metaData.transientSettings(),
+                                newPersistentSettings.build(),
+                                metaData.getIndices(),
+                                metaData.getTemplates(),
+                                metaData.getCustoms());
+        } else {
+            // No changes:
+            return metaData;
+        }
+    }
+
     public static class Builder {
 
         private String uuid;
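The helper above copies the persistent settings lazily: the replacement builder is created only once the first unit-less value is found, so an already-clean MetaData comes back unchanged. A standalone sketch of that copy-on-write pattern, with plain Java maps standing in for Settings and covering the byte-size case only (the setting names are merely examples):

import java.util.LinkedHashMap;
import java.util.Map;

public class LazyUpgrade {
    // Returns the input map itself when nothing needs fixing, a fixed copy otherwise.
    static Map<String, String> addByteUnits(Map<String, String> persistent) {
        Map<String, String> copy = null;                 // created lazily, on first fix
        for (Map.Entry<String, String> e : persistent.entrySet()) {
            String value = e.getValue();
            try {
                Double.parseDouble(value);               // naked number?
            } catch (NumberFormatException nfe) {
                continue;                                // already has a unit
            }
            if (copy == null) {
                copy = new LinkedHashMap<>(persistent);  // one-time copy
            }
            copy.put(e.getKey(), value + "b");           // append default unit
        }
        return copy != null ? copy : persistent;         // unchanged => same instance
    }

    public static void main(String[] args) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("indices.recovery.max_bytes_per_sec", "512");
        settings.put("indices.store.throttle.max_bytes_per_sec", "20mb");
        System.out.println(addByteUnits(settings));      // {...=512b, ...=20mb}
    }
}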
@@ -178,6 +178,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
      */
     private IndexMetaData addDefaultUnitsIfNeeded(IndexMetaData indexMetaData) {
         if (indexMetaData.getCreationVersion().before(Version.V_2_0_0)) {
+            // TODO: can we somehow only do this *once* for a pre-2.0 index? Maybe we could stuff a "fake marker setting" here? Seems hackish...
             // Created lazily if we find any settings that are missing units:
             Settings settings = indexMetaData.settings();
             Settings.Builder newSettings = null;
@@ -190,7 +191,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
                     continue;
                 }
                 // It's a naked number; add default unit (b for bytes):
-                logger.warn("byte-sized setting [{}] with value [{}] is missing units; now adding default units (b)", byteSizeSetting, value);
+                logger.warn("byte-sized index setting [{}] with value [{}] is missing units; now adding default units (b)", byteSizeSetting, value);
                 if (newSettings == null) {
                     newSettings = Settings.builder();
                     newSettings.put(settings);
@@ -207,7 +208,7 @@ public class MetaDataIndexUpgradeService extends AbstractComponent {
                     continue;
                 }
                 // It's a naked number; add default unit (ms for msec):
-                logger.warn("time setting [{}] with value [{}] is missing units; now adding default units (ms)", timeSetting, value);
+                logger.warn("time index setting [{}] with value [{}] is missing units; now adding default units (ms)", timeSetting, value);
                 if (newSettings == null) {
                     newSettings = Settings.builder();
                     newSettings.put(settings);
@@ -101,20 +101,20 @@ public class DiskThresholdDecider extends AllocationDecider {
                 DiskThresholdDecider.this.includeRelocations = newRelocationsSetting;
             }
             if (newLowWatermark != null) {
-                if (!validWatermarkSetting(newLowWatermark)) {
+                if (!validWatermarkSetting(newLowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK)) {
                     throw new ElasticsearchParseException("Unable to parse low watermark: [" + newLowWatermark + "]");
                 }
                 logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, newLowWatermark);
                 DiskThresholdDecider.this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(newLowWatermark);
-                DiskThresholdDecider.this.freeBytesThresholdLow = thresholdBytesFromWatermark(newLowWatermark);
+                DiskThresholdDecider.this.freeBytesThresholdLow = thresholdBytesFromWatermark(newLowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK);
             }
             if (newHighWatermark != null) {
-                if (!validWatermarkSetting(newHighWatermark)) {
+                if (!validWatermarkSetting(newHighWatermark, CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK)) {
                     throw new ElasticsearchParseException("Unable to parse high watermark: [" + newHighWatermark + "]");
                 }
                 logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, newHighWatermark);
                 DiskThresholdDecider.this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(newHighWatermark);
-                DiskThresholdDecider.this.freeBytesThresholdHigh = thresholdBytesFromWatermark(newHighWatermark);
+                DiskThresholdDecider.this.freeBytesThresholdHigh = thresholdBytesFromWatermark(newHighWatermark, CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
             }
             if (newRerouteInterval != null) {
                 logger.info("updating [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, newRerouteInterval);
@@ -199,18 +199,18 @@ public class DiskThresholdDecider extends AllocationDecider {
         String lowWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "85%");
         String highWatermark = settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "90%");
 
-        if (!validWatermarkSetting(lowWatermark)) {
+        if (!validWatermarkSetting(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK)) {
             throw new ElasticsearchParseException("Unable to parse low watermark: [" + lowWatermark + "]");
         }
-        if (!validWatermarkSetting(highWatermark)) {
+        if (!validWatermarkSetting(highWatermark, CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK)) {
             throw new ElasticsearchParseException("Unable to parse high watermark: [" + highWatermark + "]");
         }
         // Watermark is expressed in terms of used data, but we need "free" data watermark
         this.freeDiskThresholdLow = 100.0 - thresholdPercentageFromWatermark(lowWatermark);
         this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
 
-        this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark);
-        this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark);
+        this.freeBytesThresholdLow = thresholdBytesFromWatermark(lowWatermark, CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK);
+        this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark, CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK);
         this.includeRelocations = settings.getAsBoolean(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true);
         this.rerouteInterval = settings.getAsTime(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, TimeValue.timeValueSeconds(60));
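The constructor above converts between the two conventions in play: watermarks are configured in terms of used disk, while the decider tracks free disk. A tiny self-contained check of the arithmetic for the default watermarks, matching the 15.0d and 10.0d expectations in the unit test hunks further down:

public class WatermarkMath {
    public static void main(String[] args) {
        double lowUsedPct = 85.0;                 // default low watermark "85%" (used)
        double highUsedPct = 90.0;                // default high watermark "90%" (used)
        System.out.println(100.0 - lowUsedPct);   // 15.0 -> freeDiskThresholdLow
        System.out.println(100.0 - highUsedPct);  // 10.0 -> freeDiskThresholdHigh
    }
}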
@@ -555,6 +555,7 @@ public class DiskThresholdDecider extends AllocationDecider {
         try {
             return RatioValue.parseRatioValue(watermark).getAsPercent();
         } catch (ElasticsearchParseException ex) {
+            // nocommit: why be lenient here?
             return 100.0;
         }
     }
@@ -563,12 +564,12 @@ public class DiskThresholdDecider extends AllocationDecider {
      * Attempts to parse the watermark into a {@link ByteSizeValue}, returning
      * a ByteSizeValue of 0 bytes if the value cannot be parsed.
      */
-    public ByteSizeValue thresholdBytesFromWatermark(String watermark) {
-        // nocommit: why be lenient here?
+    public ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) {
         try {
-            return ByteSizeValue.parseBytesSizeValue(watermark, "DiskThresholdDecider watermark");
+            return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
         } catch (ElasticsearchParseException ex) {
+            // nocommit: why be lenient here?
-            return ByteSizeValue.parseBytesSizeValue("0b", "DiskThresholdDecider watermark");
+            return ByteSizeValue.parseBytesSizeValue("0b", settingName);
         }
     }
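The leniency the nocommit questions: a watermark that is not a byte size (for example a percentage, which the ratio path handles instead) silently becomes a 0-byte threshold. A deliberately simplified standalone stand-in for that parse-or-zero behavior (not the real ByteSizeValue parser):

public class ThresholdBytes {
    // Accepts only a bare "<n>b" form; anything else falls back to zero.
    static long thresholdBytesFromWatermark(String watermark) {
        try {
            return Long.parseLong(watermark.replaceFirst("b$", ""));  // e.g. "500b" -> 500
        } catch (NumberFormatException ex) {
            return 0L;  // lenient fallback, mirroring the "0b" above
        }
    }

    public static void main(String[] args) {
        System.out.println(thresholdBytesFromWatermark("500b"));  // 500
        System.out.println(thresholdBytesFromWatermark("85%"));   // 0
    }
}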
@@ -576,13 +577,13 @@ public class DiskThresholdDecider extends AllocationDecider {
      * Checks if a watermark string is a valid percentage or byte size value,
      * returning true if valid, false if invalid.
      */
-    public boolean validWatermarkSetting(String watermark) {
+    public boolean validWatermarkSetting(String watermark, String settingName) {
         try {
             RatioValue.parseRatioValue(watermark);
             return true;
         } catch (ElasticsearchParseException e) {
             try {
-                ByteSizeValue.parseBytesSizeValue(watermark, "DiskThresholdDecider watermark");
+                ByteSizeValue.parseBytesSizeValue(watermark, settingName);
                 return true;
             } catch (ElasticsearchParseException ex) {
                 return false;
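Validation tries the ratio parser first and only then the byte-size parser, so both "85%" and "500mb" are accepted. A standalone sketch with deliberately simplified stand-ins for RatioValue and ByteSizeValue (the real parsers accept more forms):

public class WatermarkCheck {
    static boolean validWatermarkSetting(String watermark) {
        if (watermark.endsWith("%")) {             // simplified ratio parse
            try {
                double pct = Double.parseDouble(watermark.substring(0, watermark.length() - 1));
                return pct >= 0.0 && pct <= 100.0;
            } catch (NumberFormatException nfe) {
                return false;
            }
        }
        // fall back to a (simplified) byte-size parse:
        return watermark.matches("\\d+(\\.\\d+)?(b|kb|mb|gb|tb)");
    }

    public static void main(String[] args) {
        System.out.println(validWatermarkSetting("85%"));    // true
        System.out.println(validWatermarkSetting("500mb"));  // true
        System.out.println(validWatermarkSetting("oops"));   // false
    }
}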
@@ -116,7 +116,14 @@ public class MetaStateService extends AbstractComponent {
      * Loads the global state, *without* index state, see {@link #loadFullState()} for that.
      */
     MetaData loadGlobalState() throws IOException {
-        return globalStateFormat.loadLatestState(logger, nodeEnv.nodeDataPaths());
+        MetaData globalState = globalStateFormat.loadLatestState(logger, nodeEnv.nodeDataPaths());
+        // ES 2.0 now requires units for all time and byte-sized settings, so we add the default unit if it's missing
+        // TODO: can we somehow only do this for pre-2.0 cluster state?
+        if (globalState != null) {
+            return MetaData.addDefaultUnitsIfNeeded(logger, globalState);
+        } else {
+            return null;
+        }
     }
 
     /**
@@ -155,7 +155,11 @@ public class RestoreService extends AbstractComponent implements ClusterStateListener {
         final SnapshotId snapshotId = new SnapshotId(request.repository(), request.name());
         final Snapshot snapshot = repository.readSnapshot(snapshotId);
         ImmutableList<String> filteredIndices = SnapshotUtils.filterIndices(snapshot.indices(), request.indices(), request.indicesOptions());
-        final MetaData metaData = repository.readSnapshotMetaData(snapshotId, filteredIndices);
+        MetaData metaDataIn = repository.readSnapshotMetaData(snapshotId, filteredIndices);
+
+        // ES 2.0 now requires units for all time and byte-sized settings, so we add the default unit if it's missing in this snapshot:
+        // TODO: can we somehow only do this for pre-2.0 cluster state?
+        final MetaData metaData = MetaData.addDefaultUnitsIfNeeded(logger, metaDataIn);
 
         // Make sure that we can restore from this snapshot
         validateSnapshotRestorable(snapshotId, snapshot);
@@ -58,9 +58,9 @@ public class DiskThresholdDeciderUnitTests extends ElasticsearchTestCase {
         };
         DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
 
-        assertThat(decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "decider")));
+        assertThat(decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
         assertThat(decider.getFreeDiskThresholdHigh(), equalTo(10.0d));
-        assertThat(decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "decider")));
+        assertThat(decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
         assertThat(decider.getFreeDiskThresholdLow(), equalTo(15.0d));
         assertThat(decider.getRerouteInterval().seconds(), equalTo(60L));
         assertTrue(decider.isEnabled());
@@ -79,11 +79,11 @@ public class DiskThresholdDeciderUnitTests extends ElasticsearchTestCase {
         applySettings.onRefreshSettings(newSettings);
 
         assertThat("high threshold bytes should be unset",
-                decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "high threshold")));
+                decider.getFreeBytesThresholdHigh(), equalTo(ByteSizeValue.parseBytesSizeValue("0b", "test")));
         assertThat("high threshold percentage should be changed",
                 decider.getFreeDiskThresholdHigh(), equalTo(30.0d));
         assertThat("low threshold bytes should be set to 500mb",
-                decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "low threshold")));
+                decider.getFreeBytesThresholdLow(), equalTo(ByteSizeValue.parseBytesSizeValue("500mb", "test")));
         assertThat("low threshold bytes should be unset",
                 decider.getFreeDiskThresholdLow(), equalTo(0.0d));
         assertThat("reroute interval should be changed to 30 seconds",