Added three frequency levels for resource watching

It's now possible to register watchers along with a specified check frequency. There are three frequencies: low, medium, high. Each one is associated with a check interval that determines how frequent the watchers will check for changes and notify listeners if needed. By default, the intervals are 5s, 30s and 60s respectively, but they can also be customized in the settings. also:

  - Added the WatcherHandle construct by which one can stop it (remove it) and resume it (re add it). Also provices access to the watchers itself and the frequency by which it's checked
  - Change the default frequency to 30 seconds interval (used to be 60 seconds). The only watcher that is currently effected by this is the script watcher (now auto-loading scripts will auto-load every 30 seconds if changed)
This commit is contained in:
uboness 2014-07-16 21:28:30 +02:00
parent bdbe86dd2d
commit cc8f7ddb9a
3 changed files with 236 additions and 20 deletions

View File

@ -19,18 +19,17 @@
package org.elasticsearch.watcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
/**
* Generic resource watcher service
*
@ -41,22 +40,53 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
*/
public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceWatcherService> {
private final List<ResourceWatcher> watchers = new CopyOnWriteArrayList<>();
public static enum Frequency {
private volatile ScheduledFuture scheduledFuture;
/**
* Defaults to 5 seconds
*/
HIGH(TimeValue.timeValueSeconds(5)),
/**
* Defaults to 30 seconds
*/
MEDIUM(TimeValue.timeValueSeconds(25)),
/**
* Defaults to 60 seconds
*/
LOW(TimeValue.timeValueSeconds(60));
final TimeValue interval;
private Frequency(TimeValue interval) {
this.interval = interval;
}
}
private final boolean enabled;
private final TimeValue interval;
private final ThreadPool threadPool;
final ResourceMonitor lowMonitor;
final ResourceMonitor mediumMonitor;
final ResourceMonitor highMonitor;
private volatile ScheduledFuture lowFuture;
private volatile ScheduledFuture mediumFuture;
private volatile ScheduledFuture highFuture;
@Inject
public ResourceWatcherService(Settings settings, ThreadPool threadPool) {
super(settings);
this.enabled = componentSettings.getAsBoolean("enabled", true);
this.interval = componentSettings.getAsTime("interval", timeValueSeconds(60));
this.threadPool = threadPool;
TimeValue interval = componentSettings.getAsTime("interval.low", Frequency.LOW.interval);
lowMonitor = new ResourceMonitor(interval, Frequency.LOW);
interval = componentSettings.getAsTime("interval.medium", componentSettings.getAsTime("interval", Frequency.MEDIUM.interval));
mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM);
interval = componentSettings.getAsTime("interval.high", Frequency.HIGH.interval);
highMonitor = new ResourceMonitor(interval, Frequency.HIGH);
}
@Override
@ -64,7 +94,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
if (!enabled) {
return;
}
scheduledFuture = threadPool.scheduleWithFixedDelay(new ResourceMonitor(), interval);
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval);
}
@Override
@ -72,7 +104,9 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
if (!enabled) {
return;
}
scheduledFuture.cancel(true);
lowFuture.cancel(true);
mediumFuture.cancel(true);
highFuture.cancel(true);
}
@Override
@ -80,21 +114,45 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
}
/**
* Register new resource watcher
* Register new resource watcher that will be checked in default {@link Frequency#MEDIUM MEDIUM} frequency
*/
public void add(ResourceWatcher watcher) {
watcher.init();
watchers.add(watcher);
public <W extends ResourceWatcher> WatcherHandle<W> add(W watcher) {
return add(watcher, Frequency.MEDIUM);
}
/**
* Unregister a resource watcher
* Register new resource watcher that will be checked in the given frequency
*/
public void remove(ResourceWatcher watcher) {
watchers.remove(watcher);
public <W extends ResourceWatcher> WatcherHandle<W> add(W watcher, Frequency frequency) {
watcher.init();
switch (frequency) {
case LOW:
return lowMonitor.add(watcher);
case MEDIUM:
return mediumMonitor.add(watcher);
case HIGH:
return highMonitor.add(watcher);
default:
throw new ElasticsearchIllegalArgumentException("Unknown frequency [" + frequency + "]");
}
}
private class ResourceMonitor implements Runnable {
static class ResourceMonitor implements Runnable {
final TimeValue interval;
final Frequency frequency;
final Set<ResourceWatcher> watchers = new CopyOnWriteArraySet<>();
private ResourceMonitor(TimeValue interval, Frequency frequency) {
this.interval = interval;
this.frequency = frequency;
}
private <W extends ResourceWatcher> WatcherHandle<W> add(W watcher) {
watchers.add(watcher);
return new WatcherHandle<>(this, watcher);
}
@Override
public void run() {

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.watcher;
/**
*
*/
public class WatcherHandle<W extends ResourceWatcher> {
private final ResourceWatcherService.ResourceMonitor monitor;
private final W watcher;
WatcherHandle(ResourceWatcherService.ResourceMonitor monitor, W watcher) {
this.monitor = monitor;
this.watcher = watcher;
}
public W watcher() {
return watcher;
}
public ResourceWatcherService.Frequency frequency() {
return monitor.frequency;
}
public void stop() {
monitor.watchers.remove(watcher);
}
public void resume() {
monitor.watchers.add(watcher);
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.watcher;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Test;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class ResourceWatcherServiceTests extends ElasticsearchTestCase {
@Test
public void testSettings() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
// checking the defaults
Settings settings = ImmutableSettings.builder().build();
ResourceWatcherService service = new ResourceWatcherService(settings, threadPool);
assertThat(service.highMonitor.interval, is(ResourceWatcherService.Frequency.HIGH.interval));
assertThat(service.mediumMonitor.interval, is(ResourceWatcherService.Frequency.MEDIUM.interval));
assertThat(service.lowMonitor.interval, is(ResourceWatcherService.Frequency.LOW.interval));
// checking bwc
settings = ImmutableSettings.builder()
.put("watcher.interval", "40s") // only applies to medium
.build();
service = new ResourceWatcherService(settings, threadPool);
assertThat(service.highMonitor.interval.millis(), is(timeValueSeconds(5).millis()));
assertThat(service.mediumMonitor.interval.millis(), is(timeValueSeconds(40).millis()));
assertThat(service.lowMonitor.interval.millis(), is(timeValueSeconds(60).millis()));
// checking custom
settings = ImmutableSettings.builder()
.put("watcher.interval.high", "10s")
.put("watcher.interval.medium", "20s")
.put("watcher.interval.low", "30s")
.build();
service = new ResourceWatcherService(settings, threadPool);
assertThat(service.highMonitor.interval.millis(), is(timeValueSeconds(10).millis()));
assertThat(service.mediumMonitor.interval.millis(), is(timeValueSeconds(20).millis()));
assertThat(service.lowMonitor.interval.millis(), is(timeValueSeconds(30).millis()));
}
@Test
public void testHandle() throws Exception {
ThreadPool threadPool = new ThreadPool("test");
Settings settings = ImmutableSettings.builder().build();
ResourceWatcherService service = new ResourceWatcherService(settings, threadPool);
ResourceWatcher watcher = new ResourceWatcher() {
@Override
public void init() {
}
@Override
public void checkAndNotify() {
}
};
// checking default freq
WatcherHandle handle = service.add(watcher);
assertThat(handle, notNullValue());
assertThat(handle.frequency(), equalTo(ResourceWatcherService.Frequency.MEDIUM));
assertThat(service.lowMonitor.watchers.size(), is(0));
assertThat(service.highMonitor.watchers.size(), is(0));
assertThat(service.mediumMonitor.watchers.size(), is(1));
handle.stop();
assertThat(service.mediumMonitor.watchers.size(), is(0));
handle.resume();
assertThat(service.mediumMonitor.watchers.size(), is(1));
handle.stop();
// checking custom freq
handle = service.add(watcher, ResourceWatcherService.Frequency.HIGH);
assertThat(handle, notNullValue());
assertThat(handle.frequency(), equalTo(ResourceWatcherService.Frequency.HIGH));
assertThat(service.lowMonitor.watchers.size(), is(0));
assertThat(service.mediumMonitor.watchers.size(), is(0));
assertThat(service.highMonitor.watchers.size(), is(1));
handle.stop();
assertThat(service.highMonitor.watchers.size(), is(0));
handle.resume();
assertThat(service.highMonitor.watchers.size(), is(1));
}
}