mirror of https://github.com/apache/nifi.git
Merge branch 'master' into master
commit 74cbfc4b69
@@ -104,8 +104,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     }
 
     @Override
-    public boolean isComplete() {
-        return getMergedResponse() != null;
+    public synchronized boolean isComplete() {
+        return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size();
     }
 
     @Override
@@ -125,6 +125,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
 
     public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) {
         if (failure != null) {
+            if (completedResultFetchedCallback != null) {
+                completedResultFetchedCallback.run();
+            }
+
             throw failure;
         }
 
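Note on the isComplete() hunk above: the method no longer forces a merge by calling getMergedResponse() (which has side effects and throws any recorded failure); it is now synchronized and checks the failure and merged-response fields directly, treating the request as complete once every node in responseMap has answered. A minimal sketch of that completion check, with hypothetical field names patterned on the diff rather than the full NiFi class:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, trimmed-down illustration of the completion semantics shown in the hunk above.
class CompletionTrackerSketch {
    private final Map<String, Object> responseMap = new ConcurrentHashMap<>(); // one entry per node
    private final AtomicInteger requestsCompleted = new AtomicInteger(0);
    private RuntimeException failure;      // set if the replicated request failed outright
    private Object mergedResponse;         // set once the node responses have been merged

    synchronized boolean isComplete() {
        // Complete if the request failed, was already merged, or every node has responded.
        return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size();
    }
}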
@@ -77,6 +77,36 @@ public class TestThreadPoolRequestReplicator {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
     }
 
+    @Test
+    public void testFailedRequestsAreCleanedUp() {
+        withReplicator(replicator -> {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
+            final URI uri = new URI("http://localhost:8080/processors/1");
+            final Entity entity = new ProcessorEntity();
+
+            // set the user
+            final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
+            SecurityContextHolder.getContext().setAuthentication(authentication);
+
+            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
+
+            // We should get back the same response object
+            assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            assertEquals(HttpMethod.GET, response.getMethod());
+            assertEquals(nodeIds, response.getNodesInvolved());
+
+            assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
+            assertEquals(8000, nodeResponse.getNodeId().getApiPort());
+            assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus());
+
+            assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
+        }, Status.FORBIDDEN, 0L, null);
+    }
+
     /**
      * If we replicate a request, whenever we obtain the merged response from
      * the AsyncClusterResponse object, the response should no longer be
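The new test above exercises cleanup of failed requests: after the merged (here FORBIDDEN) response has been fetched, getClusterResponse() returns null, i.e. the replicator has dropped its bookkeeping for that request identifier. The completedResultFetchedCallback introduced in the earlier hunk is presumably what performs that removal. A hedged sketch of the evict-on-fetch pattern (illustrative names only, not the actual ThreadPoolRequestReplicator internals):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of "remove the response once its result has been fetched",
// the behavior the test above verifies.
class ResponseRegistrySketch<R> {
    private final Map<String, R> responsesById = new ConcurrentHashMap<>();

    void register(final String requestId, final R response) {
        responsesById.put(requestId, response);
    }

    R getClusterResponse(final String requestId) {
        return responsesById.get(requestId);
    }

    // Handed to the response as its completed-result-fetched callback.
    Runnable cleanupCallbackFor(final String requestId) {
        return () -> responsesById.remove(requestId);
    }
}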
@@ -50,8 +50,8 @@
 }(this, function ($, d3, nfCommon, nfDialog, nfErrorHandler, nfClient, nfCanvasUtils) {
     'use strict';
 
-    var nfCanvas;
     var nfSelectable;
+    var nfConnectionConfiguration;
     var nfContextMenu;
 
     // the dimensions for the connection label
@@ -1546,9 +1546,10 @@
         * @param nfSelectableRef The nfSelectable module.
         * @param nfContextMenuRef The nfContextMenu module.
         */
-        init: function (nfSelectableRef, nfContextMenuRef) {
+        init: function (nfSelectableRef, nfContextMenuRef, nfConnectionConfigurationRef) {
             nfSelectable = nfSelectableRef;
             nfContextMenu = nfContextMenuRef;
+            nfConnectionConfiguration = nfConnectionConfigurationRef;
 
             connectionMap = d3.map();
             removedCache = d3.map();
@@ -30,13 +30,14 @@
                 'nf.ProcessGroup',
                 'nf.Processor',
                 'nf.Connection',
+                'nf.ConnectionConfiguration',
                 'nf.CanvasUtils',
                 'nf.Connectable',
                 'nf.Draggable',
                 'nf.Selectable',
                 'nf.ContextMenu'],
-            function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) {
-                return (nf.Graph = factory($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu));
+            function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) {
+                return (nf.Graph = factory($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu));
             });
     } else if (typeof exports === 'object' && typeof module === 'object') {
         module.exports = (nf.Graph =
@@ -51,6 +52,7 @@
                 require('nf.ProcessGroup'),
                 require('nf.Processor'),
                 require('nf.Connection'),
+                require('nf.ConnectionConfiguration'),
                 require('nf.CanvasUtils'),
                 require('nf.Connectable'),
                 require('nf.Draggable'),
@@ -68,13 +70,14 @@
             root.nf.ProcessGroup,
             root.nf.Processor,
             root.nf.Connection,
+            root.nf.ConnectionConfiguration,
             root.nf.CanvasUtils,
             root.nf.Connectable,
             root.nf.Draggable,
             root.nf.Selectable,
             root.nf.ContextMenu);
     }
-}(this, function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) {
+}(this, function ($, d3, nfCommon, nfNgBridge, nfLabel, nfFunnel, nfPort, nfRemoteProcessGroup, nfProcessGroup, nfProcessor, nfConnection, nfConnectionConfiguration, nfCanvasUtils, nfConnectable, nfDraggable, nfSelectable, nfContextMenu) {
     'use strict';
 
     var combinePorts = function (contents) {
@@ -201,7 +204,7 @@
         nfRemoteProcessGroup.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu);
         nfProcessGroup.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu);
         nfProcessor.init(nfConnectable, nfDraggable, nfSelectable, nfContextMenu);
-        nfConnection.init(nfSelectable, nfContextMenu);
+        nfConnection.init(nfSelectable, nfContextMenu, nfConnectionConfiguration);
 
         // load the graph
         return nfProcessGroup.enterGroup(nfCanvasUtils.getGroupId());
@@ -141,7 +141,12 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
         eventStore.initialize();
         eventIndex.initialize(eventStore);
 
-        eventStore.reindexLatestEvents(eventIndex);
+        try {
+            eventStore.reindexLatestEvents(eventIndex);
+        } catch (final Exception e) {
+            logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
+                + "events will not be available from the Provenance Repository when a query is issued.", e);
+        }
     }
 
     @Override
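The hunk above makes the start-up re-index a best-effort step: any exception is logged and initialization continues, so a damaged index no longer prevents the repository from coming up. A generic sketch of that "log and continue" pattern, using a hypothetical helper rather than NiFi API:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper illustrating a best-effort initialization step, as in the hunk above.
final class BestEffortSketch {
    private static final Logger logger = LoggerFactory.getLogger(BestEffortSketch.class);

    static void runBestEffort(final Runnable step, final String description) {
        try {
            step.run();
        } catch (final Exception e) {
            // The failure is logged, not rethrown, so the caller's initialization still completes.
            logger.error("Failed to {}; continuing initialization", description, e);
        }
    }
}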
@@ -318,7 +318,9 @@ public class SimpleIndexManager implements IndexManager {
 
     // This method exists solely for unit testing purposes.
     protected void close(final IndexWriterCount count) throws IOException {
+        logger.debug("Closing Index Writer for {}...", count.getWriter().getDirectory());
         count.close();
+        logger.debug("Finished closing Index Writer for {}...", count.getWriter().getDirectory());
     }
 
     protected int getWriterCount() {
@@ -283,7 +283,20 @@ public abstract class CompressableRecordReader implements RecordReader {
         }
 
         if (isData()) {
-            return nextRecord(dis, serializationVersion);
+            while (true) {
+                try {
+                    return nextRecord(dis, serializationVersion);
+                } catch (final IOException ioe) {
+                    throw ioe;
+                } catch (final Exception e) {
+                    // This would only happen if a bug were to exist such that an 'invalid' event were written
+                    // out. For example an Event that has no FlowFile UUID. While there is in fact an underlying
+                    // cause that would need to be sorted out in this case, the Provenance Repository should be
+                    // resilient enough to handle this. Otherwise, we end up throwing an Exception, which may
+                    // prevent iterating over additional events in the repository.
+                    logger.error("Failed to read Provenance Event from " + filename + "; will skip this event and continue reading subsequent events", e);
+                }
+            }
         } else {
             return null;
         }
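With the hunk above, the read path skips records that fail with a non-IO exception and keeps iterating instead of aborting the rest of the file; IOExceptions are still rethrown because they usually mean the stream itself is unusable. A self-contained sketch of that skip-and-continue loop, where readOne() stands in for nextRecord(dis, serializationVersion) (hypothetical names, not the NiFi class):

import java.io.DataInputStream;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical, generic version of the skip-bad-record loop in the hunk above.
abstract class SkippingReaderSketch<T> {
    private static final Logger logger = LoggerFactory.getLogger(SkippingReaderSketch.class);

    protected abstract T readOne(DataInputStream in) throws IOException;

    T readNextValid(final DataInputStream in, final String filename) throws IOException {
        while (true) {
            try {
                return readOne(in);        // normal case: return the record
            } catch (final IOException ioe) {
                throw ioe;                 // stream-level problems remain fatal
            } catch (final Exception e) {
                // A malformed record: log it and loop to try the next one rather than
                // aborting iteration over the remainder of the file.
                logger.error("Failed to read record from " + filename + "; skipping it", e);
            }
        }
    }
}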
@@ -128,8 +128,8 @@ public class WriteAheadStorePartition implements EventStorePartition {
                     maxEventId = eventId;
                     break;
                 }
-            } catch (final IOException ioe) {
-                logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe);
+            } catch (final Exception e) {
+                logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e);
             }
         }
 
@@ -1905,13 +1905,6 @@ public class TestPersistentProvenanceRepository {
 
         testRepo.recoverJournalFiles();
 
-        assertEquals("mergeJournals() should report a skipped journal", 1, reportedEvents.size());
-        assertEquals("mergeJournals() should report a skipped journal",
-            "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: "
-                + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
-                + "that the record wasn't completely written to the file. This journal will be skipped.",
-            reportedEvents.get(reportedEvents.size() - 1).getMessage());
-
         final File storageDir = config.getStorageDirectories().values().iterator().next();
         assertTrue(checkJournalRecords(storageDir, false) < 10000);
     }
@@ -1937,7 +1930,7 @@ public class TestPersistentProvenanceRepository {
         final ProvenanceEventRecord record = builder.build();
 
         final ExecutorService exec = Executors.newFixedThreadPool(10);
-        final List<Future> futures = new ArrayList<>();
+        final List<Future<?>> futures = new ArrayList<>();
         for (int i = 0; i < 10000; i++) {
             futures.add(exec.submit(new Runnable() {
                 @Override
@@ -1948,7 +1941,7 @@ public class TestPersistentProvenanceRepository {
         }
 
         // corrupt the first record of the first journal file
-        for (Future future : futures) {
+        for (Future<?> future : futures) {
             while (!future.isDone()) {
                 Thread.sleep(10);
             }
@@ -1958,14 +1951,6 @@ public class TestPersistentProvenanceRepository {
 
         testRepo.recoverJournalFiles();
 
-        assertEquals("mergeJournals should report a skipped journal", 1, reportedEvents.size());
-        assertEquals("mergeJournals should report a skipped journal",
-            "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: "
-                + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
-                + "that the record wasn't completely written to the file. The remainder of this journal will "
-                + "be skipped.",
-            reportedEvents.get(reportedEvents.size() - 1).getMessage());
-
         final File storageDir = config.getStorageDirectories().values().iterator().next();
         assertTrue(checkJournalRecords(storageDir, false) < 10000);
     }
@@ -97,7 +97,8 @@ public class ScanAttribute extends AbstractProcessor {
     public static final PropertyDescriptor DICTIONARY_FILE = new PropertyDescriptor.Builder()
         .name("dictionary-file")
         .displayName("Dictionary File")
-        .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored.")
+        .description("A new-line-delimited text file that includes the terms that should trigger a match. Empty lines are ignored. The contents of "
+            + "the text file are loaded into memory when the processor is scheduled and reloaded when the contents are modified.")
         .required(true)
        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
        .build();
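The expanded description above documents that the dictionary file is read into memory when the processor is scheduled and reloaded when the file changes. A hedged sketch of a modification-time based reload check that matches the described behavior (illustrative only, not the actual ScanAttribute implementation):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of "load on schedule, reload when modified" for a dictionary file.
class DictionarySketch {
    private final Path dictionaryFile;
    private volatile Set<String> terms = new HashSet<>();
    private volatile long lastLoadedModTime = -1L;

    DictionarySketch(final Path dictionaryFile) {
        this.dictionaryFile = dictionaryFile;
    }

    Set<String> getTerms() throws IOException {
        final long modTime = Files.getLastModifiedTime(dictionaryFile).toMillis();
        if (modTime != lastLoadedModTime) {
            // Re-read the file only when its modification time has changed.
            final Set<String> loaded = new HashSet<>();
            for (final String line : Files.readAllLines(dictionaryFile, StandardCharsets.UTF_8)) {
                if (!line.trim().isEmpty()) {   // empty lines are ignored, per the description
                    loaded.add(line);
                }
            }
            terms = loaded;
            lastLoadedModTime = modTime;
        }
        return terms;
    }
}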