diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 1d4ea69460..318b1a0737 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -104,8 +104,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { } @Override - public boolean isComplete() { - return getMergedResponse() != null; + public synchronized boolean isComplete() { + return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size(); } @Override @@ -125,6 +125,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) { if (failure != null) { + if (completedResultFetchedCallback != null) { + completedResultFetchedCallback.run(); + } + throw failure; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index 3c782a7ac1..88a883660c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -77,6 +77,36 @@ public class TestThreadPoolRequestReplicator { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties"); } + @Test + public void testFailedRequestsAreCleanedUp() { + withReplicator(replicator -> { + final Set nodeIds = new HashSet<>(); + nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false)); + final URI uri = new URI("http://localhost:8080/processors/1"); + final Entity entity = new ProcessorEntity(); + + // set the user + final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS)); + SecurityContextHolder.getContext().setAuthentication(authentication); + + final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true); + + // We should get back the same response object + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + assertEquals(HttpMethod.GET, response.getMethod()); + assertEquals(nodeIds, response.getNodesInvolved()); + + assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier())); + + final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS); + assertEquals(8000, nodeResponse.getNodeId().getApiPort()); + assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus()); + + assertNull(replicator.getClusterResponse(response.getRequestIdentifier())); + }, Status.FORBIDDEN, 0L, null); + } + /** * If we replicate a request, whenever we obtain the merged response from * the AsyncClusterResponse object, the response should no longer be diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection.js index 759860b3b4..d7e2e3acc2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-connection.js @@ -50,8 +50,8 @@ }(this, function ($, d3, nfCommon, nfDialog, nfErrorHandler, nfClient, nfCanvasUtils) { 'use strict'; - var nfCanvas; var nfSelectable; + var nfConnectionConfiguration; var nfContextMenu; // the dimensions for the connection label @@ -1546,9 +1546,10 @@ * @param nfSelectableRef The nfSelectable module. * @param nfContextMenuRef The nfContextMenu module. */ - init: function (nfSelectableRef, nfContextMenuRef) { + init: function (nfSelectableRef, nfContextMenuRef, nfConnectionConfigurationRef) { nfSelectable = nfSelectableRef; nfContextMenu = nfContextMenuRef; + nfConnectionConfiguration = nfConnectionConfigurationRef; connectionMap = d3.map(); removedCache = d3.map(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-graph.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-graph.js index e5c8e22b22..394bf42a2c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-graph.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-graph.js @@ -30,13 +30,14 @@ 'nf.ProcessGroup', 'nf.Processor', 'nf.Connection', + 'nf.ConnectionConfiguration', 'nf.CanvasUtils', 'nf.Connectable', 'nf.Draggable', 'nf.Selectable', 'nf.ContextMenu'], - function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) { - return (nf.Graph = factory($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu)); + function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) { + return (nf.Graph = factory($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu)); }); } else if (typeof exports === 'object' && typeof module === 'object') { module.exports = (nf.Graph = @@ -51,6 +52,7 @@ require('nf.ProcessGroup'), require('nf.Processor'), require('nf.Connection'), + require('nf.ConnectionConfiguration'), require('nf.CanvasUtils'), require('nf.Connectable'), require('nf.Draggable'), @@ -68,13 +70,14 @@ root.nf.ProcessGroup, root.nf.Processor, root.nf.Connection, + root.nf.ConnectionConfiguration, root.nf.CanvasUtils, root.nf.Connectable, root.nf.Draggable, root.nf.Selectable, root.nf.ContextMenu); } -}(this, function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) { +}(this, function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) { 'use strict'; var combinePorts = function (contents) { @@ -201,7 +204,7 @@ nfRemoteProcessGroup.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu); nfProcessGroup.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu); nfProcessor.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu); - nfConnection.init(nfSelectable, nfContextMenu); + nfConnection.init(nfSelectable, nfContextMenu, nfConnectionConfiguration); // load the graph return nfProcessGroup.enterGroup(nfCanvasUtils.getGroupId()); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 229a96d5d9..89750282de 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -141,7 +141,12 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { eventStore.initialize(); eventIndex.initialize(eventStore); - eventStore.reindexLatestEvents(eventIndex); + try { + eventStore.reindexLatestEvents(eventIndex); + } catch (final Exception e) { + logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest " + + "events will not be available from the Provenance Repository when a query is issued.", e); + } } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index b0b01e592b..4d6c11d946 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -318,7 +318,9 @@ public class SimpleIndexManager implements IndexManager { // This method exists solely for unit testing purposes. protected void close(final IndexWriterCount count) throws IOException { + logger.debug("Closing Index Writer for {}...", count.getWriter().getDirectory()); count.close(); + logger.debug("Finished closing Index Writer for {}...", count.getWriter().getDirectory()); } protected int getWriterCount() { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java index 1a6c3c55dc..93c066963e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java @@ -283,7 +283,20 @@ public abstract class CompressableRecordReader implements RecordReader { } if (isData()) { - return nextRecord(dis, serializationVersion); + while (true) { + try { + return nextRecord(dis, serializationVersion); + } catch (final IOException ioe) { + throw ioe; + } catch (final Exception e) { + // This would only happen if a bug were to exist such that an 'invalid' event were written + // out. For example an Event that has no FlowFile UUID. While there is in fact an underlying + // cause that would need to be sorted out in this case, the Provenance Repository should be + // resilient enough to handle this. Otherwise, we end up throwing an Exception, which may + // prevent iterating over additional events in the repository. + logger.error("Failed to read Provenance Event from " + filename + "; will skip this event and continue reading subsequent events", e); + } + } } else { return null; } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index a25043a50f..fde76f5063 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -128,8 +128,8 @@ public class WriteAheadStorePartition implements EventStorePartition { maxEventId = eventId; break; } - } catch (final IOException ioe) { - logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe); + } catch (final Exception e) { + logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e); } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 48d8e09aff..e3f729b996 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -1905,13 +1905,6 @@ public class TestPersistentProvenanceRepository { testRepo.recoverJournalFiles(); - assertEquals("mergeJournals() should report a skipped journal", 1, reportedEvents.size()); - assertEquals("mergeJournals() should report a skipped journal", - "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: " - + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible " - + "that the record wasn't completely written to the file. This journal will be skipped.", - reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); } @@ -1937,7 +1930,7 @@ public class TestPersistentProvenanceRepository { final ProvenanceEventRecord record = builder.build(); final ExecutorService exec = Executors.newFixedThreadPool(10); - final List futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); for (int i = 0; i < 10000; i++) { futures.add(exec.submit(new Runnable() { @Override @@ -1948,7 +1941,7 @@ public class TestPersistentProvenanceRepository { } // corrupt the first record of the first journal file - for (Future future : futures) { + for (Future future : futures) { while (!future.isDone()) { Thread.sleep(10); } @@ -1958,14 +1951,6 @@ public class TestPersistentProvenanceRepository { testRepo.recoverJournalFiles(); - assertEquals("mergeJournals should report a skipped journal", 1, reportedEvents.size()); - assertEquals("mergeJournals should report a skipped journal", - "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: " - + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible " - + "that the record wasn't completely written to the file. The remainder of this journal will " - + "be skipped.", - reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java index 5cee460517..1c69ee9e80 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ScanAttribute.java @@ -97,7 +97,8 @@ public class ScanAttribute extends AbstractProcessor { public static final PropertyDescriptor DICTIONARY_FILE = new PropertyDescriptor.Builder() .name("dictionary-file") .displayName("Dictionary File") - .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored.") + .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored. The contents of " + + "the text file are loaded into memory when the processor is scheduled and reloaded when the contents are modified.") .required(true) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .build();