NIFI-1493 This closes #211. removed Executor from InvokeScriptedProcessor

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-02-09 19:19:47 -05:00 committed by joewitt
parent 1c03fc7871
commit 8a05f6880a
1 changed files with 14 additions and 77 deletions

View File

@ -16,6 +16,19 @@
*/
package org.apache.nifi.processors.script;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.script.Invocable;
import javax.script.ScriptException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@ -36,24 +49,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
@CapabilityDescription("Experimental - Invokes a script engine for a Processor defined in the given script. The script must define "
@ -70,66 +65,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private final AtomicReference<Collection<ValidationResult>> validationResults =
new AtomicReference<>((Collection<ValidationResult>) new ArrayList<ValidationResult>());
private final Lock lock = new ReentrantLock();
private SynchronousFileWatcher scriptWatcher;
private ScheduledExecutorService reloadService = null;
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
/**
* Creates the resources needed by this processor. An attempt is made to also initialize the scripted processor,
* but unless the properties (such as script engine name and script file path) have already been specified, the
* script will not yet have been evaluated, so the script's initialize() method will not be called.
*/
protected void createResources() {
// Set up script file reloader service. This checks to see if the script file has changed, and if so, tries
// to reload it
if (reloadService == null) {
reloadService = Executors.newScheduledThreadPool(1);
// monitor the script if configured for changes
reloadService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
final boolean hasLock = lock.tryLock();
// if a property is changing we don't need to reload this iteration
if (hasLock) {
try {
if (scriptWatcher != null && scriptWatcher.checkAndReset()) {
if (isFile(scriptPath)) {
// reload the actual script
final boolean reloaded = reloadScriptFile(scriptPath);
// log the script was reloaded
if (reloaded) {
getLogger().info("The configured script has been successfully reloaded.");
}
}
}
} finally {
lock.unlock();
}
}
} catch (final Throwable t) {
final ProcessorLog logger = getLogger();
final String message = "Unable to reload configured script Processor: " + t;
logger.error(message);
if (logger.isDebugEnabled()) {
logger.error(message, t);
}
}
}
}, 30, 10, TimeUnit.SECONDS);
}
super.createResources();
}
/**
* Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
* processor has defined additional relationships, those will be added as well.
@ -516,7 +453,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
// Initialize the rest of the processor resources if we have not already done so
synchronized (isInitialized) {
if (!isInitialized.get()) {
createResources();
super.createResources();
}
}