From 82e42fad03d706092406e2f62c963da8cbffe1ba Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 10 Dec 2014 14:39:59 -0500 Subject: [PATCH 01/20] NIFI-145: Moved bootstrap jar into its own directory: lib/bootstrap --- assemblies/nifi/src/main/assembly/dependencies.xml | 12 ++++++++++++ .../resources/src/main/resources/bin/nifi-status.bat | 8 ++++---- .../resources/src/main/resources/bin/nifi.sh | 4 +--- .../resources/src/main/resources/bin/run-nifi.bat | 6 +++--- .../resources/src/main/resources/bin/start-nifi.bat | 7 ++++--- .../resources/src/main/resources/bin/stop-nifi.bat | 8 ++++---- 6 files changed, 28 insertions(+), 17 deletions(-) diff --git a/assemblies/nifi/src/main/assembly/dependencies.xml b/assemblies/nifi/src/main/assembly/dependencies.xml index 339275bd11..97f3d4ea77 100644 --- a/assemblies/nifi/src/main/assembly/dependencies.xml +++ b/assemblies/nifi/src/main/assembly/dependencies.xml @@ -27,6 +27,18 @@ runtime lib true + + nifi-bootstrap + + + + + runtime + lib/bootstrap + true + + nifi-bootstrap + diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat index ed9c5163ff..d00f31cf55 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat @@ -1,3 +1,5 @@ +@echo off + rem rem Licensed to the Apache Software Foundation (ASF) under one or more rem contributor license agreements. See the NOTICE file distributed with @@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and rem limitations under the License. rem -@echo off - rem Use JAVA_HOME if it's set; otherwise, just use java IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) -SET LIB_DIR=%~dp0..\lib +SET LIB_DIR=%~dp0..\lib\bootstrap SET CONF_DIR=%~dp0..\conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET BOOTSTRAP_ACTION=status cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh index ad90d5b35b..60afa48992 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh @@ -166,15 +166,13 @@ run() { fi echo - echo "Classpath: $CLASSPATH" - echo echo "Java home: $JAVA_HOME" echo "NiFi home: $NIFI_HOME" echo echo "Bootstrap Config File: $BOOTSTRAP_CONF" echo - exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 + exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1 } main() { diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat index fdff815a0a..5bab3886b1 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat @@ -1,3 +1,4 @@ +@echo off rem rem Licensed to the Apache Software Foundation (ASF) under one or more rem contributor license agreements. See the NOTICE file distributed with @@ -15,18 +16,17 @@ rem See the License for the specific language governing permissions and rem limitations under the License. rem -@echo off rem Use JAVA_HOME if it's set; otherwise, just use java IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) -SET LIB_DIR=%~dp0..\lib +SET LIB_DIR=%~dp0..\lib\bootstrap SET CONF_DIR=%~dp0..\conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET BOOTSTRAP_ACTION=run cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat index ba4739a998..882b719c29 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat @@ -1,3 +1,5 @@ +@echo off + rem rem Licensed to the Apache Software Foundation (ASF) under one or more rem contributor license agreements. See the NOTICE file distributed with @@ -15,18 +17,17 @@ rem See the License for the specific language governing permissions and rem limitations under the License. rem -@echo off rem Use JAVA_HOME if it's set; otherwise, just use java IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) -SET LIB_DIR=%~dp0..\lib +SET LIB_DIR=%~dp0..\lib\bootstrap SET CONF_DIR=%~dp0..\conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET BOOTSTRAP_ACTION=start cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat index 828be6ec85..40c2d57b45 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat @@ -1,3 +1,5 @@ +@echo off + rem rem Licensed to the Apache Software Foundation (ASF) under one or more rem contributor license agreements. See the NOTICE file distributed with @@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and rem limitations under the License. rem -@echo off - rem Use JAVA_HOME if it's set; otherwise, just use java IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe) -SET LIB_DIR=%~dp0..\lib +SET LIB_DIR=%~dp0..\lib\bootstrap SET CONF_DIR=%~dp0..\conf SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE% -SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi +SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi SET BOOTSTRAP_ACTION=stop cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION% From 8ed8d69899bbb97ebff0f4c74ff358bd03ee900c Mon Sep 17 00:00:00 2001 From: joewitt Date: Wed, 10 Dec 2014 23:33:13 -0500 Subject: [PATCH 02/20] updated build-order.sh to incorporate the boostrap stuff --- misc/build-order.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/misc/build-order.sh b/misc/build-order.sh index 855321a312..e8f8e5e373 100755 --- a/misc/build-order.sh +++ b/misc/build-order.sh @@ -5,7 +5,9 @@ cd misc/nar-maven-plugin && \ mvn $MAVEN_FLAGS install && \ cd ../../commons/nifi-parent && \ mvn $MAVEN_FLAGS install && \ -cd ../../nifi-api && \ +cd ../../nifi-bootstrap && \ +mvn $MAVEN_FLAGS install && \ +cd ../nifi-api && \ mvn $MAVEN_FLAGS install && \ cd ../commons/ && \ cd nifi-stream-utils && \ From 6b0a5e8cd75a03a13ca90b08204bccc2a45bee70 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 11 Dec 2014 09:03:05 -0500 Subject: [PATCH 03/20] NIFI-72: Auto-generate CREATE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED events when appropriate --- .../repository/StandardProcessSession.java | 83 ++++++- .../StandardProvenanceReporter.java | 4 +- .../TestStandardProcessSession.java | 211 +++++++++++++----- 3 files changed, 232 insertions(+), 66 deletions(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 08e6afe84a..4ba45aadd5 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -488,6 +487,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + private void addEventType(final Map> map, final String id, final ProvenanceEventType eventType) { + Set eventTypes = map.get(id); + if ( eventTypes == null ) { + eventTypes = new HashSet<>(); + map.put(id, eventTypes); + } + + eventTypes.add(eventType); + } + private void updateProvenanceRepo(final Checkpoint checkpoint) { // Update Provenance Repository final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository(); @@ -496,7 +505,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet // for this, so that we are able to ensure that the events are submitted in the proper order. final Set recordsToSubmit = new LinkedHashSet<>(); - + final Map> eventTypesPerFlowFileId = new HashMap<>(); + final Set processorGenerated = checkpoint.reportedEvents; // We first want to submit FORK events because if the Processor is going to create events against @@ -513,6 +523,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) { recordsToSubmit.add(event); + + for ( final String childUuid : event.getChildUuids() ) { + addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType()); + } + for ( final String parentUuid : event.getParentUuids() ) { + addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType()); + } } } @@ -523,6 +540,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } recordsToSubmit.add(event); + addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } // Finally, add any other events that we may have generated. @@ -533,6 +551,67 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } recordsToSubmit.add(event); + addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); + } + } + + // Check if content or attributes changed. If so, register the appropriate events. + for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) { + final ContentClaim original = repoRecord.getOriginalClaim(); + final ContentClaim current = repoRecord.getCurrentClaim(); + + boolean contentChanged = false; + if ( original == null && current != null ) { + contentChanged = true; + } + if ( original != null && current == null ) { + contentChanged = true; + } + if ( original != null && current != null && !original.equals(current) ) { + contentChanged = true; + } + + final FlowFileRecord curFlowFile = repoRecord.getCurrent(); + final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key()); + boolean eventAdded = false; + + if (checkpoint.removedFlowFiles.contains(flowFileId)) { + continue; + } + + if ( contentChanged ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); + addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); + eventAdded = true; + } + + if ( checkpoint.createdFlowFiles.contains(flowFileId) ) { + final Set registeredTypes = eventTypesPerFlowFileId.get(flowFileId); + boolean creationEventRegistered = false; + if ( registeredTypes != null ) { + if ( registeredTypes.contains(ProvenanceEventType.CREATE) || + registeredTypes.contains(ProvenanceEventType.FORK) || + registeredTypes.contains(ProvenanceEventType.JOIN) || + registeredTypes.contains(ProvenanceEventType.RECEIVE) ) { + creationEventRegistered = true; + } + } + + if ( !creationEventRegistered ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build()); + eventAdded = true; + } + } + + if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) { + // We generate an ATTRIBUTES_MODIFIED event only if no other event has been + // created for the FlowFile. We do this because all events contain both the + // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED + // event is redundant if another already exists. + if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) { + recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build()); + addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED); + } } } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index e8b1e87daf..01fb3dc028 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -328,7 +328,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } } } - + @Override public void modifyContent(final FlowFile flowFile) { modifyContent(flowFile, null, -1L); @@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter { } } - private ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { + ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) { final ProvenanceEventBuilder builder = repository.eventBuilder(); builder.setEventType(eventType); builder.fromFlowFile(flowFile); diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 6e0a5d79fd..3dbbcf34e2 100644 --- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; @@ -64,7 +65,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; - import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -267,7 +267,7 @@ public class TestStandardProcessSession { } @Test - public void testSpawnsNotEmittedIfFilesDeleted() throws IOException { + public void testForksNotEmittedIfFilesDeleted() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) @@ -283,8 +283,9 @@ public class TestStandardProcessSession { assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); } + @Test - public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws IOException { + public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) @@ -319,6 +320,79 @@ public class TestStandardProcessSession { assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); } + @Test + public void testUpdateAttributesThenJoin() throws IOException { + final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + + final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord1); + flowFileQueue.put(flowFileRecord2); + + FlowFile ff1 = session.get(); + FlowFile ff2 = session.get(); + + ff1 = session.putAttribute(ff1, "index", "1"); + ff2 = session.putAttribute(ff2, "index", "2"); + + final List parents = new ArrayList<>(2); + parents.add(ff1); + parents.add(ff2); + + final FlowFile child = session.create(parents); + + final Relationship rel = new Relationship.Builder().name("A").build(); + + session.transfer(ff1, rel); + session.transfer(ff2, rel); + session.transfer(child, rel); + + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 1000); + + // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's + assertEquals(3, events.size()); + + int joinCount = 0; + int ff1UpdateCount = 0; + int ff2UpdateCount = 0; + + for ( final ProvenanceEventRecord event : events ) { + switch (event.getEventType()) { + case JOIN: + assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); + joinCount++; + break; + case ATTRIBUTES_MODIFIED: + if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { + ff1UpdateCount++; + } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { + ff2UpdateCount++; + } else { + Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); + } + break; + default: + Assert.fail("Unexpected event type: " + event); + } + } + + assertEquals(1, joinCount); + assertEquals(1, ff1UpdateCount); + assertEquals(1, ff2UpdateCount); + + assertEquals(1, joinCount); + } + @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -628,34 +702,34 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -668,37 +742,35 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L) - .size(1L) - .build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }) + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -759,6 +831,21 @@ public class TestStandardProcessSession { } } + + @Test + public void testCreateEmitted() throws IOException { + FlowFile newFlowFile = session.create(); + session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + } + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); From cbea1f193620f934d6186d167d0c0fe0723fad7c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 11 Dec 2014 09:16:48 -0500 Subject: [PATCH 04/20] NIFI-72: Added unit tests and fixed bug that caused CONTENT_MODIFIED to be emitted for newly created FlowFiles --- .../repository/StandardProcessSession.java | 3 +- .../TestStandardProcessSession.java | 68 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 4ba45aadd5..11172a8f54 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -579,7 +579,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE continue; } - if ( contentChanged ) { + final boolean newFlowFile = repoRecord.getOriginal() == null; + if ( contentChanged && !newFlowFile ) { recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build()); addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED); eventAdded = true; diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 3dbbcf34e2..060bbd9d95 100644 --- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -846,6 +846,74 @@ public class TestStandardProcessSession { assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } + @Test + public void testContentModifiedNotEmittedForCreate() throws IOException { + FlowFile newFlowFile = session.create(); + newFlowFile = session.write(newFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + } + }); + session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CREATE, event.getEventType()); + } + + @Test + public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); + this.flowFileQueue.put(flowFile); + + FlowFile existingFlowFile = session.get(); + existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + } + }); + existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); + session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); + } + + @Test + public void testAttributesModifiedEmitted() throws IOException { + final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); + this.flowFileQueue.put(flowFile); + + FlowFile existingFlowFile = session.get(); + existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); + session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); + session.commit(); + + final List events = provenanceRepo.getEvents(0L, 10000); + assertFalse(events.isEmpty()); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); + } + + + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); From 74c79404873b27cc1eea78d74ff05b5492988ec6 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 11 Dec 2014 09:44:38 -0500 Subject: [PATCH 05/20] NIFI-161: Removed references to deprecated methods --- .../remote/StandardRemoteProcessGroup.java | 100 +++++++++--------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index bb5efd7af6..d3fb41f412 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -54,6 +54,7 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; import javax.security.cert.CertificateExpiredException; import javax.security.cert.CertificateNotYetValidException; +import javax.ws.rs.core.Response; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -88,7 +89,6 @@ import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.entity.ControllerEntity; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,6 +110,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status"; public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + // status codes + public static final int OK_STATUS_CODE = Status.OK.getStatusCode(); + public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode(); + public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode(); + private final String id; private final URI targetUri; @@ -860,7 +865,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try { // perform the request final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - if (!Status.OK.equals(response.getClientResponseStatus())) { + + if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) { writeLock.lock(); try { for (final Iterator iter = inputPorts.values().iterator(); iter.hasNext();) { @@ -882,7 +888,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { // consume the entity entirely response.getEntity(String.class); - throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase()); + throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase()); } final ControllerEntity entity = response.getEntity(ControllerEntity.class); @@ -1303,56 +1309,54 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { try { final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null); final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - switch (response.getClientResponseStatus()) { - case OK: - final ControllerEntity entity = response.getEntity(ControllerEntity.class); - final ControllerDTO dto = entity.getController(); + + final int statusCode = response.getStatus(); + + if ( statusCode == OK_STATUS_CODE ) { + final ControllerEntity entity = response.getEntity(ControllerEntity.class); + final ControllerDTO dto = entity.getController(); - if (dto.getRemoteSiteListeningPort() == null) { - authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time."; + if (dto.getRemoteSiteListeningPort() == null) { + authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time."; + } else { + authorizationIssue = null; + } + + writeLock.lock(); + try { + listeningPort = dto.getRemoteSiteListeningPort(); + destinationSecure = dto.isSiteToSiteSecure(); + } finally { + writeLock.unlock(); + } + + final String remoteInstanceId = dto.getInstanceId(); + boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); + pointsToCluster.set(isPointingToCluster); + } else if ( statusCode == UNAUTHORIZED_STATUS_CODE ) { + try { + final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); + if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()) ) { + logger.info("{} Issued a Request to communicate with remote instance", this); } else { - authorizationIssue = null; + logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ + this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()}); } - - writeLock.lock(); - try { - listeningPort = dto.getRemoteSiteListeningPort(); - destinationSecure = dto.isSiteToSiteSecure(); - } finally { - writeLock.unlock(); + } catch (final Exception e) { + logger.error("{} Failed to request account due to {}", this, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); } + } - final String remoteInstanceId = dto.getInstanceId(); - boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId); - pointsToCluster.set(isPointingToCluster); - break; - case UNAUTHORIZED: - try { - final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString()); - if (requestAccountResponse.getClientResponseStatus() == Status.OK) { - logger.info("{} Issued a Request to communicate with remote instance", this); - } else { - logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{ - this, requestAccountResponse.getClientResponseStatus().getStatusCode(), requestAccountResponse.getClientResponseStatus().getReasonPhrase()}); - } - } catch (final Exception e) { - logger.error("{} Failed to request account due to {}", this, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - - authorizationIssue = response.getEntity(String.class); - break; - case FORBIDDEN: - authorizationIssue = response.getEntity(String.class); - break; - default: - final String message = response.getEntity(String.class); - logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", - new Object[]{this, response.getClientResponseStatus().getStatusCode(), response.getClientResponseStatus().getReasonPhrase(), message}); - authorizationIssue = "Unable to determine Site-to-Site availability."; - break; + authorizationIssue = response.getEntity(String.class); + } else if ( statusCode == FORBIDDEN_STATUS_CODE ) { + authorizationIssue = response.getEntity(String.class); + } else { + final String message = response.getEntity(String.class); + logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}", + new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message}); + authorizationIssue = "Unable to determine Site-to-Site availability."; } } catch (Exception e) { logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e)); From 2bcd1e657a5a959fad6572b011924f07d995abe8 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 11 Dec 2014 09:45:24 -0500 Subject: [PATCH 06/20] NIFI-132: Log to Processor's logger --- .../processors/standard/EvaluateXPath.java | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java index 056f4fc350..2f3f34bf11 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.xml.namespace.QName; +import javax.xml.transform.ErrorListener; import javax.xml.transform.OutputKeys; import javax.xml.transform.Source; import javax.xml.transform.Transformer; @@ -50,6 +51,7 @@ import javax.xml.xpath.XPathFactoryConfigurationException; import net.sf.saxon.lib.NamespaceConstant; import net.sf.saxon.xpath.XPathEvaluator; + import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -73,7 +75,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.util.ObjectHolder; - import org.xml.sax.InputSource; @EventDriven @@ -356,8 +357,7 @@ public class EvaluateXPath extends AbstractProcessor { session.getProvenanceReporter().modifyContent(flowFile); } } else { - logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{ - flowFile, error.get()}); + logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()}); session.transfer(flowFile, REL_FAILURE); } } @@ -377,7 +377,32 @@ public class EvaluateXPath extends AbstractProcessor { props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); transformer.setOutputProperties(props); + final ProcessorLog logger = getLogger(); + + final ObjectHolder error = new ObjectHolder<>(null); + transformer.setErrorListener(new ErrorListener() { + @Override + public void warning(final TransformerException exception) throws TransformerException { + logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception}); + } + + @Override + public void error(final TransformerException exception) throws TransformerException { + logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception}); + error.set(exception); + } + + @Override + public void fatalError(final TransformerException exception) throws TransformerException { + logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception}); + error.set(exception); + } + }); + transformer.transform(sourceNode, new StreamResult(out)); + if ( error.get() != null ) { + throw error.get(); + } } private static class XPathValidator implements Validator { From f1baec6b66d0bf6e02db7e538b54d92d88a551d2 Mon Sep 17 00:00:00 2001 From: joewitt Date: Thu, 11 Dec 2014 13:52:49 -0500 Subject: [PATCH 07/20] updating to skip nb configuration files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6d4eca990c..73b32b7e01 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ target .settings .classpath nbactions.xml +nb-configuration.xml .DS_Store # Intellij From 52eadeba1827e462f34bd6b6df79f88859091ef2 Mon Sep 17 00:00:00 2001 From: joewitt Date: Thu, 11 Dec 2014 13:54:25 -0500 Subject: [PATCH 08/20] another change for goofy netbeans configs --- .../web/nifi-web-api/nb-configuration.xml | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml diff --git a/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml b/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml deleted file mode 100644 index d290d4597f..0000000000 --- a/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - - - - ide - - From 55d4b1c09990a9cff8a235fe65a0056879e6355a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Dec 2014 10:00:40 -0500 Subject: [PATCH 09/20] NIFI-164: Add shutdown() method to ContentRepository and implement in FileSystemRepository and VolatileContentRepository to cleanup executors; call shutdown() from FlowController shutdown method --- .../java/org/apache/nifi/controller/FlowController.java | 4 ++++ .../nifi/controller/repository/FileSystemRepository.java | 6 ++++++ .../controller/repository/VolatileContentRepository.java | 5 +++++ .../nifi/controller/repository/ContentRepository.java | 6 ++++++ 4 files changed, 21 insertions(+) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index 20c50b5763..e1abe4eaa2 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1050,6 +1050,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H processScheduler.shutdown(); } + if ( contentRepository != null ) { + contentRepository.shutdown(); + } + if ( provenanceEventRepository != null ) { try { provenanceEventRepository.close(); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index ba74295122..5fbbfd5533 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -223,6 +223,12 @@ public class FileSystemRepository implements ContentRepository { this.contentClaimManager = claimManager; } + @Override + public void shutdown() { + executor.shutdown(); + containerCleanupExecutor.shutdown(); + } + private static double getRatio(final String value) { final String trimmed = value.trim(); final String percentage = trimmed.substring(0, trimmed.length() - 1); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index e14ec5dcea..99e3655e1d 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -137,6 +137,11 @@ public class VolatileContentRepository implements ContentRepository { public void initialize(final ContentClaimManager claimManager) { this.claimManager = claimManager; } + + @Override + public void shutdown() { + executor.shutdown(); + } /** * Specifies a Backup Repository where data should be written if this diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java index 7012cb3a48..d66b8a6499 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -45,6 +45,12 @@ public interface ContentRepository { */ void initialize(ContentClaimManager claimManager) throws IOException; + /** + * Shuts down the Content Repository, freeing any resources that may be held. + * This is called when an administrator shuts down NiFi. + */ + void shutdown(); + /** * Returns the names of all Containers that exist for this Content * Repository From d57861d8b5283483b5721f5c4be3ce285ea6bcf4 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Dec 2014 10:19:20 -0500 Subject: [PATCH 10/20] NIFI-164: Fixed MockContentRepository that existed in a unit test --- .../controller/repository/TestStandardProcessSession.java | 4 ++++ .../apache/nifi/provenance/VolatileProvenanceRepository.java | 2 ++ 2 files changed, 6 insertions(+) diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 060bbd9d95..1ff63c545b 100644 --- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -977,6 +977,10 @@ public class TestStandardProcessSession { private ConcurrentMap claimantCounts = new ConcurrentHashMap<>(); + @Override + public void shutdown() { + } + public Set getExistingClaims() { final Set claims = new HashSet<>(); diff --git a/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 9de26613aa..f4f9d127c0 100644 --- a/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -169,6 +169,8 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository { @Override public void close() throws IOException { + queryExecService.shutdownNow(); + scheduledExecService.shutdown(); } @Override From a8722317abdbf28e86d3d4ed91e18d2f3ac2a40a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Dec 2014 11:09:52 -0500 Subject: [PATCH 11/20] NIFI-66: --- .../nifi/controller/repository/StandardProcessSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 11172a8f54..fbbb29b2d4 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -479,7 +479,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE context.getFlowFileEventRepository().updateRepository(flowFileEvent); - for (final FlowFileEvent connectionEvent : connectionCounts.values()) { + for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) { context.getFlowFileEventRepository().updateRepository(connectionEvent); } } catch (final IOException ioe) { From bc94625142163d60af7aad27ec9de11b79c2ba21 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Dec 2014 12:02:49 -0500 Subject: [PATCH 12/20] NIFI-70: If ROUTE indicates FlowFile routed to same Connection it came from (and only that connection!) then drop the redundant ROUTE event --- .../repository/StandardProcessSession.java | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index fbbb29b2d4..60dcdb3f75 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -538,7 +538,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) { continue; } - + if ( isSpuriousRouteEvent(event, checkpoint.records) ) { + continue; + } + + // Check if the event indicates that the FlowFile was routed to the same + // connection from which it was pulled (and only this connection). If so, discard the event. + isSpuriousRouteEvent(event, checkpoint.records); + recordsToSubmit.add(event); addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType()); } @@ -776,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE return false; } + + /** + * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile + * was routed to a relationship with only 1 connection and that Connection is the Connection from which + * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere. + * + * @param event + * @param records + * @return + */ + private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map records) { + if ( event.getEventType() == ProvenanceEventType.ROUTE ) { + final String relationshipName = event.getRelationship(); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + final Collection connectionsForRelationship = this.context.getConnections(relationship); + + // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event, + // as it may be cloning the FlowFile and adding to multiple connections. + if ( connectionsForRelationship.size() == 1 ) { + for ( final Map.Entry entry : records.entrySet() ) { + final FlowFileRecord flowFileRecord = entry.getKey(); + if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) { + final StandardRepositoryRecord repoRecord = entry.getValue(); + if ( repoRecord.getOriginalQueue() == null ) { + return false; + } + + final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier(); + final Connection destinationConnection = connectionsForRelationship.iterator().next(); + final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier(); + return originalQueueId.equals(destinationQueueId); + } + } + } + } + + return false; + } + @Override public void rollback() { rollback(false); From 21c24b6b68ffa4991bd9a15205adb2eb1a71e862 Mon Sep 17 00:00:00 2001 From: Karl-Heinz Marbaise Date: Sat, 13 Dec 2014 23:47:57 +0100 Subject: [PATCH 13/20] Correctly inject MavenProject / MavenSession into nar-maven-plugin Signed-off-by: joewitt --- misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java index 263fe8820e..5196f73555 100644 --- a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java +++ b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java @@ -86,10 +86,10 @@ public class NarMojo extends AbstractMojo { * POM * */ - @Parameter(property = "project", readonly = true, required = true) + @Parameter(defaultValue = "${project}", readonly = true, required = true) protected MavenProject project; - @Parameter(property = "session", readonly = true, required = true) + @Parameter(defaultValue = "${session}", readonly = true, required = true) protected MavenSession session; /** From 9ccbf8be7bb4385c39905911a659b59fb6b34994 Mon Sep 17 00:00:00 2001 From: Karl-Heinz Marbaise Date: Sat, 13 Dec 2014 22:58:01 +0100 Subject: [PATCH 14/20] Added appropriate Maven configuration to build the nar-maven-plugin with Maven 3.X instead of Maven 3.2.X Signed-off-by: joewitt --- misc/nar-maven-plugin/pom.xml | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/misc/nar-maven-plugin/pom.xml b/misc/nar-maven-plugin/pom.xml index 3888df39ff..5c7ca7f979 100644 --- a/misc/nar-maven-plugin/pom.xml +++ b/misc/nar-maven-plugin/pom.xml @@ -42,7 +42,23 @@ org.apache.maven.plugins maven-plugin-plugin 3.3 - + + + default-descriptor + + descriptor + + process-classes + + + help-descriptor + + helpmojo + + process-classes + + + @@ -68,7 +84,8 @@ org.apache.maven.plugin-tools maven-plugin-annotations 3.3 - + provided + From 73cc6cbe284faab9a91b4816197a105524b1df41 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 13:09:08 -0500 Subject: [PATCH 15/20] NIFI-56: Made test single-threaded --- .../org/apache/nifi/processors/standard/TestScanContent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java index 499fb3e15e..9079f82101 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java +++ b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java @@ -55,7 +55,7 @@ public class TestScanContent { Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE); final TestRunner runner = TestRunners.newTestRunner(new ScanContent()); - runner.setThreadCount(3); + runner.setThreadCount(1); runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString()); runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING); From e04a55d3a5097d1ae3ff5c5a4c8f8ad1e1dc56b9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 13:14:42 -0500 Subject: [PATCH 16/20] NIFI-43: Do not throw InvocationTargetException if it is wrapping a RuntimeException; instead just throw the RuntimeException --- .../org/apache/nifi/util/ReflectionUtils.java | 72 ++++++++++--------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java index 9d52eb35d8..e15e00a6bc 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -42,43 +42,51 @@ public class ReflectionUtils { * @throws IllegalAccessException */ public static void invokeMethodsWithAnnotation(final Class annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { - for (final Method method : instance.getClass().getMethods()) { - if (method.isAnnotationPresent(annotation)) { - final boolean isAccessible = method.isAccessible(); - method.setAccessible(true); - - try { - final Class[] argumentTypes = method.getParameterTypes(); - if (argumentTypes.length > args.length) { - throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given", - method.getName(), instance, argumentTypes.length, args.length)); - } - - for (int i = 0; i < argumentTypes.length; i++) { - final Class argType = argumentTypes[i]; - if (!argType.isAssignableFrom(args[i].getClass())) { - throw new IllegalArgumentException(String.format( - "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s", - method.getName(), instance, i, argType, args[i].getClass())); + try { + for (final Method method : instance.getClass().getMethods()) { + if (method.isAnnotationPresent(annotation)) { + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class[] argumentTypes = method.getParameterTypes(); + if (argumentTypes.length > args.length) { + throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given", + method.getName(), instance, argumentTypes.length, args.length)); } - } - - if (argumentTypes.length == args.length) { - method.invoke(instance, args); - } else { - final Object[] argsToPass = new Object[argumentTypes.length]; - for (int i = 0; i < argsToPass.length; i++) { - argsToPass[i] = args[i]; + + for (int i = 0; i < argumentTypes.length; i++) { + final Class argType = argumentTypes[i]; + if (!argType.isAssignableFrom(args[i].getClass())) { + throw new IllegalArgumentException(String.format( + "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s", + method.getName(), instance, i, argType, args[i].getClass())); + } + } + + if (argumentTypes.length == args.length) { + method.invoke(instance, args); + } else { + final Object[] argsToPass = new Object[argumentTypes.length]; + for (int i = 0; i < argsToPass.length; i++) { + argsToPass[i] = args[i]; + } + + method.invoke(instance, argsToPass); + } + } finally { + if (!isAccessible) { + method.setAccessible(false); } - - method.invoke(instance, argsToPass); - } - } finally { - if (!isAccessible) { - method.setAccessible(false); } } } + } catch (final InvocationTargetException ite) { + if ( ite.getCause() instanceof RuntimeException ) { + throw (RuntimeException) ite.getCause(); + } else { + throw ite; + } } } From f0bea5c156fa0bfc1d7773ae45ea73ba594ba77a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 13:21:59 -0500 Subject: [PATCH 17/20] NIFI-49: include name of default value if it is not allowed in a property descriptor --- .../java/org/apache/nifi/components/PropertyDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index c95d449b70..19600ab0a6 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -425,7 +425,7 @@ public final class PropertyDescriptor implements Comparable throw new IllegalStateException("Must specify a name"); } if (!isValueAllowed(defaultValue)) { - throw new IllegalStateException("Default value is not in the set of allowable values"); + throw new IllegalStateException("Default value '" + defaultValue + "' is not in the set of allowable values"); } return new PropertyDescriptor(this); From 13160429777e6a48a7833174a99f2771c11649ed Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 13:28:09 -0500 Subject: [PATCH 18/20] NIFI-49: Included patch from Philip Young to include name of default vlaue when not allowed as a property descriptor value --- .../java/org/apache/nifi/components/PropertyDescriptor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index c95d449b70..ba0f7dcc8c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -425,7 +425,7 @@ public final class PropertyDescriptor implements Comparable throw new IllegalStateException("Must specify a name"); } if (!isValueAllowed(defaultValue)) { - throw new IllegalStateException("Default value is not in the set of allowable values"); + throw new IllegalStateException("Default value ["+ defaultValue +"] is not in the set of allowable values"); } return new PropertyDescriptor(this); From 9e60aa0f25754751d468ef9cd3da428055c08b4e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 13:53:12 -0500 Subject: [PATCH 19/20] NIFI-49: Included patch from Philip Young to include name of default value when not allowed as a property descriptor value --- .../components/TestPropertyDescriptor.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java diff --git a/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java b/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java new file mode 100644 index 0000000000..82b8111089 --- /dev/null +++ b/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.components; + +import static org.junit.Assert.assertNotNull; + +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + + +/** + * Regression test for issue NIFI-49, to ensure that if a Processor's Property's Default Value is not allowed, + * the Exception thrown should indicate what the default value is + */ +public class TestPropertyDescriptor { + + private static Builder invalidDescriptorBuilder; + private static Builder validDescriptorBuilder; + private static String DEFAULT_VALUE = "Default Value"; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void setUp() { + validDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue("Allowable Value"); + invalidDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue(DEFAULT_VALUE); + } + + @Test + public void testExceptionThrownByDescriptorWithInvalidDefaultValue() { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("["+ DEFAULT_VALUE +"]"); + + invalidDescriptorBuilder.build(); + } + + @Test + public void testNoExceptionThrownByPropertyDescriptorWithValidDefaultValue() { + assertNotNull(validDescriptorBuilder.build()); + } +} From 1cc3ce57556eb7cf9a5f94b269eb24b284e518eb Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 15 Dec 2014 14:28:11 -0500 Subject: [PATCH 20/20] NIFI-35: Provide an EventReporter to the FlowFileSwapManager and provide events for any errors --- .../controller/FileSystemSwapManager.java | 33 ++++++++++++++----- .../nifi/controller/FlowController.java | 20 ++++++----- .../repository/FlowFileSwapManager.java | 5 ++- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 3af209841e..ad95f8ec6f 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.io.BufferedOutputStream; import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager { public static final int MINIMUM_SWAP_COUNT = 10000; private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); public static final int SWAP_ENCODING_VERSION = 6; + public static final String EVENT_CATEGORY = "Swap FlowFiles"; private final ScheduledExecutorService swapQueueIdentifierExecutor; private final ScheduledExecutorService swapInExecutor; private volatile FlowFileRepository flowFileRepository; + private volatile EventReporter eventReporter; // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in private final ConcurrentMap swapMap = new ConcurrentHashMap<>(); @@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) { + public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) { this.claimManager = claimManager; this.flowFileRepository = flowFileRepository; + this.eventReporter = eventReporter; swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS); swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS); } @@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } if (!swapFile.delete()) { - logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually"); + final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually"; + logger.warn(errMsg); + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg); } } catch (final Exception e) { - logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e); + final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + if (swapFile != null) { queue.add(swapFile); } @@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } catch (final IOException ioe) { recordsSwapped = 0; flowFileQueue.putSwappedRecords(toSwap); - logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe); + final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); } if (recordsSwapped > 0) { @@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final int swapEncodingVersion = in.readInt(); if (swapEncodingVersion > SWAP_ENCODING_VERSION) { - throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " - + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"); + final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " + + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)"; + + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); + throw new IOException(errMsg); } final String connectionId = in.readUTF(); final FlowFileQueue queue = queueMap.get(connectionId); if (queue == null) { logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); continue; } @@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager { maxRecoveredId = maxId; } } catch (final IOException ioe) { - logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString()); + final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe; + logger.error(errMsg); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg); if (logger.isDebugEnabled()) { logger.error("", ioe); } diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index e1abe4eaa2..545017abb1 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H try { this.provenanceEventRepository = createProvenanceRepository(properties); - this.provenanceEventRepository.initialize(new EventReporter() { - @Override - public void reportEvent(final Severity severity, final String category, final String message) { - final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); - bulletinRepository.addBulletin(bulletin); - } - }); + this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository)); this.contentRepository = createContentRepository(properties); } catch (final Exception e) { @@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H } } + private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) { + return new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); + bulletinRepository.addBulletin(bulletin); + } + }; + } + public void initializeFlow() throws IOException { writeLock.lock(); try { @@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H contentRepository.cleanup(); if (flowFileSwapManager != null) { - flowFileSwapManager.start(flowFileRepository, this, contentClaimManager); + flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository)); } if (externalSiteListener != null) { diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index 739cb2be92..c6daab8303 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.events.EventReporter; /** * Defines a mechanism by which FlowFiles can be move into external storage or @@ -34,8 +35,10 @@ public interface FlowFileSwapManager { * can be obtained and restored * @param claimManager the ContentClaimManager to use for interacting with * Content Claims + * @param reporter the EventReporter that can be used for notifying users of + * important events */ - void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager); + void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter); /** * Shuts down the manager