This commit is contained in:
Mark Payne 2015-07-07 21:08:34 -04:00
commit 3125036d3f
9 changed files with 82 additions and 19 deletions

View File

@ -27,7 +27,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.4</version>
<version>0.9.5</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -112,14 +112,35 @@ public class NiFiSpout extends BaseRichSpout {
public static final Logger LOGGER = LoggerFactory.getLogger(NiFiSpout.class);
public static final String NIFI_DATA_PACKET = "nifiDataPacket";
private NiFiSpoutReceiver spoutReceiver;
private LinkedBlockingQueue<NiFiDataPacket> queue;
private SpoutOutputCollector spoutOutputCollector;
private final SiteToSiteClientConfig clientConfig;
private final List<String> attributeNames;
/**
* @param clientConfig
* configuration used to build the SiteToSiteClient
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig) {
this(clientConfig, null);
}
/**
*
* @param clientConfig
* configuration used to build the SiteToSiteClient
* @param attributeNames
* names of FlowFile attributes to be added as values to each tuple, in addition
* to the nifiDataPacket value on all tuples
*
*/
public NiFiSpout(SiteToSiteClientConfig clientConfig, List<String> attributeNames) {
this.clientConfig = clientConfig;
this.attributeNames = (attributeNames == null ? new ArrayList<String>() : attributeNames);
}
@Override
@ -139,13 +160,26 @@ public class NiFiSpout extends BaseRichSpout {
if (data == null) {
Utils.sleep(50);
} else {
spoutOutputCollector.emit(new Values(data));
// always start with the data packet
Values values = new Values(data);
// add additional values based on the specified attribute names
for (String attributeName : attributeNames) {
if (data.getAttributes().containsKey(attributeName)) {
values.add(data.getAttributes().get(attributeName));
}
}
spoutOutputCollector.emit(values);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("nifiDataPacket"));
final List<String> fieldNames = new ArrayList<>();
fieldNames.add(NIFI_DATA_PACKET);
fieldNames.addAll(attributeNames);
outputFieldsDeclarer.declare(new Fields(fieldNames));
}
@Override

View File

@ -20,7 +20,7 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.reporting.InitializationException;
/**
* An interface for initializing a ConfigurableComponent. It is up to the
* An interface for initializing and tearing down a ConfigurableComponent. It is up to the
* implementer to call "init" so that you can call
* ConfigurableComponent.getPropertyDescriptors()
*
@ -35,4 +35,11 @@ public interface ConfigurableComponentInitializer {
* @throws InitializationException if the component could not be initialized
*/
void initialize(ConfigurableComponent component) throws InitializationException;
/**
* Calls the lifecycle methods that should be called when a flow is shutdown.
*
* @param component the component to initialize
*/
void teardown(ConfigurableComponent component);
}

View File

@ -114,6 +114,8 @@ public class DocGenerator {
try (final OutputStream output = new BufferedOutputStream(new FileOutputStream(baseDocumenationFile))) {
writer.write(component, output, hasAdditionalInfo(directory));
}
initializer.teardown(component);
}
/**

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.documentation.init;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.ControllerService;
@ -42,11 +41,17 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
controllerService.initialize(new MockControllerServiceInitializationContext());
}
}
@Override
public void teardown(ConfigurableComponent component) {
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ControllerService controllerService = (ControllerService) component;
final ProcessorLog logger = new MockProcessorLogger();
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, controllerService, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, controllerService, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context);
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.documentation.init;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
@ -40,11 +39,17 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
Processor processor = (Processor) component;
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
processor.initialize(new MockProcessorInitializationContext());
}
}
@Override
public void teardown(ConfigurableComponent component) {
Processor processor = (Processor) component;
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final ProcessorLog logger = new MockProcessorLogger();
final MockProcessContext context = new MockProcessContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, processor, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, processor, logger, context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context);
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.documentation.init;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
@ -40,10 +39,16 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
ReportingTask reportingTask = (ReportingTask) component;
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
reportingTask.initialize(new MockReportingInitializationContext());
}
}
@Override
public void teardown(ConfigurableComponent component) {
ReportingTask reportingTask = (ReportingTask) component;
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, null, reportingTask, new MockProcessorLogger(), context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, null, reportingTask, new MockProcessorLogger(), context);
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockProcessorLogger(), context);
}
}
}

View File

@ -58,6 +58,7 @@ public class HtmlDocumentationWriterTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(controllerService, baos, false);
initializer.teardown(controllerService);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
@ -77,8 +78,8 @@ public class HtmlDocumentationWriterTest {
assertContains(results, "Sensitive Property: true");
// verify the right OnRemoved and OnShutdown methods were called
Assert.assertEquals(1, controllerService.getOnRemovedArgs());
Assert.assertEquals(1, controllerService.getOnRemovedNoArgs());
Assert.assertEquals(0, controllerService.getOnRemovedArgs());
Assert.assertEquals(0, controllerService.getOnRemovedNoArgs());
Assert.assertEquals(1, controllerService.getOnShutdownArgs());
Assert.assertEquals(1, controllerService.getOnShutdownNoArgs());
@ -96,6 +97,7 @@ public class HtmlDocumentationWriterTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(reportingTask, baos, false);
initializer.teardown(reportingTask);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
@ -113,8 +115,8 @@ public class HtmlDocumentationWriterTest {
assertContains(results, "false");
// verify the right OnRemoved and OnShutdown methods were called
Assert.assertEquals(1, reportingTask.getOnRemovedArgs());
Assert.assertEquals(1, reportingTask.getOnRemovedNoArgs());
Assert.assertEquals(0, reportingTask.getOnRemovedArgs());
Assert.assertEquals(0, reportingTask.getOnRemovedNoArgs());
Assert.assertEquals(1, reportingTask.getOnShutdownArgs());
Assert.assertEquals(1, reportingTask.getOnShutdownNoArgs());

View File

@ -44,6 +44,7 @@ public class ProcessorDocumentationWriterTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(processor, baos, false);
initializer.teardown(processor);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
@ -74,8 +75,8 @@ public class ProcessorDocumentationWriterTest {
assertNotContains(results, "Additional Details...");
// verify the right OnRemoved and OnShutdown methods were called
Assert.assertEquals(1, processor.getOnRemovedArgs());
Assert.assertEquals(1, processor.getOnRemovedNoArgs());
Assert.assertEquals(0, processor.getOnRemovedArgs());
Assert.assertEquals(0, processor.getOnRemovedNoArgs());
Assert.assertEquals(1, processor.getOnShutdownArgs());
Assert.assertEquals(1, processor.getOnShutdownNoArgs());
@ -92,6 +93,7 @@ public class ProcessorDocumentationWriterTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(processor, baos, false);
initializer.teardown(processor);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);
@ -121,6 +123,7 @@ public class ProcessorDocumentationWriterTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writer.write(processor, baos, false);
initializer.teardown(processor);
String results = new String(baos.toByteArray());
XmlValidator.assertXmlValid(results);