Merge pull request #16313 from jasontedor/monitor-settings

Monitor settings
This commit is contained in:
Jason Tedor 2016-01-29 10:21:38 -05:00
commit c7b8fecb0e
8 changed files with 186 additions and 25 deletions

View File

@ -67,6 +67,11 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.os.OsService;
import org.elasticsearch.monitor.process.ProcessService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.repositories.fs.FsRepository;
@ -324,5 +329,13 @@ public final class ClusterSettings extends AbstractScopedSettings {
TribeService.ON_CONFLICT_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH)));
NodeEnvironment.ADD_NODE_ID_TO_CUSTOM_PATH,
OsService.REFRESH_INTERVAL_SETTING,
ProcessService.REFRESH_INTERVAL_SETTING,
JvmService.REFRESH_INTERVAL_SETTING,
FsService.REFRESH_INTERVAL_SETTING,
JvmGcMonitorService.ENABLED_SETTING,
JvmGcMonitorService.REFRESH_INTERVAL_SETTING,
JvmGcMonitorService.GC_SETTING
)));
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmMonitorService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.os.OsService;
import org.elasticsearch.monitor.process.ProcessService;
@ -36,7 +36,7 @@ import java.io.IOException;
*/
public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
private final JvmMonitorService jvmMonitorService;
private final JvmGcMonitorService jvmGcMonitorService;
private final OsService osService;
@ -48,7 +48,7 @@ public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool) throws IOException {
super(settings);
this.jvmMonitorService = new JvmMonitorService(settings, threadPool);
this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
this.osService = new OsService(settings);
this.processService = new ProcessService(settings);
this.jvmService = new JvmService(settings);
@ -73,16 +73,16 @@ public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
@Override
protected void doStart() {
jvmMonitorService.start();
jvmGcMonitorService.start();
}
@Override
protected void doStop() {
jvmMonitorService.stop();
jvmGcMonitorService.stop();
}
@Override
protected void doClose() {
jvmMonitorService.close();
jvmGcMonitorService.close();
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.monitor.fs;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
@ -35,10 +36,13 @@ public class FsService extends AbstractComponent {
private final SingleObjectCache<FsInfo> fsStatsCache;
public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.fs.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), false, Setting.Scope.CLUSTER);
public FsService(Settings settings, NodeEnvironment nodeEnvironment) throws IOException {
super(settings);
this.probe = new FsProbe(settings, nodeEnvironment);
TimeValue refreshInterval = settings.getAsTime("monitor.fs.refresh_interval", TimeValue.timeValueSeconds(1));
TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
fsStatsCache = new FsInfoCache(refreshInterval, probe.stats());
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.monitor.jvm;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
@ -31,13 +33,12 @@ import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.elasticsearch.monitor.jvm.JvmStats.jvmStats;
/**
*
*/
public class JvmMonitorService extends AbstractLifecycleComponent<JvmMonitorService> {
public class JvmGcMonitorService extends AbstractLifecycleComponent<JvmGcMonitorService> {
private final ThreadPool threadPool;
private final boolean enabled;
@ -46,6 +47,13 @@ public class JvmMonitorService extends AbstractLifecycleComponent<JvmMonitorServ
private volatile ScheduledFuture scheduledFuture;
public final static Setting<Boolean> ENABLED_SETTING = Setting.boolSetting("monitor.jvm.gc.enabled", true, false, Scope.CLUSTER);
public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.jvm.gc.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), false, Scope.CLUSTER);
private static String GC_COLLECTOR_PREFIX = "monitor.jvm.gc.collector.";
public final static Setting<Settings> GC_SETTING = Setting.groupSetting(GC_COLLECTOR_PREFIX, false, Scope.CLUSTER);
static class GcThreshold {
public final String name;
public final long warnThreshold;
@ -70,25 +78,21 @@ public class JvmMonitorService extends AbstractLifecycleComponent<JvmMonitorServ
}
}
public JvmMonitorService(Settings settings, ThreadPool threadPool) {
public JvmGcMonitorService(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.enabled = this.settings.getAsBoolean("monitor.jvm.enabled", true);
this.interval = this.settings.getAsTime("monitor.jvm.interval", timeValueSeconds(1));
this.enabled = ENABLED_SETTING.get(settings);
this.interval = REFRESH_INTERVAL_SETTING.get(settings);
Map<String, GcThreshold> gcThresholds = new HashMap<>();
Map<String, Settings> gcThresholdGroups = this.settings.getGroups("monitor.jvm.gc");
Map<String, Settings> gcThresholdGroups = GC_SETTING.get(settings).getAsGroups();
for (Map.Entry<String, Settings> entry : gcThresholdGroups.entrySet()) {
String name = entry.getKey();
TimeValue warn = entry.getValue().getAsTime("warn", null);
TimeValue info = entry.getValue().getAsTime("info", null);
TimeValue debug = entry.getValue().getAsTime("debug", null);
if (warn == null || info == null || debug == null) {
logger.warn("ignoring gc_threshold for [{}], missing warn/info/debug values", name);
} else {
gcThresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis()));
}
TimeValue warn = getValidThreshold(entry.getValue(), entry.getKey(), "warn");
TimeValue info = getValidThreshold(entry.getValue(), entry.getKey(), "info");
TimeValue debug = getValidThreshold(entry.getValue(), entry.getKey(), "debug");
gcThresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis()));
}
gcThresholds.putIfAbsent(GcNames.YOUNG, new GcThreshold(GcNames.YOUNG, 1000, 700, 400));
gcThresholds.putIfAbsent(GcNames.OLD, new GcThreshold(GcNames.OLD, 10000, 5000, 2000));
@ -98,6 +102,21 @@ public class JvmMonitorService extends AbstractLifecycleComponent<JvmMonitorServ
logger.debug("enabled [{}], interval [{}], gc_threshold [{}]", enabled, interval, this.gcThresholds);
}
private static TimeValue getValidThreshold(Settings settings, String key, String level) {
TimeValue threshold = settings.getAsTime(level, null);
if (threshold == null) {
throw new IllegalArgumentException("missing gc_threshold for [" + getThresholdName(key, level) + "]");
}
if (threshold.nanos() <= 0) {
throw new IllegalArgumentException("invalid gc_threshold [" + threshold + "] for [" + getThresholdName(key, level) + "]");
}
return threshold;
}
private static String getThresholdName(String key, String level) {
return GC_COLLECTOR_PREFIX + key + "." + level;
}
@Override
protected void doStart() {
if (!enabled) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.monitor.jvm;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -34,12 +35,15 @@ public class JvmService extends AbstractComponent {
private JvmStats jvmStats;
public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.jvm.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), false, Setting.Scope.CLUSTER);
public JvmService(Settings settings) {
super(settings);
this.jvmInfo = JvmInfo.jvmInfo();
this.jvmStats = JvmStats.jvmStats();
this.refreshInterval = this.settings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));
this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
logger.debug("Using refresh_interval [{}]", refreshInterval);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.monitor.os;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
@ -36,11 +37,14 @@ public class OsService extends AbstractComponent {
private SingleObjectCache<OsStats> osStatsCache;
public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.os.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), false, Setting.Scope.CLUSTER);
public OsService(Settings settings) {
super(settings);
this.probe = OsProbe.getInstance();
TimeValue refreshInterval = settings.getAsTime("monitor.os.refresh_interval", TimeValue.timeValueSeconds(1));
TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
this.info = probe.osInfo();
this.info.refreshInterval = refreshInterval.millis();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.monitor.process;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
@ -33,11 +34,14 @@ public final class ProcessService extends AbstractComponent {
private final ProcessInfo info;
private final SingleObjectCache<ProcessStats> processStatsCache;
public final static Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Setting.timeSetting("monitor.process.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), false, Setting.Scope.CLUSTER);
public ProcessService(Settings settings) {
super(settings);
this.probe = ProcessProbe.getInstance();
final TimeValue refreshInterval = settings.getAsTime("monitor.process.refresh_interval", TimeValue.timeValueSeconds(1));
final TimeValue refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
processStatsCache = new ProcessStatsCache(refreshInterval, probe.processStats());
this.info = probe.processInfo();
this.info.refreshInterval = refreshInterval.millis();

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.jvm;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
public class JvmGcMonitorServiceSettingsTests extends ESTestCase {
public void testEmptySettingsAreOkay() throws InterruptedException {
AtomicBoolean scheduled = new AtomicBoolean();
execute(Settings.EMPTY, (command, interval) -> { scheduled.set(true); return null; }, () -> assertTrue(scheduled.get()));
}
public void testDisabledSetting() throws InterruptedException {
Settings settings = Settings.builder().put("monitor.jvm.gc.enabled", "false").build();
AtomicBoolean scheduled = new AtomicBoolean();
execute(settings, (command, interval) -> { scheduled.set(true); return null; }, () -> assertFalse(scheduled.get()));
}
public void testNegativeSetting() throws InterruptedException {
String collector = randomAsciiOfLength(5);
Settings settings = Settings.builder().put("monitor.jvm.gc.collector." + collector + ".warn", "-" + randomTimeValue()).build();
execute(settings, (command, interval) -> null, t -> {
assertThat(t, instanceOf(IllegalArgumentException.class));
assertThat(t.getMessage(), allOf(containsString("invalid gc_threshold"), containsString("for [monitor.jvm.gc.collector." + collector + ".")));
}, true, null);
}
public void testMissingSetting() throws InterruptedException {
String collector = randomAsciiOfLength(5);
Set<AbstractMap.SimpleEntry<String, String>> entries = new HashSet<>();
entries.add(new AbstractMap.SimpleEntry<>("monitor.jvm.gc.collector." + collector + ".warn", randomTimeValue()));
entries.add(new AbstractMap.SimpleEntry<>("monitor.jvm.gc.collector." + collector + ".info", randomTimeValue()));
entries.add(new AbstractMap.SimpleEntry<>("monitor.jvm.gc.collector." + collector + ".debug", randomTimeValue()));
Settings.Builder builder = Settings.builder();
// drop a random setting or two
for (@SuppressWarnings("unchecked") AbstractMap.SimpleEntry<String, String> entry : randomSubsetOf(randomIntBetween(1, 2), entries.toArray(new AbstractMap.SimpleEntry[0]))) {
builder.put(entry.getKey(), entry.getValue());
}
// we should get an exception that a setting is missing
execute(builder.build(), (command, interval) -> null, t -> {
assertThat(t, instanceOf(IllegalArgumentException.class));
assertThat(t.getMessage(), containsString("missing gc_threshold for [monitor.jvm.gc.collector." + collector + "."));
}, true, null);
}
private static void execute(Settings settings, BiFunction<Runnable, TimeValue, ScheduledFuture<?>> scheduler, Runnable asserts) throws InterruptedException {
execute(settings, scheduler, null, false, asserts);
}
private static void execute(Settings settings, BiFunction<Runnable, TimeValue, ScheduledFuture<?>> scheduler, Consumer<Throwable> consumer, boolean constructionShouldFail, Runnable asserts) throws InterruptedException {
assert constructionShouldFail == (consumer != null);
assert constructionShouldFail == (asserts == null);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(JvmGcMonitorServiceSettingsTests.class.getCanonicalName()) {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduler.apply(command, interval);
}
};
try {
JvmGcMonitorService service = new JvmGcMonitorService(settings, threadPool);
if (constructionShouldFail) {
fail("construction of jvm gc service should have failed");
}
service.doStart();
asserts.run();
service.doStop();
} catch (Throwable t) {
consumer.accept(t);
}
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
}
}
}