diff --git a/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java b/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java index 1e308562f14..9814d84fd81 100644 --- a/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java +++ b/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java @@ -19,18 +19,17 @@ package org.elasticsearch.watcher; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledFuture; -import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; - /** * Generic resource watcher service * @@ -41,22 +40,53 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; */ public class ResourceWatcherService extends AbstractLifecycleComponent { - private final List watchers = new CopyOnWriteArrayList<>(); + public static enum Frequency { - private volatile ScheduledFuture scheduledFuture; + /** + * Defaults to 5 seconds + */ + HIGH(TimeValue.timeValueSeconds(5)), + + /** + * Defaults to 30 seconds + */ + MEDIUM(TimeValue.timeValueSeconds(25)), + + /** + * Defaults to 60 seconds + */ + LOW(TimeValue.timeValueSeconds(60)); + + final TimeValue interval; + + private Frequency(TimeValue interval) { + this.interval = interval; + } + } private final boolean enabled; - - private final TimeValue interval; - private final ThreadPool threadPool; + final ResourceMonitor lowMonitor; + final ResourceMonitor mediumMonitor; + final ResourceMonitor highMonitor; + + private volatile ScheduledFuture lowFuture; + private volatile ScheduledFuture mediumFuture; + private volatile ScheduledFuture highFuture; + @Inject public ResourceWatcherService(Settings settings, ThreadPool threadPool) { super(settings); this.enabled = componentSettings.getAsBoolean("enabled", true); - this.interval = componentSettings.getAsTime("interval", timeValueSeconds(60)); this.threadPool = threadPool; + + TimeValue interval = componentSettings.getAsTime("interval.low", Frequency.LOW.interval); + lowMonitor = new ResourceMonitor(interval, Frequency.LOW); + interval = componentSettings.getAsTime("interval.medium", componentSettings.getAsTime("interval", Frequency.MEDIUM.interval)); + mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM); + interval = componentSettings.getAsTime("interval.high", Frequency.HIGH.interval); + highMonitor = new ResourceMonitor(interval, Frequency.HIGH); } @Override @@ -64,7 +94,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent WatcherHandle add(W watcher) { + return add(watcher, Frequency.MEDIUM); } /** - * Unregister a resource watcher + * Register new resource watcher that will be checked in the given frequency */ - public void remove(ResourceWatcher watcher) { - watchers.remove(watcher); + public WatcherHandle add(W watcher, Frequency frequency) { + watcher.init(); + switch (frequency) { + case LOW: + return lowMonitor.add(watcher); + case MEDIUM: + return mediumMonitor.add(watcher); + case HIGH: + return highMonitor.add(watcher); + default: + throw new ElasticsearchIllegalArgumentException("Unknown frequency [" + frequency + "]"); + } } - private class ResourceMonitor implements Runnable { + static class ResourceMonitor implements Runnable { + + final TimeValue interval; + final Frequency frequency; + + final Set watchers = new CopyOnWriteArraySet<>(); + + private ResourceMonitor(TimeValue interval, Frequency frequency) { + this.interval = interval; + this.frequency = frequency; + } + + private WatcherHandle add(W watcher) { + watchers.add(watcher); + return new WatcherHandle<>(this, watcher); + } @Override public void run() { diff --git a/src/main/java/org/elasticsearch/watcher/WatcherHandle.java b/src/main/java/org/elasticsearch/watcher/WatcherHandle.java new file mode 100644 index 00000000000..da395c41a0b --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/WatcherHandle.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.watcher; + +/** +* +*/ +public class WatcherHandle { + + private final ResourceWatcherService.ResourceMonitor monitor; + private final W watcher; + + WatcherHandle(ResourceWatcherService.ResourceMonitor monitor, W watcher) { + this.monitor = monitor; + this.watcher = watcher; + } + + public W watcher() { + return watcher; + } + + public ResourceWatcherService.Frequency frequency() { + return monitor.frequency; + } + + public void stop() { + monitor.watchers.remove(watcher); + } + + public void resume() { + monitor.watchers.add(watcher); + } +} diff --git a/src/test/java/org/elasticsearch/watcher/ResourceWatcherServiceTests.java b/src/test/java/org/elasticsearch/watcher/ResourceWatcherServiceTests.java new file mode 100644 index 00000000000..ce8db76c924 --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/ResourceWatcherServiceTests.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.watcher; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Test; + +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class ResourceWatcherServiceTests extends ElasticsearchTestCase { + + @Test + public void testSettings() throws Exception { + ThreadPool threadPool = new ThreadPool("test"); + + // checking the defaults + Settings settings = ImmutableSettings.builder().build(); + ResourceWatcherService service = new ResourceWatcherService(settings, threadPool); + assertThat(service.highMonitor.interval, is(ResourceWatcherService.Frequency.HIGH.interval)); + assertThat(service.mediumMonitor.interval, is(ResourceWatcherService.Frequency.MEDIUM.interval)); + assertThat(service.lowMonitor.interval, is(ResourceWatcherService.Frequency.LOW.interval)); + + // checking bwc + settings = ImmutableSettings.builder() + .put("watcher.interval", "40s") // only applies to medium + .build(); + service = new ResourceWatcherService(settings, threadPool); + assertThat(service.highMonitor.interval.millis(), is(timeValueSeconds(5).millis())); + assertThat(service.mediumMonitor.interval.millis(), is(timeValueSeconds(40).millis())); + assertThat(service.lowMonitor.interval.millis(), is(timeValueSeconds(60).millis())); + + // checking custom + settings = ImmutableSettings.builder() + .put("watcher.interval.high", "10s") + .put("watcher.interval.medium", "20s") + .put("watcher.interval.low", "30s") + .build(); + service = new ResourceWatcherService(settings, threadPool); + assertThat(service.highMonitor.interval.millis(), is(timeValueSeconds(10).millis())); + assertThat(service.mediumMonitor.interval.millis(), is(timeValueSeconds(20).millis())); + assertThat(service.lowMonitor.interval.millis(), is(timeValueSeconds(30).millis())); + } + + @Test + public void testHandle() throws Exception { + ThreadPool threadPool = new ThreadPool("test"); + Settings settings = ImmutableSettings.builder().build(); + ResourceWatcherService service = new ResourceWatcherService(settings, threadPool); + ResourceWatcher watcher = new ResourceWatcher() { + @Override + public void init() { + } + + @Override + public void checkAndNotify() { + } + }; + + // checking default freq + WatcherHandle handle = service.add(watcher); + assertThat(handle, notNullValue()); + assertThat(handle.frequency(), equalTo(ResourceWatcherService.Frequency.MEDIUM)); + assertThat(service.lowMonitor.watchers.size(), is(0)); + assertThat(service.highMonitor.watchers.size(), is(0)); + assertThat(service.mediumMonitor.watchers.size(), is(1)); + handle.stop(); + assertThat(service.mediumMonitor.watchers.size(), is(0)); + handle.resume(); + assertThat(service.mediumMonitor.watchers.size(), is(1)); + handle.stop(); + + // checking custom freq + handle = service.add(watcher, ResourceWatcherService.Frequency.HIGH); + assertThat(handle, notNullValue()); + assertThat(handle.frequency(), equalTo(ResourceWatcherService.Frequency.HIGH)); + assertThat(service.lowMonitor.watchers.size(), is(0)); + assertThat(service.mediumMonitor.watchers.size(), is(0)); + assertThat(service.highMonitor.watchers.size(), is(1)); + handle.stop(); + assertThat(service.highMonitor.watchers.size(), is(0)); + handle.resume(); + assertThat(service.highMonitor.watchers.size(), is(1)); + } +}