diff --git a/.gitignore b/.gitignore
index 6d4eca990c..73b32b7e01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ target
.settings
.classpath
nbactions.xml
+nb-configuration.xml
.DS_Store
# Intellij
diff --git a/assemblies/nifi/src/main/assembly/dependencies.xml b/assemblies/nifi/src/main/assembly/dependencies.xml
index 339275bd11..97f3d4ea77 100644
--- a/assemblies/nifi/src/main/assembly/dependencies.xml
+++ b/assemblies/nifi/src/main/assembly/dependencies.xml
@@ -27,6 +27,18 @@
runtime
lib
true
+
+ nifi-bootstrap
+
+
+
+
+ runtime
+ lib/bootstrap
+ true
+
+ nifi-bootstrap
+
diff --git a/misc/build-order.sh b/misc/build-order.sh
index 855321a312..e8f8e5e373 100755
--- a/misc/build-order.sh
+++ b/misc/build-order.sh
@@ -5,7 +5,9 @@ cd misc/nar-maven-plugin && \
mvn $MAVEN_FLAGS install && \
cd ../../commons/nifi-parent && \
mvn $MAVEN_FLAGS install && \
-cd ../../nifi-api && \
+cd ../../nifi-bootstrap && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-api && \
mvn $MAVEN_FLAGS install && \
cd ../commons/ && \
cd nifi-stream-utils && \
diff --git a/misc/nar-maven-plugin/pom.xml b/misc/nar-maven-plugin/pom.xml
index 3888df39ff..5c7ca7f979 100644
--- a/misc/nar-maven-plugin/pom.xml
+++ b/misc/nar-maven-plugin/pom.xml
@@ -42,7 +42,23 @@
org.apache.maven.plugins
maven-plugin-plugin
3.3
-
+
+
+ default-descriptor
+
+ descriptor
+
+ process-classes
+
+
+ help-descriptor
+
+ helpmojo
+
+ process-classes
+
+
+
@@ -68,7 +84,8 @@
org.apache.maven.plugin-tools
maven-plugin-annotations
3.3
-
+ provided
+
diff --git a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
index 263fe8820e..5196f73555 100644
--- a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
+++ b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
@@ -86,10 +86,10 @@ public class NarMojo extends AbstractMojo {
* POM
*
*/
- @Parameter(property = "project", readonly = true, required = true)
+ @Parameter(defaultValue = "${project}", readonly = true, required = true)
protected MavenProject project;
- @Parameter(property = "session", readonly = true, required = true)
+ @Parameter(defaultValue = "${session}", readonly = true, required = true)
protected MavenSession session;
/**
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 3af209841e..ad95f8ec6f 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.EventReporter;
import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
public static final int MINIMUM_SWAP_COUNT = 10000;
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
public static final int SWAP_ENCODING_VERSION = 6;
+ public static final String EVENT_CATEGORY = "Swap FlowFiles";
private final ScheduledExecutorService swapQueueIdentifierExecutor;
private final ScheduledExecutorService swapInExecutor;
private volatile FlowFileRepository flowFileRepository;
+ private volatile EventReporter eventReporter;
// Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
private final ConcurrentMap swapMap = new ConcurrentHashMap<>();
@@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
- public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) {
+ public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
this.claimManager = claimManager;
this.flowFileRepository = flowFileRepository;
+ this.eventReporter = eventReporter;
swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
}
@@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
if (!swapFile.delete()) {
- logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually");
+ final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
+ logger.warn(errMsg);
+ eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
}
} catch (final Exception e) {
- logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e);
+ final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
+ logger.error(errMsg);
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+
if (swapFile != null) {
queue.add(swapFile);
}
@@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} catch (final IOException ioe) {
recordsSwapped = 0;
flowFileQueue.putSwappedRecords(toSwap);
- logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe);
+ final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
+ logger.error(errMsg);
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
}
if (recordsSwapped > 0) {
@@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
- throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
- + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+ final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+ + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+ throw new IOException(errMsg);
}
final String connectionId = in.readUTF();
final FlowFileQueue queue = queueMap.get(connectionId);
if (queue == null) {
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
continue;
}
@@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
maxRecoveredId = maxId;
}
} catch (final IOException ioe) {
- logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString());
+ final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
+ logger.error(errMsg);
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
if (logger.isDebugEnabled()) {
logger.error("", ioe);
}
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 20c50b5763..545017abb1 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
try {
this.provenanceEventRepository = createProvenanceRepository(properties);
- this.provenanceEventRepository.initialize(new EventReporter() {
- @Override
- public void reportEvent(final Severity severity, final String category, final String message) {
- final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
- bulletinRepository.addBulletin(bulletin);
- }
- });
+ this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
this.contentRepository = createContentRepository(properties);
} catch (final Exception e) {
@@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
}
+ private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
+ return new EventReporter() {
+ @Override
+ public void reportEvent(final Severity severity, final String category, final String message) {
+ final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
+ bulletinRepository.addBulletin(bulletin);
+ }
+ };
+ }
+
public void initializeFlow() throws IOException {
writeLock.lock();
try {
@@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
contentRepository.cleanup();
if (flowFileSwapManager != null) {
- flowFileSwapManager.start(flowFileRepository, this, contentClaimManager);
+ flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
}
if (externalSiteListener != null) {
@@ -1050,6 +1054,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.shutdown();
}
+ if ( contentRepository != null ) {
+ contentRepository.shutdown();
+ }
+
if ( provenanceEventRepository != null ) {
try {
provenanceEventRepository.close();
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index ba74295122..5fbbfd5533 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -223,6 +223,12 @@ public class FileSystemRepository implements ContentRepository {
this.contentClaimManager = claimManager;
}
+ @Override
+ public void shutdown() {
+ executor.shutdown();
+ containerCleanupExecutor.shutdown();
+ }
+
private static double getRatio(final String value) {
final String trimmed = value.trim();
final String percentage = trimmed.substring(0, trimmed.length() - 1);
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 08e6afe84a..60dcdb3f75 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -75,7 +75,6 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.util.NiFiProperties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -480,7 +479,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
context.getFlowFileEventRepository().updateRepository(flowFileEvent);
- for (final FlowFileEvent connectionEvent : connectionCounts.values()) {
+ for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) {
context.getFlowFileEventRepository().updateRepository(connectionEvent);
}
} catch (final IOException ioe) {
@@ -488,6 +487,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
+ private void addEventType(final Map> map, final String id, final ProvenanceEventType eventType) {
+ Set eventTypes = map.get(id);
+ if ( eventTypes == null ) {
+ eventTypes = new HashSet<>();
+ map.put(id, eventTypes);
+ }
+
+ eventTypes.add(eventType);
+ }
+
private void updateProvenanceRepo(final Checkpoint checkpoint) {
// Update Provenance Repository
final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
@@ -496,7 +505,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// in case the Processor developer submitted the same events to the reporter. So we use a LinkedHashSet
// for this, so that we are able to ensure that the events are submitted in the proper order.
final Set recordsToSubmit = new LinkedHashSet<>();
-
+ final Map> eventTypesPerFlowFileId = new HashMap<>();
+
final Set processorGenerated = checkpoint.reportedEvents;
// We first want to submit FORK events because if the Processor is going to create events against
@@ -513,6 +523,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (!event.getChildUuids().isEmpty() && !isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && !processorGenerated.contains(event)) {
recordsToSubmit.add(event);
+
+ for ( final String childUuid : event.getChildUuids() ) {
+ addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
+ }
+ for ( final String parentUuid : event.getParentUuids() ) {
+ addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
+ }
}
}
@@ -521,8 +538,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue;
}
-
+ if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+ continue;
+ }
+
+ // Check if the event indicates that the FlowFile was routed to the same
+ // connection from which it was pulled (and only this connection). If so, discard the event.
+ isSpuriousRouteEvent(event, checkpoint.records);
+
recordsToSubmit.add(event);
+ addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
}
// Finally, add any other events that we may have generated.
@@ -533,6 +558,68 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
recordsToSubmit.add(event);
+ addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+ }
+ }
+
+ // Check if content or attributes changed. If so, register the appropriate events.
+ for (final StandardRepositoryRecord repoRecord : checkpoint.records.values() ) {
+ final ContentClaim original = repoRecord.getOriginalClaim();
+ final ContentClaim current = repoRecord.getCurrentClaim();
+
+ boolean contentChanged = false;
+ if ( original == null && current != null ) {
+ contentChanged = true;
+ }
+ if ( original != null && current == null ) {
+ contentChanged = true;
+ }
+ if ( original != null && current != null && !original.equals(current) ) {
+ contentChanged = true;
+ }
+
+ final FlowFileRecord curFlowFile = repoRecord.getCurrent();
+ final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
+ boolean eventAdded = false;
+
+ if (checkpoint.removedFlowFiles.contains(flowFileId)) {
+ continue;
+ }
+
+ final boolean newFlowFile = repoRecord.getOriginal() == null;
+ if ( contentChanged && !newFlowFile ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
+ addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
+ eventAdded = true;
+ }
+
+ if ( checkpoint.createdFlowFiles.contains(flowFileId) ) {
+ final Set registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
+ boolean creationEventRegistered = false;
+ if ( registeredTypes != null ) {
+ if ( registeredTypes.contains(ProvenanceEventType.CREATE) ||
+ registeredTypes.contains(ProvenanceEventType.FORK) ||
+ registeredTypes.contains(ProvenanceEventType.JOIN) ||
+ registeredTypes.contains(ProvenanceEventType.RECEIVE) ) {
+ creationEventRegistered = true;
+ }
+ }
+
+ if ( !creationEventRegistered ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CREATE).build());
+ eventAdded = true;
+ }
+ }
+
+ if ( !eventAdded && !repoRecord.getUpdatedAttributes().isEmpty() ) {
+ // We generate an ATTRIBUTES_MODIFIED event only if no other event has been
+ // created for the FlowFile. We do this because all events contain both the
+ // newest and the original attributes, so generating an ATTRIBUTES_MODIFIED
+ // event is redundant if another already exists.
+ if ( !eventTypesPerFlowFileId.containsKey(flowFileId) ) {
+ recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
+ addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
+ }
}
}
@@ -696,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return false;
}
+
+ /**
+ * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile
+ * was routed to a relationship with only 1 connection and that Connection is the Connection from which
+ * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+ *
+ * @param event
+ * @param records
+ * @return
+ */
+ private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map records) {
+ if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
+ final String relationshipName = event.getRelationship();
+ final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+ final Collection connectionsForRelationship = this.context.getConnections(relationship);
+
+ // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
+ // as it may be cloning the FlowFile and adding to multiple connections.
+ if ( connectionsForRelationship.size() == 1 ) {
+ for ( final Map.Entry entry : records.entrySet() ) {
+ final FlowFileRecord flowFileRecord = entry.getKey();
+ if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) {
+ final StandardRepositoryRecord repoRecord = entry.getValue();
+ if ( repoRecord.getOriginalQueue() == null ) {
+ return false;
+ }
+
+ final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
+ final Connection destinationConnection = connectionsForRelationship.iterator().next();
+ final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
+ return originalQueueId.equals(destinationQueueId);
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
@Override
public void rollback() {
rollback(false);
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index e8b1e87daf..01fb3dc028 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -328,7 +328,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
}
}
}
-
+
@Override
public void modifyContent(final FlowFile flowFile) {
modifyContent(flowFile, null, -1L);
@@ -421,7 +421,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
}
}
- private ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
+ ProvenanceEventBuilder build(final FlowFile flowFile, final ProvenanceEventType eventType) {
final ProvenanceEventBuilder builder = repository.eventBuilder();
builder.setEventType(eventType);
builder.fromFlowFile(flowFile);
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index e14ec5dcea..99e3655e1d 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -137,6 +137,11 @@ public class VolatileContentRepository implements ContentRepository {
public void initialize(final ContentClaimManager claimManager) {
this.claimManager = claimManager;
}
+
+ @Override
+ public void shutdown() {
+ executor.shutdown();
+ }
/**
* Specifies a Backup Repository where data should be written if this
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index bb5efd7af6..d3fb41f412 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -54,6 +54,7 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException;
+import javax.ws.rs.core.Response;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@@ -88,7 +89,6 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.entity.ControllerEntity;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,6 +110,11 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public static final String ROOT_GROUP_STATUS_URI_PATH = "/controller/process-groups/root/status";
public static final long LISTENING_PORT_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+ // status codes
+ public static final int OK_STATUS_CODE = Status.OK.getStatusCode();
+ public static final int UNAUTHORIZED_STATUS_CODE = Status.UNAUTHORIZED.getStatusCode();
+ public static final int FORBIDDEN_STATUS_CODE = Status.FORBIDDEN.getStatusCode();
+
private final String id;
private final URI targetUri;
@@ -860,7 +865,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
// perform the request
final ClientResponse response = utils.get(uri, getCommunicationsTimeout(TimeUnit.MILLISECONDS));
- if (!Status.OK.equals(response.getClientResponseStatus())) {
+
+ if (!Response.Status.Family.SUCCESSFUL.equals(response.getStatusInfo().getFamily())) {
writeLock.lock();
try {
for (final Iterator iter = inputPorts.values().iterator(); iter.hasNext();) {
@@ -882,7 +888,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// consume the entity entirely
response.getEntity(String.class);
- throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getClientResponseStatus().getStatusCode() + ": " + response.getClientResponseStatus().getReasonPhrase());
+ throw new CommunicationsException("Unable to communicate with Remote NiFi at URI " + uriVal + ". Got HTTP Error Code " + response.getStatus() + ": " + response.getStatusInfo().getReasonPhrase());
}
final ControllerEntity entity = response.getEntity(ControllerEntity.class);
@@ -1303,56 +1309,54 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
final RemoteProcessGroupUtils utils = new RemoteProcessGroupUtils(isWebApiSecure() ? sslContext : null);
final ClientResponse response = utils.get(new URI(apiUri + CONTROLLER_URI_PATH), getCommunicationsTimeout(TimeUnit.MILLISECONDS));
- switch (response.getClientResponseStatus()) {
- case OK:
- final ControllerEntity entity = response.getEntity(ControllerEntity.class);
- final ControllerDTO dto = entity.getController();
+
+ final int statusCode = response.getStatus();
+
+ if ( statusCode == OK_STATUS_CODE ) {
+ final ControllerEntity entity = response.getEntity(ControllerEntity.class);
+ final ControllerDTO dto = entity.getController();
- if (dto.getRemoteSiteListeningPort() == null) {
- authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time.";
+ if (dto.getRemoteSiteListeningPort() == null) {
+ authorizationIssue = "Remote instance is not configured to allow Site-to-Site communications at this time.";
+ } else {
+ authorizationIssue = null;
+ }
+
+ writeLock.lock();
+ try {
+ listeningPort = dto.getRemoteSiteListeningPort();
+ destinationSecure = dto.isSiteToSiteSecure();
+ } finally {
+ writeLock.unlock();
+ }
+
+ final String remoteInstanceId = dto.getInstanceId();
+ boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
+ pointsToCluster.set(isPointingToCluster);
+ } else if ( statusCode == UNAUTHORIZED_STATUS_CODE ) {
+ try {
+ final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
+ if (Response.Status.Family.SUCCESSFUL.equals(requestAccountResponse.getStatusInfo().getFamily()) ) {
+ logger.info("{} Issued a Request to communicate with remote instance", this);
} else {
- authorizationIssue = null;
+ logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
+ this, requestAccountResponse.getStatus(), requestAccountResponse.getStatusInfo().getReasonPhrase()});
}
-
- writeLock.lock();
- try {
- listeningPort = dto.getRemoteSiteListeningPort();
- destinationSecure = dto.isSiteToSiteSecure();
- } finally {
- writeLock.unlock();
+ } catch (final Exception e) {
+ logger.error("{} Failed to request account due to {}", this, e.toString());
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
}
+ }
- final String remoteInstanceId = dto.getInstanceId();
- boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
- pointsToCluster.set(isPointingToCluster);
- break;
- case UNAUTHORIZED:
- try {
- final ClientResponse requestAccountResponse = utils.issueRegistrationRequest(apiUri.toString());
- if (requestAccountResponse.getClientResponseStatus() == Status.OK) {
- logger.info("{} Issued a Request to communicate with remote instance", this);
- } else {
- logger.error("{} Failed to request account: got unexpected response code of {}:{}", new Object[]{
- this, requestAccountResponse.getClientResponseStatus().getStatusCode(), requestAccountResponse.getClientResponseStatus().getReasonPhrase()});
- }
- } catch (final Exception e) {
- logger.error("{} Failed to request account due to {}", this, e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
-
- authorizationIssue = response.getEntity(String.class);
- break;
- case FORBIDDEN:
- authorizationIssue = response.getEntity(String.class);
- break;
- default:
- final String message = response.getEntity(String.class);
- logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}",
- new Object[]{this, response.getClientResponseStatus().getStatusCode(), response.getClientResponseStatus().getReasonPhrase(), message});
- authorizationIssue = "Unable to determine Site-to-Site availability.";
- break;
+ authorizationIssue = response.getEntity(String.class);
+ } else if ( statusCode == FORBIDDEN_STATUS_CODE ) {
+ authorizationIssue = response.getEntity(String.class);
+ } else {
+ final String message = response.getEntity(String.class);
+ logger.warn("{} When communicating with remote instance, got unexpected response code {}:{} with entity: {}",
+ new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
+ authorizationIssue = "Unable to determine Site-to-Site availability.";
}
} catch (Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
index 9d52eb35d8..e15e00a6bc 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -42,43 +42,51 @@ public class ReflectionUtils {
* @throws IllegalAccessException
*/
public static void invokeMethodsWithAnnotation(final Class extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
- for (final Method method : instance.getClass().getMethods()) {
- if (method.isAnnotationPresent(annotation)) {
- final boolean isAccessible = method.isAccessible();
- method.setAccessible(true);
-
- try {
- final Class>[] argumentTypes = method.getParameterTypes();
- if (argumentTypes.length > args.length) {
- throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given",
- method.getName(), instance, argumentTypes.length, args.length));
- }
-
- for (int i = 0; i < argumentTypes.length; i++) {
- final Class> argType = argumentTypes[i];
- if (!argType.isAssignableFrom(args[i].getClass())) {
- throw new IllegalArgumentException(String.format(
- "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s",
- method.getName(), instance, i, argType, args[i].getClass()));
+ try {
+ for (final Method method : instance.getClass().getMethods()) {
+ if (method.isAnnotationPresent(annotation)) {
+ final boolean isAccessible = method.isAccessible();
+ method.setAccessible(true);
+
+ try {
+ final Class>[] argumentTypes = method.getParameterTypes();
+ if (argumentTypes.length > args.length) {
+ throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given",
+ method.getName(), instance, argumentTypes.length, args.length));
}
- }
-
- if (argumentTypes.length == args.length) {
- method.invoke(instance, args);
- } else {
- final Object[] argsToPass = new Object[argumentTypes.length];
- for (int i = 0; i < argsToPass.length; i++) {
- argsToPass[i] = args[i];
+
+ for (int i = 0; i < argumentTypes.length; i++) {
+ final Class> argType = argumentTypes[i];
+ if (!argType.isAssignableFrom(args[i].getClass())) {
+ throw new IllegalArgumentException(String.format(
+ "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s",
+ method.getName(), instance, i, argType, args[i].getClass()));
+ }
+ }
+
+ if (argumentTypes.length == args.length) {
+ method.invoke(instance, args);
+ } else {
+ final Object[] argsToPass = new Object[argumentTypes.length];
+ for (int i = 0; i < argsToPass.length; i++) {
+ argsToPass[i] = args[i];
+ }
+
+ method.invoke(instance, argsToPass);
+ }
+ } finally {
+ if (!isAccessible) {
+ method.setAccessible(false);
}
-
- method.invoke(instance, argsToPass);
- }
- } finally {
- if (!isAccessible) {
- method.setAccessible(false);
}
}
}
+ } catch (final InvocationTargetException ite) {
+ if ( ite.getCause() instanceof RuntimeException ) {
+ throw (RuntimeException) ite.getCause();
+ } else {
+ throw ite;
+ }
}
}
diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 6e0a5d79fd..1ff63c545b 100644
--- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@@ -64,7 +65,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -267,7 +267,7 @@ public class TestStandardProcessSession {
}
@Test
- public void testSpawnsNotEmittedIfFilesDeleted() throws IOException {
+ public void testForksNotEmittedIfFilesDeleted() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
@@ -283,8 +283,9 @@ public class TestStandardProcessSession {
assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
}
+
@Test
- public void testProvenanceEventsEmittedForSpawnIfNotRemoved() throws IOException {
+ public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
@@ -319,6 +320,79 @@ public class TestStandardProcessSession {
assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());
}
+ @Test
+ public void testUpdateAttributesThenJoin() throws IOException {
+ final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
+ final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
+ .id(2L)
+ .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
+ flowFileQueue.put(flowFileRecord1);
+ flowFileQueue.put(flowFileRecord2);
+
+ FlowFile ff1 = session.get();
+ FlowFile ff2 = session.get();
+
+ ff1 = session.putAttribute(ff1, "index", "1");
+ ff2 = session.putAttribute(ff2, "index", "2");
+
+ final List parents = new ArrayList<>(2);
+ parents.add(ff1);
+ parents.add(ff2);
+
+ final FlowFile child = session.create(parents);
+
+ final Relationship rel = new Relationship.Builder().name("A").build();
+
+ session.transfer(ff1, rel);
+ session.transfer(ff2, rel);
+ session.transfer(child, rel);
+
+ session.commit();
+
+ final List events = provenanceRepo.getEvents(0L, 1000);
+
+ // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
+ assertEquals(3, events.size());
+
+ int joinCount = 0;
+ int ff1UpdateCount = 0;
+ int ff2UpdateCount = 0;
+
+ for ( final ProvenanceEventRecord event : events ) {
+ switch (event.getEventType()) {
+ case JOIN:
+ assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
+ joinCount++;
+ break;
+ case ATTRIBUTES_MODIFIED:
+ if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
+ ff1UpdateCount++;
+ } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
+ ff2UpdateCount++;
+ } else {
+ Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
+ }
+ break;
+ default:
+ Assert.fail("Unexpected event type: " + event);
+ }
+ }
+
+ assertEquals(1, joinCount);
+ assertEquals(1, ff1UpdateCount);
+ assertEquals(1, ff2UpdateCount);
+
+ assertEquals(1, joinCount);
+ }
+
@Test
public void testForkOneToOneReported() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@@ -628,34 +702,34 @@ public class TestStandardProcessSession {
@Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- }).build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new ContentClaim() {
+ @Override
+ public int compareTo(ContentClaim arg0) {
+ return 0;
+ }
+
+ @Override
+ public String getId() {
+ return "0";
+ }
+
+ @Override
+ public String getContainer() {
+ return "container";
+ }
+
+ @Override
+ public String getSection() {
+ return "section";
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return true;
+ }
+ }).build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@@ -668,37 +742,35 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .contentClaimOffset(1000L)
- .size(1L)
- .build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new ContentClaim() {
+ @Override
+ public int compareTo(ContentClaim arg0) {
+ return 0;
+ }
+
+ @Override
+ public String getId() {
+ return "0";
+ }
+
+ @Override
+ public String getContainer() {
+ return "container";
+ }
+
+ @Override
+ public String getSection() {
+ return "section";
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return true;
+ }
+ })
+ .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
@@ -759,6 +831,89 @@ public class TestStandardProcessSession {
}
}
+
+ @Test
+ public void testCreateEmitted() throws IOException {
+ FlowFile newFlowFile = session.create();
+ session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+ }
+
+ @Test
+ public void testContentModifiedNotEmittedForCreate() throws IOException {
+ FlowFile newFlowFile = session.create();
+ newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ }
+ });
+ session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.CREATE, event.getEventType());
+ }
+
+ @Test
+ public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
+ this.flowFileQueue.put(flowFile);
+
+ FlowFile existingFlowFile = session.get();
+ existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ }
+ });
+ existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
+ session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
+ }
+
+ @Test
+ public void testAttributesModifiedEmitted() throws IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
+ this.flowFileQueue.put(flowFile);
+
+ FlowFile existingFlowFile = session.get();
+ existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
+ session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size());
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
+ }
+
+
+
private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -822,6 +977,10 @@ public class TestStandardProcessSession {
private ConcurrentMap claimantCounts = new ConcurrentHashMap<>();
+ @Override
+ public void shutdown() {
+ }
+
public Set getExistingClaims() {
final Set claims = new HashSet<>();
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
index ed9c5163ff..d00f31cf55 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi-status.bat
@@ -1,3 +1,5 @@
+@echo off
+
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-@echo off
-
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
-SET LIB_DIR=%~dp0..\lib
+SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
-SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=status
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
index ad90d5b35b..60afa48992 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/nifi.sh
@@ -166,15 +166,13 @@ run() {
fi
echo
- echo "Classpath: $CLASSPATH"
- echo
echo "Java home: $JAVA_HOME"
echo "NiFi home: $NIFI_HOME"
echo
echo "Bootstrap Config File: $BOOTSTRAP_CONF"
echo
- exec "$JAVA" -cp "$NIFI_HOME"/lib/nifi-bootstrap*.jar -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
+ exec "$JAVA" -cp "$NIFI_HOME"/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.file="$BOOTSTRAP_CONF" org.apache.nifi.bootstrap.RunNiFi $1
}
main() {
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat
index fdff815a0a..5bab3886b1 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/run-nifi.bat
@@ -1,3 +1,4 @@
+@echo off
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +16,17 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
-SET LIB_DIR=%~dp0..\lib
+SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
-SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=run
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
index ba4739a998..882b719c29 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/start-nifi.bat
@@ -1,3 +1,5 @@
+@echo off
+
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +17,17 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-@echo off
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
-SET LIB_DIR=%~dp0..\lib
+SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
-SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=start
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
index 828be6ec85..40c2d57b45 100644
--- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
+++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/bin/stop-nifi.bat
@@ -1,3 +1,5 @@
+@echo off
+
rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +17,16 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem
-@echo off
-
rem Use JAVA_HOME if it's set; otherwise, just use java
IF "%JAVA_HOME%"=="" (SET JAVA_EXE=java) ELSE (SET JAVA_EXE=%JAVA_HOME%\bin\java.exe)
-SET LIB_DIR=%~dp0..\lib
+SET LIB_DIR=%~dp0..\lib\bootstrap
SET CONF_DIR=%~dp0..\conf
SET BOOTSTRAP_CONF_FILE=%CONF_DIR%\bootstrap.conf
SET JAVA_ARGS=-Dorg.apache.nifi.bootstrap.config.file=%BOOTSTRAP_CONF_FILE%
-SET JAVA_PARAMS=-cp %LIB_DIR%\nifi-bootstrap*.jar -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
+SET JAVA_PARAMS=-cp %LIB_DIR%\* -Xms12m -Xmx24m %JAVA_ARGS% org.apache.nifi.bootstrap.RunNiFi
SET BOOTSTRAP_ACTION=stop
cmd.exe /C "%JAVA_EXE%" %JAVA_PARAMS% %BOOTSTRAP_ACTION%
diff --git a/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml b/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml
deleted file mode 100644
index d290d4597f..0000000000
--- a/nar-bundles/framework-bundle/framework/web/nifi-web-api/nb-configuration.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
-
-
- ide
-
-
diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
index 056f4fc350..2f3f34bf11 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateXPath.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.namespace.QName;
+import javax.xml.transform.ErrorListener;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
@@ -50,6 +51,7 @@ import javax.xml.xpath.XPathFactoryConfigurationException;
import net.sf.saxon.lib.NamespaceConstant;
import net.sf.saxon.xpath.XPathEvaluator;
+
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -73,7 +75,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.util.ObjectHolder;
-
import org.xml.sax.InputSource;
@EventDriven
@@ -356,8 +357,7 @@ public class EvaluateXPath extends AbstractProcessor {
session.getProvenanceReporter().modifyContent(flowFile);
}
} else {
- logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{
- flowFile, error.get()});
+ logger.error("Failed to write XPath result for {} due to {}; routing original to 'failure'", new Object[]{flowFile, error.get()});
session.transfer(flowFile, REL_FAILURE);
}
}
@@ -377,7 +377,32 @@ public class EvaluateXPath extends AbstractProcessor {
props.setProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
transformer.setOutputProperties(props);
+ final ProcessorLog logger = getLogger();
+
+ final ObjectHolder error = new ObjectHolder<>(null);
+ transformer.setErrorListener(new ErrorListener() {
+ @Override
+ public void warning(final TransformerException exception) throws TransformerException {
+ logger.warn("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+ }
+
+ @Override
+ public void error(final TransformerException exception) throws TransformerException {
+ logger.error("Encountered error from XPath Engine: ", new Object[] {exception.toString(), exception});
+ error.set(exception);
+ }
+
+ @Override
+ public void fatalError(final TransformerException exception) throws TransformerException {
+ logger.error("Encountered warning from XPath Engine: ", new Object[] {exception.toString(), exception});
+ error.set(exception);
+ }
+ });
+
transformer.transform(sourceNode, new StreamResult(out));
+ if ( error.get() != null ) {
+ throw error.get();
+ }
}
private static class XPathValidator implements Validator {
diff --git a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java
index 499fb3e15e..9079f82101 100644
--- a/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java
+++ b/nar-bundles/standard-bundle/standard-processors/src/test/java/org/apache/nifi/processors/standard/TestScanContent.java
@@ -55,7 +55,7 @@ public class TestScanContent {
Files.write(dictionaryPath, termBytes, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
final TestRunner runner = TestRunners.newTestRunner(new ScanContent());
- runner.setThreadCount(3);
+ runner.setThreadCount(1);
runner.setProperty(ScanContent.DICTIONARY, dictionaryPath.toString());
runner.setProperty(ScanContent.DICTIONARY_ENCODING, ScanContent.BINARY_ENCODING);
diff --git a/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index 9de26613aa..f4f9d127c0 100644
--- a/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ b/nar-bundles/volatile-provenance-repository-bundle/volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -169,6 +169,8 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
@Override
public void close() throws IOException {
+ queryExecService.shutdownNow();
+ scheduledExecService.shutdown();
}
@Override
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
index c95d449b70..ba0f7dcc8c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java
@@ -425,7 +425,7 @@ public final class PropertyDescriptor implements Comparable
throw new IllegalStateException("Must specify a name");
}
if (!isValueAllowed(defaultValue)) {
- throw new IllegalStateException("Default value is not in the set of allowable values");
+ throw new IllegalStateException("Default value ["+ defaultValue +"] is not in the set of allowable values");
}
return new PropertyDescriptor(this);
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
index 7012cb3a48..d66b8a6499 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java
@@ -45,6 +45,12 @@ public interface ContentRepository {
*/
void initialize(ContentClaimManager claimManager) throws IOException;
+ /**
+ * Shuts down the Content Repository, freeing any resources that may be held.
+ * This is called when an administrator shuts down NiFi.
+ */
+ void shutdown();
+
/**
* Returns the names of all Containers that exist for this Content
* Repository
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 739cb2be92..c6daab8303 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.events.EventReporter;
/**
* Defines a mechanism by which FlowFiles can be move into external storage or
@@ -34,8 +35,10 @@ public interface FlowFileSwapManager {
* can be obtained and restored
* @param claimManager the ContentClaimManager to use for interacting with
* Content Claims
+ * @param reporter the EventReporter that can be used for notifying users of
+ * important events
*/
- void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager);
+ void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
/**
* Shuts down the manager
diff --git a/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java b/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java
new file mode 100644
index 0000000000..82b8111089
--- /dev/null
+++ b/nifi-api/src/test/java/org/apache/nifi/components/TestPropertyDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+/**
+ * Regression test for issue NIFI-49, to ensure that if a Processor's Property's Default Value is not allowed,
+ * the Exception thrown should indicate what the default value is
+ */
+public class TestPropertyDescriptor {
+
+ private static Builder invalidDescriptorBuilder;
+ private static Builder validDescriptorBuilder;
+ private static String DEFAULT_VALUE = "Default Value";
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() {
+ validDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue("Allowable Value");
+ invalidDescriptorBuilder = new PropertyDescriptor.Builder().name("").allowableValues("Allowable Value", "Another Allowable Value").defaultValue(DEFAULT_VALUE);
+ }
+
+ @Test
+ public void testExceptionThrownByDescriptorWithInvalidDefaultValue() {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("["+ DEFAULT_VALUE +"]");
+
+ invalidDescriptorBuilder.build();
+ }
+
+ @Test
+ public void testNoExceptionThrownByPropertyDescriptorWithValidDefaultValue() {
+ assertNotNull(validDescriptorBuilder.build());
+ }
+}