From c724f0de5dc8108acbbef5034249c1b99388a113 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Sun, 3 Nov 2013 21:20:38 -0500 Subject: [PATCH] Initial implementation of ResourceWatcherService Closes #4062 --- docs/reference/modules/scripting.asciidoc | 12 + .../node/internal/InternalNode.java | 5 + .../elasticsearch/script/ScriptService.java | 127 +++--- .../watcher/AbstractResourceWatcher.java | 79 ++++ .../watcher/FileChangesListener.java | 75 ++++ .../elasticsearch/watcher/FileWatcher.java | 274 +++++++++++++ .../watcher/ResourceWatcher.java | 37 ++ .../watcher/ResourceWatcherModule.java | 31 ++ .../watcher/ResourceWatcherService.java | 106 +++++ .../watcher/FileWatcherTest.java | 386 ++++++++++++++++++ 10 files changed, 1084 insertions(+), 48 deletions(-) create mode 100644 src/main/java/org/elasticsearch/watcher/AbstractResourceWatcher.java create mode 100644 src/main/java/org/elasticsearch/watcher/FileChangesListener.java create mode 100644 src/main/java/org/elasticsearch/watcher/FileWatcher.java create mode 100644 src/main/java/org/elasticsearch/watcher/ResourceWatcher.java create mode 100644 src/main/java/org/elasticsearch/watcher/ResourceWatcherModule.java create mode 100644 src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java create mode 100644 src/test/java/org/elasticsearch/watcher/FileWatcherTest.java diff --git a/docs/reference/modules/scripting.asciidoc b/docs/reference/modules/scripting.asciidoc index 763cae5cb40..39dee1478b1 100644 --- a/docs/reference/modules/scripting.asciidoc +++ b/docs/reference/modules/scripting.asciidoc @@ -68,6 +68,18 @@ This will still allow execution of named scripts provided in the config, or _native_ Java scripts registered through plugins, however it will prevent users from running arbitrary scripts via the API. +[float] +=== Automatic Script Reloading + +added[0.90.6] + +The `config/scripts` directory is scanned periodically for changes. +New and changed scripts are reloaded and deleted script are removed +from preloaded scripts cache. The reload frequency can be specified +using `watcher.interval` setting, which defaults to `60s`. +To disable script reloading completely set `script.auto_reload_enabled` +to `false`. + [float] === Native (Java) Scripts diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 04d5ecfa4cd..4276b4770ac 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -90,6 +90,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.watcher.ResourceWatcherModule; +import org.elasticsearch.watcher.ResourceWatcherService; import java.util.Arrays; import java.util.concurrent.TimeUnit; @@ -170,6 +172,7 @@ public final class InternalNode implements Node { modules.add(new BulkUdpModule()); modules.add(new ShapeModule()); modules.add(new PercolatorModule()); + modules.add(new ResourceWatcherModule()); injector = modules.createInjector(); @@ -223,6 +226,7 @@ public final class InternalNode implements Node { injector.getInstance(HttpServer.class).start(); } injector.getInstance(BulkUdpService.class).start(); + injector.getInstance(ResourceWatcherService.class).start(); logger.info("started"); @@ -238,6 +242,7 @@ public final class InternalNode implements Node { logger.info("stopping ..."); injector.getInstance(BulkUdpService.class).stop(); + injector.getInstance(ResourceWatcherService.class).stop(); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).stop(); } diff --git a/src/main/java/org/elasticsearch/script/ScriptService.java b/src/main/java/org/elasticsearch/script/ScriptService.java index 6869d6a532e..071f3ff2268 100644 --- a/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/src/main/java/org/elasticsearch/script/ScriptService.java @@ -23,9 +23,9 @@ import com.google.common.base.Charsets; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; @@ -35,8 +35,10 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.env.Environment; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.script.mvel.MvelScriptEngineService; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.watcher.FileChangesListener; +import org.elasticsearch.watcher.FileWatcher; +import org.elasticsearch.watcher.ResourceWatcherService; import java.io.File; import java.io.FileInputStream; @@ -59,18 +61,13 @@ public class ScriptService extends AbstractComponent { private final ConcurrentMap staticCache = ConcurrentCollections.newConcurrentMap(); private final Cache cache; + private final File scriptsDirectory; private final boolean disableDynamic; - public ScriptService(Settings settings) { - this(settings, new Environment(), ImmutableSet.builder() - .add(new MvelScriptEngineService(settings)) - .build() - ); - } - @Inject - public ScriptService(Settings settings, Environment env, Set scriptEngines) { + public ScriptService(Settings settings, Environment env, Set scriptEngines, + ResourceWatcherService resourceWatcherService) { super(settings); int cacheMaxSize = componentSettings.getAsInt("cache.max_size", 500); @@ -100,45 +97,17 @@ public class ScriptService extends AbstractComponent { // put some default optimized scripts staticCache.put("doc.score", new CompiledScript("native", new DocScoreNativeScriptFactory())); - // compile static scripts - File scriptsFile = new File(env.configFile(), "scripts"); - if (scriptsFile.exists()) { - processScriptsDirectory("", scriptsFile); - } - } + // add file watcher for static scripts + scriptsDirectory = new File(env.configFile(), "scripts"); + FileWatcher fileWatcher = new FileWatcher(scriptsDirectory); + fileWatcher.addListener(new ScriptChangesListener()); - private void processScriptsDirectory(String prefix, File dir) { - for (File file : dir.listFiles()) { - if (file.isDirectory()) { - processScriptsDirectory(prefix + file.getName() + "_", file); - } else { - int extIndex = file.getName().lastIndexOf('.'); - if (extIndex != -1) { - String ext = file.getName().substring(extIndex + 1); - String scriptName = prefix + file.getName().substring(0, extIndex); - boolean found = false; - for (ScriptEngineService engineService : scriptEngines.values()) { - for (String s : engineService.extensions()) { - if (s.equals(ext)) { - found = true; - try { - String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8)); - staticCache.put(scriptName, new CompiledScript(engineService.types()[0], engineService.compile(script))); - } catch (Exception e) { - logger.warn("failed to load/compile script [{}]", e, scriptName); - } - break; - } - } - if (found) { - break; - } - } - if (!found) { - logger.warn("no script engine found for [{}]", ext); - } - } - } + if (componentSettings.getAsBoolean("auto_reload_enabled", true)) { + // automatic reload is enabled - register scripts + resourceWatcherService.add(fileWatcher); + } else { + // automatic reload is disable just load scripts once + fileWatcher.init(); } } @@ -214,6 +183,68 @@ public class ScriptService extends AbstractComponent { return !"native".equals(lang); } + private class ScriptChangesListener extends FileChangesListener { + + private Tuple scriptNameExt(File file) { + String scriptPath = scriptsDirectory.toURI().relativize(file.toURI()).getPath(); + int extIndex = scriptPath.lastIndexOf('.'); + if (extIndex != -1) { + String ext = scriptPath.substring(extIndex + 1); + String scriptName = scriptPath.substring(0, extIndex).replace(File.separatorChar, '_'); + return new Tuple(scriptName, ext); + } else { + return null; + } + } + + @Override + public void onFileInit(File file) { + Tuple scriptNameExt = scriptNameExt(file); + if (scriptNameExt != null) { + boolean found = false; + for (ScriptEngineService engineService : scriptEngines.values()) { + for (String s : engineService.extensions()) { + if (s.equals(scriptNameExt.v2())) { + found = true; + try { + logger.trace("compiling script file " + file.getAbsolutePath()); + String script = Streams.copyToString(new InputStreamReader(new FileInputStream(file), Charsets.UTF_8)); + staticCache.put(scriptNameExt.v1(), new CompiledScript(engineService.types()[0], engineService.compile(script))); + } catch (Throwable e) { + logger.warn("failed to load/compile script [{}]", e, scriptNameExt.v1()); + } + break; + } + } + if (found) { + break; + } + } + if (!found) { + logger.warn("no script engine found for [{}]", scriptNameExt.v2()); + } + } + } + + @Override + public void onFileCreated(File file) { + onFileInit(file); + } + + @Override + public void onFileDeleted(File file) { + Tuple scriptNameExt = scriptNameExt(file); + logger.trace("removing script file " + file.getAbsolutePath()); + staticCache.remove(scriptNameExt.v1()); + } + + @Override + public void onFileChanged(File file) { + onFileInit(file); + } + + } + public static class CacheKey { public final String lang; public final String script; diff --git a/src/main/java/org/elasticsearch/watcher/AbstractResourceWatcher.java b/src/main/java/org/elasticsearch/watcher/AbstractResourceWatcher.java new file mode 100644 index 00000000000..3ba82ad7ff7 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/AbstractResourceWatcher.java @@ -0,0 +1,79 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Abstract resource watcher framework, which handles adding and removing listeners + * and calling resource observer. + */ +public abstract class AbstractResourceWatcher implements ResourceWatcher { + private final List listeners = new CopyOnWriteArrayList(); + private boolean initialized = false; + + @Override + public void init() { + if (!initialized) { + doInit(); + initialized = true; + } + } + + @Override + public void checkAndNotify() { + init(); + doCheckAndNotify(); + } + + /** + * Registers new listener + */ + public void addListener(Listener listener) { + listeners.add(listener); + } + + /** + * Unregisters a listener + */ + public void remove(Listener listener) { + listeners.remove(listener); + } + + /** + * Returns a list of listeners + */ + protected List listeners() { + return listeners; + } + + /** + * Will be called once on initialization + */ + protected abstract void doInit(); + + /** + * Will be called periodically + *

+ * Implementing watcher should check resource and notify all {@link #listeners()}. + */ + protected abstract void doCheckAndNotify(); + +} diff --git a/src/main/java/org/elasticsearch/watcher/FileChangesListener.java b/src/main/java/org/elasticsearch/watcher/FileChangesListener.java new file mode 100644 index 00000000000..78e0cac5473 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/FileChangesListener.java @@ -0,0 +1,75 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import java.io.File; + +/** + * Callback interface that file changes File Watcher is using to notify listeners about changes. + */ +public class FileChangesListener { + /** + * Called for every file found in the watched directory during initialization + */ + public void onFileInit(File file) { + + } + + /** + * Called for every subdirectory found in the watched directory during initialization + */ + public void onDirectoryInit(File file) { + + } + + /** + * Called for every new file found in the watched directory + */ + public void onFileCreated(File file) { + + } + + /** + * Called for every file that disappeared in the watched directory + */ + public void onFileDeleted(File file) { + + } + + /** + * Called for every file that was changed in the watched directory + */ + public void onFileChanged(File file) { + + } + + /** + * Called for every new subdirectory found in the watched directory + */ + public void onDirectoryCreated(File file) { + + } + + /** + * Called for every file that disappeared in the watched directory + */ + public void onDirectoryDeleted(File file) { + + } +} diff --git a/src/main/java/org/elasticsearch/watcher/FileWatcher.java b/src/main/java/org/elasticsearch/watcher/FileWatcher.java new file mode 100644 index 00000000000..46ee7fe0d74 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/FileWatcher.java @@ -0,0 +1,274 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import java.io.File; +import java.util.Arrays; + +/** + * File resources watcher + * + * The file watcher checks directory and all its subdirectories for file changes and notifies its listeners accordingly + */ +public class FileWatcher extends AbstractResourceWatcher { + + private FileObserver rootFileObserver; + + /** + * Creates new file watcher on the given directory + */ + public FileWatcher(File file) { + rootFileObserver = new FileObserver(file); + } + + @Override + protected void doInit() { + rootFileObserver.init(true); + } + + @Override + protected void doCheckAndNotify() { + rootFileObserver.checkAndNotify(); + } + + private static FileObserver[] EMPTY_DIRECTORY = new FileObserver[0]; + + private class FileObserver { + private File file; + private boolean exists; + private long length; + private long lastModified; + private boolean isDirectory; + private FileObserver[] children; + + public FileObserver(File file) { + this.file = file; + } + + public void checkAndNotify() { + boolean prevExists = exists; + boolean prevIsDirectory = isDirectory; + long prevLength = length; + long prevLastModified = lastModified; + + exists = file.exists(); + + if (exists) { + isDirectory = file.isDirectory(); + if (isDirectory) { + length = 0; + lastModified = 0; + } else { + length = file.length(); + lastModified = file.lastModified(); + } + } else { + isDirectory = false; + length = 0; + lastModified = 0; + } + + // Perform notifications and update children for the current file + if (prevExists) { + if (exists) { + if (isDirectory) { + if (prevIsDirectory) { + // Remained a directory + updateChildren(); + } else { + // File replaced by directory + onFileDeleted(); + onDirectoryCreated(false); + } + } else { + if (prevIsDirectory) { + // Directory replaced by file + onDirectoryDeleted(); + onFileCreated(false); + } else { + // Remained file + if (prevLastModified != lastModified || prevLength != length) { + onFileChanged(); + } + } + } + } else { + // Deleted + if (prevIsDirectory) { + onDirectoryDeleted(); + } else { + onFileDeleted(); + } + } + } else { + // Created + if (exists) { + if (isDirectory) { + onDirectoryCreated(false); + } else { + onFileCreated(false); + } + } + } + + } + + private void init(boolean initial) { + exists = file.exists(); + if (exists) { + isDirectory = file.isDirectory(); + if (isDirectory) { + onDirectoryCreated(initial); + } else { + length = file.length(); + lastModified = file.lastModified(); + onFileCreated(initial); + } + } + } + + private FileObserver createChild(File file, boolean initial) { + FileObserver child = new FileObserver(file); + child.init(initial); + return child; + } + + private File[] listFiles() { + File[] files = file.listFiles(); + if (files != null) { + Arrays.sort(files); + } + return files; + } + + private FileObserver[] listChildren(boolean initial) { + File[] files = listFiles(); + if (files != null && files.length > 0) { + FileObserver[] children = new FileObserver[files.length]; + for (int i = 0; i < files.length; i++) { + children[i] = createChild(files[i], initial); + } + return children; + } else { + return EMPTY_DIRECTORY; + } + } + + private void updateChildren() { + File[] files = listFiles(); + if (files != null && files.length > 0) { + FileObserver[] newChildren = new FileObserver[files.length]; + int child = 0; + int file = 0; + while (file < files.length || child < children.length ) { + int compare; + + if (file >= files.length) { + compare = -1; + } else if (child >= children.length) { + compare = 1; + } else { + compare = children[child].file.compareTo(files[file]); + } + + if (compare == 0) { + // Same file copy it and update + children[child].checkAndNotify(); + newChildren[file] = children[child]; + file++; + child++; + } else { + if (compare > 0) { + // This child doesn't appear in the old list - init it + newChildren[file] = createChild(files[file], false); + file++; + } else { + // The child from the old list is missing in the new list + // Delete it + deleteChild(child); + child++; + } + } + } + children = newChildren; + } else { + // No files - delete all children + for (int child = 0; child < children.length; child++) { + deleteChild(child); + } + children = EMPTY_DIRECTORY; + } + } + + private void deleteChild(int child) { + if (children[child].exists) { + if (children[child].isDirectory) { + children[child].onDirectoryDeleted(); + } else { + children[child].onFileDeleted(); + } + } + } + + private void onFileCreated(boolean initial) { + for (FileChangesListener listener : listeners()) { + if (initial) { + listener.onFileInit(file); + } else { + listener.onFileCreated(file); + } + } + } + + private void onFileDeleted() { + for (FileChangesListener listener : listeners()) { + listener.onFileDeleted(file); + } + } + + private void onFileChanged() { + for (FileChangesListener listener : listeners()) { + listener.onFileChanged(file); + } + } + + private void onDirectoryCreated(boolean initial) { + for (FileChangesListener listener : listeners()) { + if (initial) { + listener.onDirectoryInit(file); + } else { + listener.onDirectoryCreated(file); + } + } + children = listChildren(initial); + } + + private void onDirectoryDeleted() { + // First delete all children + for (int child = 0; child < children.length; child++) { + deleteChild(child); + } + for (FileChangesListener listener : listeners()) { + listener.onDirectoryDeleted(file); + } + } + + } + +} diff --git a/src/main/java/org/elasticsearch/watcher/ResourceWatcher.java b/src/main/java/org/elasticsearch/watcher/ResourceWatcher.java new file mode 100644 index 00000000000..8a3c616e02d --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/ResourceWatcher.java @@ -0,0 +1,37 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +/** + * Abstract resource watcher interface. + *

+ * Different resource watchers can be registered with {@link ResourceWatcherService} to be called + * periodically in order to check for changes in different external resources. + */ +public interface ResourceWatcher { + /** + * Called once when the resource watcher is added to {@link ResourceWatcherService} + */ + void init(); + + /** + * Called periodically by {@link ResourceWatcherService} so resource watcher can check the resource + */ + void checkAndNotify(); +} diff --git a/src/main/java/org/elasticsearch/watcher/ResourceWatcherModule.java b/src/main/java/org/elasticsearch/watcher/ResourceWatcherModule.java new file mode 100644 index 00000000000..aeb0abbf489 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/ResourceWatcherModule.java @@ -0,0 +1,31 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + * + */ +public class ResourceWatcherModule extends AbstractModule { + @Override + protected void configure() { + bind(ResourceWatcherService.class).asEagerSingleton(); + } +} diff --git a/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java b/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java new file mode 100644 index 00000000000..eae671a8b33 --- /dev/null +++ b/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java @@ -0,0 +1,106 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; + +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; + +/** + * Generic resource watcher service + * + * Other elasticsearch services can register their resource watchers with this service using {@link #add(ResourceWatcher)} + * method. This service will call {@link org.elasticsearch.watcher.ResourceWatcher#checkAndNotify()} method of all + * registered watcher periodically. The frequency of checks can be specified using {@code watcher.interval} setting, which + * defaults to {@code 60s}. The service can be disabled by setting {@code watcher.enabled} setting to {@code false}. + */ +public class ResourceWatcherService extends AbstractLifecycleComponent { + + private final List watchers = new CopyOnWriteArrayList(); + + private volatile ScheduledFuture scheduledFuture; + + private final boolean enabled; + + private final TimeValue interval; + + private final ThreadPool threadPool; + + @Inject + public ResourceWatcherService(Settings settings, ThreadPool threadPool) { + super(settings); + this.enabled = componentSettings.getAsBoolean("enabled", true); + this.interval = componentSettings.getAsTime("interval", timeValueSeconds(60)); + this.threadPool = threadPool; + } + + @Override + protected void doStart() throws ElasticSearchException { + if (!enabled) { + return; + } + scheduledFuture = threadPool.scheduleWithFixedDelay(new ResourceMonitor(), interval); + } + + @Override + protected void doStop() throws ElasticSearchException { + if (!enabled) { + return; + } + scheduledFuture.cancel(true); + } + + @Override + protected void doClose() throws ElasticSearchException { + } + + /** + * Register new resource watcher + */ + public void add(ResourceWatcher watcher) { + watcher.init(); + watchers.add(watcher); + } + + /** + * Unregister a resource watcher + */ + public void remove(ResourceWatcher watcher) { + watchers.remove(watcher); + } + + private class ResourceMonitor implements Runnable { + + @Override + public void run() { + for(ResourceWatcher watcher : watchers) { + watcher.checkAndNotify(); + } + } + } +} diff --git a/src/test/java/org/elasticsearch/watcher/FileWatcherTest.java b/src/test/java/org/elasticsearch/watcher/FileWatcherTest.java new file mode 100644 index 00000000000..939bd12613c --- /dev/null +++ b/src/test/java/org/elasticsearch/watcher/FileWatcherTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.watcher; + +import com.carrotsearch.randomizedtesting.LifecycleScope; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.io.Files.*; +import static org.elasticsearch.common.io.FileSystemUtils.deleteRecursively; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class FileWatcherTest extends ElasticsearchTestCase { + + private class RecordingChangeListener extends FileChangesListener { + + private File rootDir; + + private RecordingChangeListener(File rootDir) { + this.rootDir = rootDir; + } + + private String getRelativeFileName(File file) { + return rootDir.toURI().relativize(file.toURI()).getPath(); + } + + private List notifications = newArrayList(); + + @Override + public void onFileInit(File file) { + notifications.add("onFileInit: " + getRelativeFileName(file)); + } + + @Override + public void onDirectoryInit(File file) { + notifications.add("onDirectoryInit: " + getRelativeFileName(file)); + } + + @Override + public void onFileCreated(File file) { + notifications.add("onFileCreated: " + getRelativeFileName(file)); + } + + @Override + public void onFileDeleted(File file) { + notifications.add("onFileDeleted: " + getRelativeFileName(file)); + } + + @Override + public void onFileChanged(File file) { + notifications.add("onFileChanged: " + getRelativeFileName(file)); + } + + @Override + public void onDirectoryCreated(File file) { + notifications.add("onDirectoryCreated: " + getRelativeFileName(file)); + } + + @Override + public void onDirectoryDeleted(File file) { + notifications.add("onDirectoryDeleted: " + getRelativeFileName(file)); + } + + public List notifications() { + return notifications; + } + } + + @Test + public void testSimpleFileOperations() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testFile = new File(tempDir, "test.txt"); + touch(testFile); + FileWatcher fileWatcher = new FileWatcher(testFile); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), contains(equalTo("onFileInit: test.txt"))); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + append("Test", testFile, Charset.defaultCharset()); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains(equalTo("onFileChanged: test.txt"))); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + testFile.delete(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains(equalTo("onFileDeleted: test.txt"))); + + } + + @Test + public void testSimpleDirectoryOperations() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testDir = new File(tempDir, "test-dir"); + testDir.mkdir(); + touch(new File(testDir, "test.txt")); + touch(new File(testDir, "test0.txt")); + + FileWatcher fileWatcher = new FileWatcher(testDir); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), contains( + equalTo("onDirectoryInit: test-dir/"), + equalTo("onFileInit: test-dir/test.txt"), + equalTo("onFileInit: test-dir/test0.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + for (int i = 0; i < 4; i++) { + touch(new File(testDir, "test" + i + ".txt")); + } + // Make sure that first file is modified + append("Test", new File(testDir, "test0.txt"), Charset.defaultCharset()); + + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileChanged: test-dir/test0.txt"), + equalTo("onFileCreated: test-dir/test1.txt"), + equalTo("onFileCreated: test-dir/test2.txt"), + equalTo("onFileCreated: test-dir/test3.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + new File(testDir, "test1.txt").delete(); + new File(testDir, "test2.txt").delete(); + + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/test1.txt"), + equalTo("onFileDeleted: test-dir/test2.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + new File(testDir, "test0.txt").delete(); + touch(new File(testDir, "test2.txt")); + touch(new File(testDir, "test4.txt")); + fileWatcher.checkAndNotify(); + + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/test0.txt"), + equalTo("onFileCreated: test-dir/test2.txt"), + equalTo("onFileCreated: test-dir/test4.txt") + )); + + + changes.notifications().clear(); + + new File(testDir, "test3.txt").delete(); + new File(testDir, "test4.txt").delete(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/test3.txt"), + equalTo("onFileDeleted: test-dir/test4.txt") + )); + + + changes.notifications().clear(); + deleteRecursively(testDir); + fileWatcher.checkAndNotify(); + + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/test.txt"), + equalTo("onFileDeleted: test-dir/test2.txt"), + equalTo("onDirectoryDeleted: test-dir") + )); + + } + + @Test + public void testNestedDirectoryOperations() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testDir = new File(tempDir, "test-dir"); + testDir.mkdir(); + touch(new File(testDir, "test.txt")); + new File(testDir, "sub-dir").mkdir(); + touch(new File(testDir, "sub-dir/test0.txt")); + + FileWatcher fileWatcher = new FileWatcher(testDir); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), contains( + equalTo("onDirectoryInit: test-dir/"), + equalTo("onDirectoryInit: test-dir/sub-dir/"), + equalTo("onFileInit: test-dir/sub-dir/test0.txt"), + equalTo("onFileInit: test-dir/test.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + // Create new file in subdirectory + touch(new File(testDir, "sub-dir/test1.txt")); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileCreated: test-dir/sub-dir/test1.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + // Create new subdirectory in subdirectory + new File(testDir, "first-level").mkdir(); + touch(new File(testDir, "first-level/file1.txt")); + new File(testDir, "first-level/second-level").mkdir(); + touch(new File(testDir, "first-level/second-level/file2.txt")); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onDirectoryCreated: test-dir/first-level/"), + equalTo("onFileCreated: test-dir/first-level/file1.txt"), + equalTo("onDirectoryCreated: test-dir/first-level/second-level/"), + equalTo("onFileCreated: test-dir/first-level/second-level/file2.txt") + )); + + changes.notifications().clear(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), hasSize(0)); + + // Delete a directory, check notifications for + deleteRecursively(new File(testDir, "first-level")); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/first-level/file1.txt"), + equalTo("onFileDeleted: test-dir/first-level/second-level/file2.txt"), + equalTo("onDirectoryDeleted: test-dir/first-level/second-level"), + equalTo("onDirectoryDeleted: test-dir/first-level") + )); + } + + @Test + public void testFileReplacingDirectory() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testDir = new File(tempDir, "test-dir"); + testDir.mkdir(); + File subDir = new File(testDir, "sub-dir"); + subDir.mkdir(); + touch(new File(subDir, "test0.txt")); + touch(new File(subDir, "test1.txt")); + + FileWatcher fileWatcher = new FileWatcher(testDir); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), contains( + equalTo("onDirectoryInit: test-dir/"), + equalTo("onDirectoryInit: test-dir/sub-dir/"), + equalTo("onFileInit: test-dir/sub-dir/test0.txt"), + equalTo("onFileInit: test-dir/sub-dir/test1.txt") + )); + + changes.notifications().clear(); + + deleteRecursively(subDir); + touch(subDir); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/sub-dir/test0.txt"), + equalTo("onFileDeleted: test-dir/sub-dir/test1.txt"), + equalTo("onDirectoryDeleted: test-dir/sub-dir"), + equalTo("onFileCreated: test-dir/sub-dir") + )); + + changes.notifications().clear(); + + subDir.delete(); + subDir.mkdir(); + + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/sub-dir/"), + equalTo("onDirectoryCreated: test-dir/sub-dir/") + )); + } + + @Test + public void testEmptyDirectory() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testDir = new File(tempDir, "test-dir"); + testDir.mkdir(); + touch(new File(testDir, "test0.txt")); + touch(new File(testDir, "test1.txt")); + + FileWatcher fileWatcher = new FileWatcher(testDir); + fileWatcher.addListener(changes); + fileWatcher.init(); + changes.notifications().clear(); + + new File(testDir, "test0.txt").delete(); + new File(testDir, "test1.txt").delete(); + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileDeleted: test-dir/test0.txt"), + equalTo("onFileDeleted: test-dir/test1.txt") + )); + } + + @Test + public void testNoDirectoryOnInit() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testDir = new File(tempDir, "test-dir"); + + FileWatcher fileWatcher = new FileWatcher(testDir); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), hasSize(0)); + changes.notifications().clear(); + + testDir.mkdir(); + touch(new File(testDir, "test0.txt")); + touch(new File(testDir, "test1.txt")); + + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onDirectoryCreated: test-dir/"), + equalTo("onFileCreated: test-dir/test0.txt"), + equalTo("onFileCreated: test-dir/test1.txt") + )); + } + + @Test + public void testNoFileOnInit() throws IOException { + File tempDir = newTempDir(LifecycleScope.TEST); + RecordingChangeListener changes = new RecordingChangeListener(tempDir); + File testFile = new File(tempDir, "testfile.txt"); + + FileWatcher fileWatcher = new FileWatcher(testFile); + fileWatcher.addListener(changes); + fileWatcher.init(); + assertThat(changes.notifications(), hasSize(0)); + changes.notifications().clear(); + + touch(testFile); + + fileWatcher.checkAndNotify(); + assertThat(changes.notifications(), contains( + equalTo("onFileCreated: testfile.txt") + )); + } + +} \ No newline at end of file