Merge branch 'master' into feature-suggest-refactoring
This commit is contained in:
commit
e785e11aa1
|
@ -22,12 +22,23 @@ package org.elasticsearch.common.logging;
|
||||||
import org.elasticsearch.common.logging.jdk.JdkESLoggerFactory;
|
import org.elasticsearch.common.logging.jdk.JdkESLoggerFactory;
|
||||||
import org.elasticsearch.common.logging.log4j.Log4jESLoggerFactory;
|
import org.elasticsearch.common.logging.log4j.Log4jESLoggerFactory;
|
||||||
import org.elasticsearch.common.logging.slf4j.Slf4jESLoggerFactory;
|
import org.elasticsearch.common.logging.slf4j.Slf4jESLoggerFactory;
|
||||||
|
import org.elasticsearch.common.settings.AbstractScopedSettings;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory to get {@link ESLogger}s
|
* Factory to get {@link ESLogger}s
|
||||||
*/
|
*/
|
||||||
public abstract class ESLoggerFactory {
|
public abstract class ESLoggerFactory {
|
||||||
|
|
||||||
|
public static final Setting<LogLevel> LOG_DEFAULT_LEVEL_SETTING = new Setting<>("logger.level", LogLevel.INFO.name(), LogLevel::parse, false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<LogLevel> LOG_LEVEL_SETTING = Setting.dynamicKeySetting("logger.", LogLevel.INFO.name(), LogLevel::parse, true, Setting.Scope.CLUSTER);
|
||||||
|
|
||||||
private static volatile ESLoggerFactory defaultFactory = new JdkESLoggerFactory();
|
private static volatile ESLoggerFactory defaultFactory = new JdkESLoggerFactory();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -85,4 +96,11 @@ public abstract class ESLoggerFactory {
|
||||||
protected abstract ESLogger rootLogger();
|
protected abstract ESLogger rootLogger();
|
||||||
|
|
||||||
protected abstract ESLogger newInstance(String prefix, String name);
|
protected abstract ESLogger newInstance(String prefix, String name);
|
||||||
|
|
||||||
|
public enum LogLevel {
|
||||||
|
WARN, TRACE, INFO, DEBUG, ERROR;
|
||||||
|
public static LogLevel parse(String level) {
|
||||||
|
return valueOf(level.toUpperCase(Locale.ROOT));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,11 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
if (setting != get(setting.getKey())) {
|
if (setting != get(setting.getKey())) {
|
||||||
throw new IllegalArgumentException("Setting is not registered for key [" + setting.getKey() + "]");
|
throw new IllegalArgumentException("Setting is not registered for key [" + setting.getKey() + "]");
|
||||||
}
|
}
|
||||||
this.settingUpdaters.add(setting.newUpdater(consumer, logger, validator));
|
addSettingsUpdater(setting.newUpdater(consumer, logger, validator));
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void addSettingsUpdater(SettingUpdater<?> updater) {
|
||||||
|
this.settingUpdaters.add(updater);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -184,7 +188,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
if (b != get(b.getKey())) {
|
if (b != get(b.getKey())) {
|
||||||
throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
|
throw new IllegalArgumentException("Setting is not registered for key [" + b.getKey() + "]");
|
||||||
}
|
}
|
||||||
this.settingUpdaters.add(Setting.compoundUpdater(consumer, a, b, logger));
|
addSettingsUpdater(Setting.compoundUpdater(consumer, a, b, logger));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -288,7 +292,7 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||||
}
|
}
|
||||||
for (Map.Entry<String, Setting<?>> entry : complexMatchers.entrySet()) {
|
for (Map.Entry<String, Setting<?>> entry : complexMatchers.entrySet()) {
|
||||||
if (entry.getValue().match(key)) {
|
if (entry.getValue().match(key)) {
|
||||||
return entry.getValue();
|
return entry.getValue().getConcreteSetting(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
|
||||||
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
import org.elasticsearch.discovery.zen.fd.FaultDetection;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
import org.elasticsearch.gateway.GatewayService;
|
||||||
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
import org.elasticsearch.gateway.PrimaryShardAllocator;
|
||||||
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
import org.elasticsearch.http.netty.NettyHttpServerTransport;
|
||||||
|
@ -82,50 +83,61 @@ import org.elasticsearch.transport.netty.NettyTransport;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulates all valid cluster level settings.
|
* Encapsulates all valid cluster level settings.
|
||||||
*/
|
*/
|
||||||
public final class ClusterSettings extends AbstractScopedSettings {
|
public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
|
public ClusterSettings(Settings nodeSettings, Set<Setting<?>> settingsSet) {
|
||||||
|
super(nodeSettings, settingsSet, Setting.Scope.CLUSTER);
|
||||||
|
addSettingsUpdater(new LoggingSettingUpdater(nodeSettings));
|
||||||
|
}
|
||||||
|
|
||||||
public ClusterSettings(Settings settings, Set<Setting<?>> settingsSet) {
|
private static final class LoggingSettingUpdater implements SettingUpdater<Settings> {
|
||||||
super(settings, settingsSet, Setting.Scope.CLUSTER);
|
final Predicate<String> loggerPredicate = ESLoggerFactory.LOG_LEVEL_SETTING::match;
|
||||||
|
private final Settings settings;
|
||||||
|
|
||||||
|
LoggingSettingUpdater(Settings settings) {
|
||||||
|
this.settings = settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized Settings applySettings(Settings newSettings) {
|
public boolean hasChanged(Settings current, Settings previous) {
|
||||||
Settings settings = super.applySettings(newSettings);
|
return current.filter(loggerPredicate).getAsMap().equals(previous.filter(loggerPredicate).getAsMap()) == false;
|
||||||
try {
|
}
|
||||||
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
|
|
||||||
if (entry.getKey().startsWith("logger.")) {
|
@Override
|
||||||
String component = entry.getKey().substring("logger.".length());
|
public Settings getValue(Settings current, Settings previous) {
|
||||||
if ("_root".equals(component)) {
|
Settings.Builder builder = Settings.builder();
|
||||||
ESLoggerFactory.getRootLogger().setLevel(entry.getValue());
|
builder.put(current.filter(loggerPredicate).getAsMap());
|
||||||
|
for (String key : previous.getAsMap().keySet()) {
|
||||||
|
if (loggerPredicate.test(key) && builder.internalMap().containsKey(key) == false) {
|
||||||
|
if (ESLoggerFactory.LOG_LEVEL_SETTING.getConcreteSetting(key).exists(settings) == false) {
|
||||||
|
builder.putNull(key);
|
||||||
} else {
|
} else {
|
||||||
ESLoggerFactory.getLogger(component).setLevel(entry.getValue());
|
builder.put(key, ESLoggerFactory.LOG_LEVEL_SETTING.getConcreteSetting(key).get(settings).name());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
return builder.build();
|
||||||
logger.warn("failed to refresh settings for [{}]", e, "logger");
|
|
||||||
}
|
|
||||||
return settings;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasDynamicSetting(String key) {
|
public void apply(Settings value, Settings current, Settings previous) {
|
||||||
return isLoggerSetting(key) || super.hasDynamicSetting(key);
|
for (String key : value.getAsMap().keySet()) {
|
||||||
|
assert loggerPredicate.test(key);
|
||||||
|
String component = key.substring("logger.".length());
|
||||||
|
if ("_root".equals(component)) {
|
||||||
|
final String rootLevel = value.get(key);
|
||||||
|
ESLoggerFactory.getRootLogger().setLevel(rootLevel == null ? ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING.get(settings).name() : rootLevel);
|
||||||
|
} else {
|
||||||
|
ESLoggerFactory.getLogger(component).setLevel(value.get(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns <code>true</code> if the settings is a logger setting.
|
|
||||||
*/
|
|
||||||
public boolean isLoggerSetting(String key) {
|
|
||||||
return key.startsWith("logger.");
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(
|
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(new HashSet<>(
|
||||||
Arrays.asList(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
|
Arrays.asList(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
|
||||||
|
@ -300,5 +312,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
||||||
InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING,
|
InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING,
|
||||||
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
|
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
|
||||||
EsExecutors.PROCESSORS_SETTING,
|
EsExecutors.PROCESSORS_SETTING,
|
||||||
ThreadContext.DEFAULT_HEADERS_SETTING)));
|
ThreadContext.DEFAULT_HEADERS_SETTING,
|
||||||
|
ESLoggerFactory.LOG_DEFAULT_LEVEL_SETTING,
|
||||||
|
ESLoggerFactory.LOG_LEVEL_SETTING,
|
||||||
|
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
|
||||||
|
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
|
||||||
|
NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,6 +200,11 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
return get(secondary);
|
return get(secondary);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Setting<T> getConcreteSetting(String key) {
|
||||||
|
assert key.startsWith(this.getKey()) : "was " + key + " expected: " + getKey(); // we use startsWith here since the key might be foo.bar.0 if it's an array
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The settings scope - settings can either be cluster settings or per index settings.
|
* The settings scope - settings can either be cluster settings or per index settings.
|
||||||
*/
|
*/
|
||||||
|
@ -323,6 +328,10 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
}, dynamic, scope);
|
}, dynamic, scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, int maxValue, boolean dynamic, Scope scope) {
|
||||||
|
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, maxValue, key), dynamic, scope);
|
||||||
|
}
|
||||||
|
|
||||||
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, boolean dynamic, Scope scope) {
|
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, boolean dynamic, Scope scope) {
|
||||||
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), dynamic, scope);
|
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), dynamic, scope);
|
||||||
}
|
}
|
||||||
|
@ -336,10 +345,17 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int parseInt(String s, int minValue, String key) {
|
public static int parseInt(String s, int minValue, String key) {
|
||||||
|
return parseInt(s, minValue, Integer.MAX_VALUE, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int parseInt(String s, int minValue, int maxValue, String key) {
|
||||||
int value = Integer.parseInt(s);
|
int value = Integer.parseInt(s);
|
||||||
if (value < minValue) {
|
if (value < minValue) {
|
||||||
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
||||||
}
|
}
|
||||||
|
if (value > maxValue) {
|
||||||
|
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be =< " + maxValue);
|
||||||
|
}
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,8 +465,6 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static Setting<Settings> groupSetting(String key, boolean dynamic, Scope scope) {
|
public static Setting<Settings> groupSetting(String key, boolean dynamic, Scope scope) {
|
||||||
if (key.endsWith(".") == false) {
|
if (key.endsWith(".") == false) {
|
||||||
throw new IllegalArgumentException("key must end with a '.'");
|
throw new IllegalArgumentException("key must end with a '.'");
|
||||||
|
@ -558,4 +572,38 @@ public class Setting<T> extends ToXContentToBytes {
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(key);
|
return Objects.hash(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This setting type allows to validate settings that have the same type and a common prefix. For instance feature.${type}=[true|false]
|
||||||
|
* can easily be added with this setting. Yet, dynamic key settings don't support updaters our of the box unless {@link #getConcreteSetting(String)}
|
||||||
|
* is used to pull the updater.
|
||||||
|
*/
|
||||||
|
public static <T> Setting<T> dynamicKeySetting(String key, String defaultValue, Function<String, T> parser, boolean dynamic, Scope scope) {
|
||||||
|
return new Setting<T>(key, defaultValue, parser, dynamic, scope) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean isGroupSetting() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean match(String toTest) {
|
||||||
|
return toTest.startsWith(getKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
AbstractScopedSettings.SettingUpdater<T> newUpdater(Consumer<T> consumer, ESLogger logger, Consumer<T> validator) {
|
||||||
|
throw new UnsupportedOperationException("dynamic settings can't be updated use #getConcreteSetting for updating");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Setting<T> getConcreteSetting(String key) {
|
||||||
|
if (match(key)) {
|
||||||
|
return new Setting<>(key, defaultValue, parser, dynamic, scope);
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("key must match setting but didn't ["+key +"]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
import org.elasticsearch.common.settings.Setting.Scope;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -74,7 +76,6 @@ import static java.util.Collections.unmodifiableSet;
|
||||||
* A component that holds all data paths for a single node.
|
* A component that holds all data paths for a single node.
|
||||||
*/
|
*/
|
||||||
public class NodeEnvironment extends AbstractComponent implements Closeable {
|
public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
|
|
||||||
public static class NodePath {
|
public static class NodePath {
|
||||||
/* ${data.paths}/nodes/{node.id} */
|
/* ${data.paths}/nodes/{node.id} */
|
||||||
public final Path path;
|
public final Path path;
|
||||||
|
@ -130,22 +131,33 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
private final Map<ShardLockKey, InternalShardLock> shardLocks = new HashMap<>();
|
private final Map<ShardLockKey, InternalShardLock> shardLocks = new HashMap<>();
|
||||||
|
|
||||||
// Setting to automatically append node id to custom data paths
|
/**
|
||||||
public static final String ADD_NODE_ID_TO_CUSTOM_PATH = "node.add_id_to_custom_path";
|
* Maximum number of data nodes that should run in an environment.
|
||||||
|
*/
|
||||||
|
public static final Setting<Integer> MAX_LOCAL_STORAGE_NODES_SETTING = Setting.intSetting("node.max_local_storage_nodes", 50, 1, false,
|
||||||
|
Scope.CLUSTER);
|
||||||
|
|
||||||
// If enabled, the [verbose] SegmentInfos.infoStream logging is sent to System.out:
|
/**
|
||||||
public static final String SETTING_ENABLE_LUCENE_SEGMENT_INFOS_TRACE = "node.enable_lucene_segment_infos_trace";
|
* If true automatically append node id to custom data paths.
|
||||||
|
*/
|
||||||
|
public static final Setting<Boolean> ADD_NODE_ID_TO_CUSTOM_PATH = Setting.boolSetting("node.add_id_to_custom_path", true, false,
|
||||||
|
Scope.CLUSTER);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If true the [verbose] SegmentInfos.infoStream logging is sent to System.out.
|
||||||
|
*/
|
||||||
|
public static final Setting<Boolean> ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING = Setting
|
||||||
|
.boolSetting("node.enable_lucene_segment_infos_trace", false, false, Scope.CLUSTER);
|
||||||
|
|
||||||
public static final String NODES_FOLDER = "nodes";
|
public static final String NODES_FOLDER = "nodes";
|
||||||
public static final String INDICES_FOLDER = "indices";
|
public static final String INDICES_FOLDER = "indices";
|
||||||
public static final String NODE_LOCK_FILENAME = "node.lock";
|
public static final String NODE_LOCK_FILENAME = "node.lock";
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@SuppressForbidden(reason = "System.out.*")
|
|
||||||
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
|
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
|
||||||
this.addNodeId = settings.getAsBoolean(ADD_NODE_ID_TO_CUSTOM_PATH, true);
|
this.addNodeId = ADD_NODE_ID_TO_CUSTOM_PATH.get(settings);
|
||||||
|
|
||||||
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
|
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
|
||||||
nodePaths = null;
|
nodePaths = null;
|
||||||
|
@ -161,7 +173,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
|
|
||||||
int localNodeId = -1;
|
int localNodeId = -1;
|
||||||
IOException lastException = null;
|
IOException lastException = null;
|
||||||
int maxLocalStorageNodes = settings.getAsInt("node.max_local_storage_nodes", 50);
|
int maxLocalStorageNodes = MAX_LOCAL_STORAGE_NODES_SETTING.get(settings);
|
||||||
for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
|
for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) {
|
||||||
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
|
for (int dirIndex = 0; dirIndex < environment.dataWithClusterFiles().length; dirIndex++) {
|
||||||
Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(NODES_FOLDER).resolve(Integer.toString(possibleLockId));
|
Path dir = environment.dataWithClusterFiles()[dirIndex].resolve(NODES_FOLDER).resolve(Integer.toString(possibleLockId));
|
||||||
|
@ -210,9 +222,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
maybeLogPathDetails();
|
maybeLogPathDetails();
|
||||||
maybeLogHeapDetails();
|
maybeLogHeapDetails();
|
||||||
|
|
||||||
if (settings.getAsBoolean(SETTING_ENABLE_LUCENE_SEGMENT_INFOS_TRACE, false)) {
|
applySegmentInfosTrace(settings);
|
||||||
SegmentInfos.setInfoStream(System.out);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void releaseAndNullLocks(Lock[] locks) {
|
private static void releaseAndNullLocks(Lock[] locks) {
|
||||||
|
@ -303,6 +313,13 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
||||||
logger.info("heap size [{}], compressed ordinary object pointers [{}]", maxHeapSize, useCompressedOops);
|
logger.info("heap size [{}], compressed ordinary object pointers [{}]", maxHeapSize, useCompressedOops);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "System.out.*")
|
||||||
|
static void applySegmentInfosTrace(Settings settings) {
|
||||||
|
if (ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.get(settings)) {
|
||||||
|
SegmentInfos.setInfoStream(System.out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static String toString(Collection<String> items) {
|
private static String toString(Collection<String> items) {
|
||||||
StringBuilder b = new StringBuilder();
|
StringBuilder b = new StringBuilder();
|
||||||
for(String item : items) {
|
for(String item : items) {
|
||||||
|
|
|
@ -21,13 +21,11 @@ package org.elasticsearch.cluster.settings;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequestBuilder;
|
||||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||||
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
||||||
import org.elasticsearch.discovery.DiscoverySettings;
|
import org.elasticsearch.discovery.DiscoverySettings;
|
||||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
@ -329,16 +327,30 @@ public class ClusterSettingsIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createNode(Settings settings) {
|
public void testLoggerLevelUpdate() {
|
||||||
internalCluster().startNode(Settings.builder()
|
assertAcked(prepareCreate("test"));
|
||||||
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "ClusterSettingsIT")
|
final String rootLevel = ESLoggerFactory.getRootLogger().getLevel();
|
||||||
.put("node.name", "ClusterSettingsIT")
|
final String testLevel = ESLoggerFactory.getLogger("test").getLevel();
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
try {
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("logger._root", "BOOM")).execute().actionGet();
|
||||||
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
fail("Expected IllegalArgumentException");
|
||||||
.put("http.enabled", false)
|
} catch (IllegalArgumentException e) {
|
||||||
.put("config.ignore_system_properties", true) // make sure we get what we set :)
|
assertEquals("No enum constant org.elasticsearch.common.logging.ESLoggerFactory.LogLevel.BOOM", e.getMessage());
|
||||||
.put(settings)
|
}
|
||||||
);
|
|
||||||
|
try {
|
||||||
|
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("logger.test", "TRACE").put("logger._root", "trace")).execute().actionGet();
|
||||||
|
assertEquals("TRACE", ESLoggerFactory.getLogger("test").getLevel());
|
||||||
|
assertEquals("TRACE", ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
} finally {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putNull("logger.test").putNull("logger._root")).execute().actionGet();
|
||||||
|
} else {
|
||||||
|
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().putNull("logger.*")).execute().actionGet();
|
||||||
|
}
|
||||||
|
assertEquals(testLevel, ESLoggerFactory.getLogger("test").getLevel());
|
||||||
|
assertEquals(rootLevel, ESLoggerFactory.getRootLogger().getLevel());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||||
|
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
@ -254,4 +255,48 @@ public class ScopedSettingsTests extends ESTestCase {
|
||||||
Settings.EMPTY, Collections.singleton(Setting.boolSetting("boo", true, false, Setting.Scope.INDEX)));
|
Settings.EMPTY, Collections.singleton(Setting.boolSetting("boo", true, false, Setting.Scope.INDEX)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testLoggingUpdates() {
|
||||||
|
final String level = ESLoggerFactory.getRootLogger().getLevel();
|
||||||
|
final String testLevel = ESLoggerFactory.getLogger("test").getLevel();
|
||||||
|
String property = System.getProperty("es.logger.level");
|
||||||
|
Settings.Builder builder = Settings.builder();
|
||||||
|
if (property != null) {
|
||||||
|
builder.put("logger.level", property);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
ClusterSettings settings = new ClusterSettings(builder.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||||
|
try {
|
||||||
|
settings.validate(Settings.builder().put("logger._root", "boom").build());
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertEquals("No enum constant org.elasticsearch.common.logging.ESLoggerFactory.LogLevel.BOOM", ex.getMessage());
|
||||||
|
}
|
||||||
|
assertEquals(level, ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
settings.applySettings(Settings.builder().put("logger._root", "TRACE").build());
|
||||||
|
assertEquals("TRACE", ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
settings.applySettings(Settings.builder().build());
|
||||||
|
assertEquals(level, ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
settings.applySettings(Settings.builder().put("logger.test", "TRACE").build());
|
||||||
|
assertEquals("TRACE", ESLoggerFactory.getLogger("test").getLevel());
|
||||||
|
settings.applySettings(Settings.builder().build());
|
||||||
|
assertEquals(testLevel, ESLoggerFactory.getLogger("test").getLevel());
|
||||||
|
} finally {
|
||||||
|
ESLoggerFactory.getRootLogger().setLevel(level);
|
||||||
|
ESLoggerFactory.getLogger("test").setLevel(testLevel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFallbackToLoggerLevel() {
|
||||||
|
final String level = ESLoggerFactory.getRootLogger().getLevel();
|
||||||
|
try {
|
||||||
|
ClusterSettings settings = new ClusterSettings(Settings.builder().put("logger.level", "ERROR").build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||||
|
assertEquals(level, ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
settings.applySettings(Settings.builder().put("logger._root", "TRACE").build());
|
||||||
|
assertEquals("TRACE", ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
settings.applySettings(Settings.builder().build()); // here we fall back to 'logger.level' which is our default.
|
||||||
|
assertEquals("ERROR", ESLoggerFactory.getRootLogger().getLevel());
|
||||||
|
} finally {
|
||||||
|
ESLoggerFactory.getRootLogger().setLevel(level);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
public class SettingTests extends ESTestCase {
|
public class SettingTests extends ESTestCase {
|
||||||
|
@ -346,6 +347,42 @@ public class SettingTests extends ESTestCase {
|
||||||
assertFalse(listSetting.match("foo_bar.1"));
|
assertFalse(listSetting.match("foo_bar.1"));
|
||||||
assertTrue(listSetting.match("foo.bar"));
|
assertTrue(listSetting.match("foo.bar"));
|
||||||
assertTrue(listSetting.match("foo.bar." + randomIntBetween(0,10000)));
|
assertTrue(listSetting.match("foo.bar." + randomIntBetween(0,10000)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDynamicKeySetting() {
|
||||||
|
Setting<Boolean> setting = Setting.dynamicKeySetting("foo.", "false", Boolean::parseBoolean, false, Setting.Scope.CLUSTER);
|
||||||
|
assertTrue(setting.hasComplexMatcher());
|
||||||
|
assertTrue(setting.match("foo.bar"));
|
||||||
|
assertFalse(setting.match("foo"));
|
||||||
|
Setting<Boolean> concreteSetting = setting.getConcreteSetting("foo.bar");
|
||||||
|
assertTrue(concreteSetting.get(Settings.builder().put("foo.bar", "true").build()));
|
||||||
|
assertFalse(concreteSetting.get(Settings.builder().put("foo.baz", "true").build()));
|
||||||
|
|
||||||
|
try {
|
||||||
|
setting.getConcreteSetting("foo");
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertEquals("key must match setting but didn't [foo]", ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testMinMaxInt() {
|
||||||
|
Setting<Integer> integerSetting = Setting.intSetting("foo.bar", 1, 0, 10, false, Setting.Scope.CLUSTER);
|
||||||
|
try {
|
||||||
|
integerSetting.get(Settings.builder().put("foo.bar", 11).build());
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertEquals("Failed to parse value [11] for setting [foo.bar] must be =< 10", ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
integerSetting.get(Settings.builder().put("foo.bar", -1).build());
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertEquals("Failed to parse value [-1] for setting [foo.bar] must be >= 0", ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(5, integerSetting.get(Settings.builder().put("foo.bar", 5).build()).intValue());
|
||||||
|
assertEquals(1, integerSetting.get(Settings.EMPTY).intValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,4 +78,24 @@ public class SettingsModuleTests extends ModuleTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testLoggerSettings() {
|
||||||
|
{
|
||||||
|
Settings settings = Settings.builder().put("logger._root", "TRACE").put("logger.transport", "INFO").build();
|
||||||
|
SettingsModule module = new SettingsModule(settings, new SettingsFilter(Settings.EMPTY));
|
||||||
|
assertInstanceBinding(module, Settings.class, (s) -> s == settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
Settings settings = Settings.builder().put("logger._root", "BOOM").put("logger.transport", "WOW").build();
|
||||||
|
SettingsModule module = new SettingsModule(settings, new SettingsFilter(Settings.EMPTY));
|
||||||
|
try {
|
||||||
|
assertInstanceBinding(module, Settings.class, (s) -> s == settings);
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException ex) {
|
||||||
|
assertEquals("No enum constant org.elasticsearch.common.logging.ESLoggerFactory.LogLevel.BOOM", ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,10 +18,12 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.env;
|
package org.elasticsearch.env;
|
||||||
|
|
||||||
|
import org.apache.lucene.index.SegmentInfos;
|
||||||
import org.apache.lucene.store.LockObtainFailedException;
|
import org.apache.lucene.store.LockObtainFailedException;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.apache.lucene.util.LuceneTestCase;
|
import org.apache.lucene.util.LuceneTestCase;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
import org.elasticsearch.common.io.PathUtils;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
@ -41,35 +43,75 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
|
||||||
@LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
|
@LuceneTestCase.SuppressFileSystems("ExtrasFS") // TODO: fix test to allow extras
|
||||||
public class NodeEnvironmentTests extends ESTestCase {
|
public class NodeEnvironmentTests extends ESTestCase {
|
||||||
private final IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", Settings.EMPTY);
|
private final IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", Settings.EMPTY);
|
||||||
|
|
||||||
|
public void testNodeLockSillySettings() {
|
||||||
|
try {
|
||||||
|
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(Settings.builder()
|
||||||
|
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), between(Integer.MIN_VALUE, 0)).build());
|
||||||
|
fail("expected failure");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
assertThat(e.getMessage(), containsString("must be >= 1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Even though its silly MAXINT nodes is a-ok!
|
||||||
|
int value = between(1, Integer.MAX_VALUE);
|
||||||
|
int max = NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(
|
||||||
|
Settings.builder().put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), value).build());
|
||||||
|
assertEquals(value, max);
|
||||||
|
}
|
||||||
|
|
||||||
public void testNodeLockSingleEnvironment() throws IOException {
|
public void testNodeLockSingleEnvironment() throws IOException {
|
||||||
NodeEnvironment env = newNodeEnvironment(Settings.builder()
|
NodeEnvironment env = newNodeEnvironment(Settings.builder()
|
||||||
.put("node.max_local_storage_nodes", 1).build());
|
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 1).build());
|
||||||
Settings settings = env.getSettings();
|
Settings settings = env.getSettings();
|
||||||
List<String> dataPaths = Environment.PATH_DATA_SETTING.get(env.getSettings());
|
List<String> dataPaths = Environment.PATH_DATA_SETTING.get(env.getSettings());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Reuse the same location and attempt to lock again
|
||||||
new NodeEnvironment(settings, new Environment(settings));
|
new NodeEnvironment(settings, new Environment(settings));
|
||||||
fail("env is already locked");
|
fail("env has already locked all the data directories it is allowed");
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
|
assertThat(ex.getMessage(), containsString("Failed to obtain node lock"));
|
||||||
}
|
}
|
||||||
env.close();
|
|
||||||
|
|
||||||
// now can recreate and lock it
|
// Close the environment that holds the lock and make sure we can get the lock after release
|
||||||
|
env.close();
|
||||||
env = new NodeEnvironment(settings, new Environment(settings));
|
env = new NodeEnvironment(settings, new Environment(settings));
|
||||||
assertEquals(env.nodeDataPaths().length, dataPaths.size());
|
assertThat(env.nodeDataPaths(), arrayWithSize(dataPaths.size()));
|
||||||
|
|
||||||
for (int i = 0; i < dataPaths.size(); i++) {
|
for (int i = 0; i < dataPaths.size(); i++) {
|
||||||
assertTrue(env.nodeDataPaths()[i].startsWith(PathUtils.get(dataPaths.get(i))));
|
assertTrue(env.nodeDataPaths()[i].startsWith(PathUtils.get(dataPaths.get(i))));
|
||||||
}
|
}
|
||||||
env.close();
|
env.close();
|
||||||
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
|
assertThat(env.lockedShards(), empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "System.out.*")
|
||||||
|
public void testSegmentInfosTracing() {
|
||||||
|
// Defaults to not hooking up std out
|
||||||
|
assertNull(SegmentInfos.getInfoStream());
|
||||||
|
|
||||||
|
try {
|
||||||
|
// False means don't hook up std out
|
||||||
|
NodeEnvironment.applySegmentInfosTrace(
|
||||||
|
Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), false).build());
|
||||||
|
assertNull(SegmentInfos.getInfoStream());
|
||||||
|
|
||||||
|
// But true means hook std out up statically
|
||||||
|
NodeEnvironment.applySegmentInfosTrace(
|
||||||
|
Settings.builder().put(NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING.getKey(), true).build());
|
||||||
|
assertEquals(System.out, SegmentInfos.getInfoStream());
|
||||||
|
} finally {
|
||||||
|
// Clean up after ourselves
|
||||||
|
SegmentInfos.setInfoStream(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNodeLockMultipleEnvironment() throws IOException {
|
public void testNodeLockMultipleEnvironment() throws IOException {
|
||||||
|
@ -312,7 +354,7 @@ public class NodeEnvironmentTests extends ESTestCase {
|
||||||
|
|
||||||
env.close();
|
env.close();
|
||||||
NodeEnvironment env2 = newNodeEnvironment(dataPaths, "/tmp",
|
NodeEnvironment env2 = newNodeEnvironment(dataPaths, "/tmp",
|
||||||
Settings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, false).build());
|
Settings.builder().put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), false).build());
|
||||||
|
|
||||||
assertThat(env2.availableShardPaths(sid), equalTo(env2.availableShardPaths(sid)));
|
assertThat(env2.availableShardPaths(sid), equalTo(env2.availableShardPaths(sid)));
|
||||||
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/myindex/0")));
|
assertThat(env2.resolveCustomLocation(s2, sid), equalTo(PathUtils.get("/tmp/foo/myindex/0")));
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class ShardPathTests extends ESTestCase {
|
||||||
final boolean includeNodeId = randomBoolean();
|
final boolean includeNodeId = randomBoolean();
|
||||||
indexSetttings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build();
|
indexSetttings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build();
|
||||||
nodeSettings = settingsBuilder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath())
|
nodeSettings = settingsBuilder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath())
|
||||||
.put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH, includeNodeId).build();
|
.put(NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH.getKey(), includeNodeId).build();
|
||||||
if (includeNodeId) {
|
if (includeNodeId) {
|
||||||
customPath = path.resolve("custom").resolve("0");
|
customPath = path.resolve("custom").resolve("0");
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.plugin.discovery.multicast;
|
package org.elasticsearch.plugin.discovery.multicast;
|
||||||
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.settings.SettingsModule;
|
||||||
import org.elasticsearch.discovery.DiscoveryModule;
|
import org.elasticsearch.discovery.DiscoveryModule;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
|
||||||
|
@ -46,4 +47,15 @@ public class MulticastDiscoveryPlugin extends Plugin {
|
||||||
module.addZenPing(MulticastZenPing.class);
|
module.addZenPing(MulticastZenPing.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void onModule(SettingsModule module) {
|
||||||
|
module.registerSetting(MulticastZenPing.ADDRESS_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.GROUP_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.PORT_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.SHARED_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.TTL_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.BUFFER_SIZE_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.PING_ENABLED_SETTING);
|
||||||
|
module.registerSetting(MulticastZenPing.DEFERE_TO_INTERFACE_SETTING);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.lucene.util.Constants;
|
import org.apache.lucene.util.Constants;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
@ -47,7 +48,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.network.NetworkService;
|
import org.elasticsearch.common.network.NetworkService;
|
||||||
import org.elasticsearch.common.network.NetworkUtils;
|
import org.elasticsearch.common.network.NetworkUtils;
|
||||||
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
|
@ -102,6 +106,14 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
|
|
||||||
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
private final AtomicInteger pingIdGenerator = new AtomicInteger();
|
||||||
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
|
private final Map<Integer, PingCollection> receivedResponses = newConcurrentMap();
|
||||||
|
public static final Setting<String> ADDRESS_SETTING = Setting.simpleString("discovery.zen.ping.multicast.address", false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<Integer> PORT_SETTING = Setting.intSetting("discovery.zen.ping.multicast.port", 54328, 0, (1<<16)-1, false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<String> GROUP_SETTING = new Setting<>("discovery.zen.ping.multicast.group", "224.2.2.4", Function.identity(), false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<ByteSizeValue> BUFFER_SIZE_SETTING = Setting.byteSizeSetting("discovery.zen.ping.multicast.buffer_size", new ByteSizeValue(2048, ByteSizeUnit.BYTES), false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<Integer> TTL_SETTING = Setting.intSetting("discovery.zen.ping.multicast.ttl", 3, 0, 255, false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<Boolean> PING_ENABLED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.ping.enabled", true, false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<Boolean> SHARED_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||||
|
public static final Setting<Boolean> DEFERE_TO_INTERFACE_SETTING = Setting.boolSetting("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X, false, Setting.Scope.CLUSTER);
|
||||||
|
|
||||||
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName, Version version) {
|
||||||
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS), version);
|
||||||
|
@ -116,13 +128,12 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
this.networkService = networkService;
|
this.networkService = networkService;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
|
|
||||||
this.address = this.settings.get("discovery.zen.ping.multicast.address");
|
this.address = ADDRESS_SETTING.exists(settings) ? ADDRESS_SETTING.get(settings) : null;
|
||||||
this.port = this.settings.getAsInt("discovery.zen.ping.multicast.port", 54328);
|
this.port = PORT_SETTING.get(settings);
|
||||||
this.group = this.settings.get("discovery.zen.ping.multicast.group", "224.2.2.4");
|
this.group = GROUP_SETTING.get(settings);
|
||||||
this.bufferSize = this.settings.getAsInt("discovery.zen.ping.multicast.buffer_size", 2048);
|
this.bufferSize = BUFFER_SIZE_SETTING.get(settings).bytesAsInt();
|
||||||
this.ttl = this.settings.getAsInt("discovery.zen.ping.multicast.ttl", 3);
|
this.ttl = TTL_SETTING.get(settings);
|
||||||
|
this.pingEnabled = PING_ENABLED_SETTING.get(settings);
|
||||||
this.pingEnabled = this.settings.getAsBoolean("discovery.zen.ping.multicast.ping.enabled", true);
|
|
||||||
|
|
||||||
logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
|
logger.debug("using group [{}], with port [{}], ttl [{}], and address [{}]", group, port, ttl, address);
|
||||||
|
|
||||||
|
@ -142,9 +153,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
|
||||||
try {
|
try {
|
||||||
// we know OSX has bugs in the JVM when creating multiple instances of multicast sockets
|
// we know OSX has bugs in the JVM when creating multiple instances of multicast sockets
|
||||||
// causing for "socket close" exceptions when receive and/or crashes
|
// causing for "socket close" exceptions when receive and/or crashes
|
||||||
boolean shared = settings.getAsBoolean("discovery.zen.ping.multicast.shared", Constants.MAC_OS_X);
|
boolean shared = SHARED_SETTING.get(settings);
|
||||||
// OSX does not correctly send multicasts FROM the right interface
|
// OSX does not correctly send multicasts FROM the right interface
|
||||||
boolean deferToInterface = settings.getAsBoolean("discovery.zen.ping.multicast.defer_group_to_set_interface", Constants.MAC_OS_X);
|
boolean deferToInterface = DEFERE_TO_INTERFACE_SETTING.get(settings);
|
||||||
|
|
||||||
final MulticastChannel.Config config = new MulticastChannel.Config(port, group, bufferSize, ttl,
|
final MulticastChannel.Config config = new MulticastChannel.Config(port, group, bufferSize, ttl,
|
||||||
getMulticastInterface(), deferToInterface);
|
getMulticastInterface(), deferToInterface);
|
||||||
|
|
|
@ -58,8 +58,6 @@ public final class SmbDirectoryWrapper extends FilterDirectory {
|
||||||
*/
|
*/
|
||||||
static final int CHUNK_SIZE = 8192;
|
static final int CHUNK_SIZE = 8192;
|
||||||
|
|
||||||
private final String name;
|
|
||||||
|
|
||||||
public SmbFSIndexOutput(String name) throws IOException {
|
public SmbFSIndexOutput(String name) throws IOException {
|
||||||
super("SmbFSIndexOutput(path=\"" + fsDirectory.getDirectory().resolve(name) + "\")", new FilterOutputStream(Channels.newOutputStream(Files.newByteChannel(fsDirectory.getDirectory().resolve(name), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE))) {
|
super("SmbFSIndexOutput(path=\"" + fsDirectory.getDirectory().resolve(name) + "\")", new FilterOutputStream(Channels.newOutputStream(Files.newByteChannel(fsDirectory.getDirectory().resolve(name), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE))) {
|
||||||
// This implementation ensures, that we never write more than CHUNK_SIZE bytes:
|
// This implementation ensures, that we never write more than CHUNK_SIZE bytes:
|
||||||
|
@ -73,7 +71,6 @@ public final class SmbDirectoryWrapper extends FilterDirectory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, CHUNK_SIZE);
|
}, CHUNK_SIZE);
|
||||||
this.name = name;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue