NIFI-2474: Remove VariableRegistry from outward facing API so that the it is more flexible to evolve

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #782
This commit is contained in:
Mark Payne 2016-08-03 20:44:04 -04:00 committed by Yolanda M. Davis
parent 3943d72e95
commit 1db5e73102
7 changed files with 120 additions and 14 deletions

View File

@ -32,7 +32,7 @@ public interface VariableRegistry {
* Returns an empty registry which can be used as a more intentional null * Returns an empty registry which can be used as a more intentional null
* value. * value.
*/ */
public static final VariableRegistry EMPTY_REGISTRY = () -> Collections.EMPTY_MAP; public static final VariableRegistry EMPTY_REGISTRY = () -> Collections.emptyMap();
/** /**
* Provides a registry containing all environment variables and system * Provides a registry containing all environment variables and system

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
public class MockVariableRegistry implements VariableRegistry {
private final Map<VariableDescriptor, String> variables = new HashMap<>();
@Override
public Map<VariableDescriptor, String> getVariableMap() {
return Collections.unmodifiableMap(variables);
}
public void setVariable(final VariableDescriptor descriptor, final String value) {
variables.put(descriptor, value);
}
public String removeVariable(final VariableDescriptor descriptor) {
return variables.remove(descriptor);
}
}

View File

@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -41,6 +42,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@ -65,7 +67,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager; import org.apache.nifi.state.MockStateManager;
import org.junit.Assert; import org.junit.Assert;
@ -81,7 +83,7 @@ public class StandardProcessorTestRunner implements TestRunner {
private final boolean triggerSerially; private final boolean triggerSerially;
private final MockStateManager processorStateManager; private final MockStateManager processorStateManager;
private final Map<String, MockStateManager> controllerServiceStateManagers = new HashMap<>(); private final Map<String, MockStateManager> controllerServiceStateManagers = new HashMap<>();
private final VariableRegistry variableRegistry; private final MockVariableRegistry variableRegistry;
private int numThreads = 1; private int numThreads = 1;
private final AtomicInteger invocations = new AtomicInteger(0); private final AtomicInteger invocations = new AtomicInteger(0);
@ -89,14 +91,14 @@ public class StandardProcessorTestRunner implements TestRunner {
private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>(); private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>();
private final MockComponentLog logger; private final MockComponentLog logger;
StandardProcessorTestRunner(final Processor processor,final VariableRegistry variableRegistry) { StandardProcessorTestRunner(final Processor processor) {
this.processor = processor; this.processor = processor;
this.idGenerator = new AtomicLong(0L); this.idGenerator = new AtomicLong(0L);
this.sharedState = new SharedSessionState(processor, idGenerator); this.sharedState = new SharedSessionState(processor, idGenerator);
this.flowFileQueue = sharedState.getFlowFileQueue(); this.flowFileQueue = sharedState.getFlowFileQueue();
this.sessionFactory = new MockSessionFactory(sharedState, processor); this.sessionFactory = new MockSessionFactory(sharedState, processor);
this.processorStateManager = new MockStateManager(processor); this.processorStateManager = new MockStateManager(processor);
this.variableRegistry = variableRegistry; this.variableRegistry = new MockVariableRegistry();
this.context = new MockProcessContext(processor, processorStateManager, variableRegistry); this.context = new MockProcessContext(processor, processorStateManager, variableRegistry);
final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context); final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
@ -824,4 +826,27 @@ public class StandardProcessorTestRunner implements TestRunner {
public void setPrimaryNode(boolean primaryNode) { public void setPrimaryNode(boolean primaryNode) {
context.setPrimaryNode(primaryNode); context.setPrimaryNode(primaryNode);
} }
@Override
public String getVariableValue(final String name) {
Objects.requireNonNull(name);
return variableRegistry.getVariableValue(name);
}
@Override
public void setVariable(final String name, final String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
final VariableDescriptor descriptor = new VariableDescriptor.Builder(name).build();
variableRegistry.setVariable(descriptor, value);
}
@Override
public String removeVariable(final String name) {
Objects.requireNonNull(name);
return variableRegistry.removeVariable(new VariableDescriptor.Builder(name).build());
}
} }

View File

@ -901,4 +901,35 @@ public interface TestRunner {
* @param primaryNode Specify if this test emulates running as a primary node * @param primaryNode Specify if this test emulates running as a primary node
*/ */
void setPrimaryNode(boolean primaryNode); void setPrimaryNode(boolean primaryNode);
/**
* Sets the value of the variable with the given name to be the given value. This exposes the variable
* for use by the Expression Language.
*
* @param name the name of the variable to set
* @param value the value of the variable
*
* @throws NullPointerException if either the name or the value is null
*/
void setVariable(String name, String value);
/**
* Returns the current value of the variable with the given name
*
* @param name the name of the variable whose value should be returned.
* @return the current value of the variable with the given name or <code>null</code> if no value is currently set
*
* @throws NullPointerException if the name is null
*/
String getVariableValue(String name);
/**
* Removes the variable with the given name from this Test Runner, if it is set.
*
* @param name the name of the variable to remove
* @return the value that was set for the variable, or <code>null</code> if the variable was not set
*
* @throws NullPointerException if the name is null
*/
String removeVariable(String name);
} }

View File

@ -17,22 +17,16 @@
package org.apache.nifi.util; package org.apache.nifi.util;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.VariableRegistry;
public class TestRunners { public class TestRunners {
public static TestRunner newTestRunner(final Processor processor) { public static TestRunner newTestRunner(final Processor processor) {
return newTestRunner(processor,VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY); return new StandardProcessorTestRunner(processor);
}
public static TestRunner newTestRunner(final Processor processor, VariableRegistry variableRegistry){
return new StandardProcessorTestRunner(processor, variableRegistry);
} }
public static TestRunner newTestRunner(final Class<? extends Processor> processorClass) { public static TestRunner newTestRunner(final Class<? extends Processor> processorClass) {
try { try {
return newTestRunner(processorClass.newInstance()); return newTestRunner(processorClass.newInstance());
} catch (final Exception e) { } catch (final Exception e) {
System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e); System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e);
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -20,7 +20,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.registry.VariableRegistry;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -32,7 +31,7 @@ public class CurrentTestStandardProcessorTestRunner {
@Test @Test
public void testOnScheduledCalledAfterRunFinished() { public void testOnScheduledCalledAfterRunFinished() {
SlowRunProcessor processor = new SlowRunProcessor(); SlowRunProcessor processor = new SlowRunProcessor();
StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor, VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY); StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor);
final int iterations = 5; final int iterations = 5;
runner.run(iterations); runner.run(iterations);
// if the counter is not equal to iterations, the the processor must have been unscheduled // if the counter is not equal to iterations, the the processor must have been unscheduled

View File

@ -17,6 +17,7 @@
package org.apache.nifi.util; package org.apache.nifi.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -110,6 +111,19 @@ public class TestStandardProcessorTestRunner {
runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY); runner.assertAllFlowFilesContainAttribute(AddAttributeProcessor.KEY);
} }
@Test
public void testVariables() {
final AddAttributeProcessor proc = new AddAttributeProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
assertNull(runner.getVariableValue("hello"));
runner.setVariable("hello", "world");
assertEquals("world", runner.getVariableValue("hello"));
assertEquals("world", runner.removeVariable("hello"));
assertNull(runner.getVariableValue("hello"));
}
private static class ProcessorWithOnStop extends AbstractProcessor { private static class ProcessorWithOnStop extends AbstractProcessor {
private int callsWithContext = 0; private int callsWithContext = 0;