mirror of https://github.com/apache/nifi.git
NIFI-35: Provide an EventReporter to the FlowFileSwapManager and provide events for any errors
This commit is contained in:
parent
9e60aa0f25
commit
1cc3ce5755
|
@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
|
|||
import org.apache.nifi.controller.repository.claim.ContentClaim;
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
||||
import org.apache.nifi.engine.FlowEngine;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.io.BufferedOutputStream;
|
||||
import org.apache.nifi.processor.QueueSize;
|
||||
import org.apache.nifi.reporting.Severity;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
public static final int MINIMUM_SWAP_COUNT = 10000;
|
||||
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
|
||||
public static final int SWAP_ENCODING_VERSION = 6;
|
||||
public static final String EVENT_CATEGORY = "Swap FlowFiles";
|
||||
|
||||
private final ScheduledExecutorService swapQueueIdentifierExecutor;
|
||||
private final ScheduledExecutorService swapInExecutor;
|
||||
private volatile FlowFileRepository flowFileRepository;
|
||||
private volatile EventReporter eventReporter;
|
||||
|
||||
// Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
|
||||
private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
|
||||
|
@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) {
|
||||
public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
|
||||
this.claimManager = claimManager;
|
||||
this.flowFileRepository = flowFileRepository;
|
||||
this.eventReporter = eventReporter;
|
||||
swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
|
||||
swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
}
|
||||
|
||||
if (!swapFile.delete()) {
|
||||
logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually");
|
||||
final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
|
||||
logger.warn(errMsg);
|
||||
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e);
|
||||
final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
|
||||
logger.error(errMsg);
|
||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||
|
||||
if (swapFile != null) {
|
||||
queue.add(swapFile);
|
||||
}
|
||||
|
@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
} catch (final IOException ioe) {
|
||||
recordsSwapped = 0;
|
||||
flowFileQueue.putSwappedRecords(toSwap);
|
||||
logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe);
|
||||
final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
|
||||
logger.error(errMsg);
|
||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||
}
|
||||
|
||||
if (recordsSwapped > 0) {
|
||||
|
@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
|
||||
final int swapEncodingVersion = in.readInt();
|
||||
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
|
||||
throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
|
||||
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
|
||||
final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
|
||||
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
|
||||
|
||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||
throw new IOException(errMsg);
|
||||
}
|
||||
|
||||
final String connectionId = in.readUTF();
|
||||
final FlowFileQueue queue = queueMap.get(connectionId);
|
||||
if (queue == null) {
|
||||
logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
|
||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
|
|||
maxRecoveredId = maxId;
|
||||
}
|
||||
} catch (final IOException ioe) {
|
||||
logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString());
|
||||
final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
|
||||
logger.error(errMsg);
|
||||
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", ioe);
|
||||
}
|
||||
|
|
|
@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
|||
|
||||
try {
|
||||
this.provenanceEventRepository = createProvenanceRepository(properties);
|
||||
this.provenanceEventRepository.initialize(new EventReporter() {
|
||||
@Override
|
||||
public void reportEvent(final Severity severity, final String category, final String message) {
|
||||
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
|
||||
bulletinRepository.addBulletin(bulletin);
|
||||
}
|
||||
});
|
||||
this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
|
||||
|
||||
this.contentRepository = createContentRepository(properties);
|
||||
} catch (final Exception e) {
|
||||
|
@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
|||
}
|
||||
}
|
||||
|
||||
private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
|
||||
return new EventReporter() {
|
||||
@Override
|
||||
public void reportEvent(final Severity severity, final String category, final String message) {
|
||||
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
|
||||
bulletinRepository.addBulletin(bulletin);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void initializeFlow() throws IOException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
|
@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
|||
contentRepository.cleanup();
|
||||
|
||||
if (flowFileSwapManager != null) {
|
||||
flowFileSwapManager.start(flowFileRepository, this, contentClaimManager);
|
||||
flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
|
||||
}
|
||||
|
||||
if (externalSiteListener != null) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.controller.repository;
|
||||
|
||||
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
|
||||
/**
|
||||
* Defines a mechanism by which FlowFiles can be move into external storage or
|
||||
|
@ -34,8 +35,10 @@ public interface FlowFileSwapManager {
|
|||
* can be obtained and restored
|
||||
* @param claimManager the ContentClaimManager to use for interacting with
|
||||
* Content Claims
|
||||
* @param reporter the EventReporter that can be used for notifying users of
|
||||
* important events
|
||||
*/
|
||||
void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager);
|
||||
void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
|
||||
|
||||
/**
|
||||
* Shuts down the manager
|
||||
|
|
Loading…
Reference in New Issue