NIFI-6507 Unsubscribe if WindowsEventLog subscription encounters an error

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3617.
This commit is contained in:
Koji Kawamura 2019-07-01 11:39:20 +09:00 committed by Pierre Villard
parent 944d256d31
commit 05cbf8b24f
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
2 changed files with 28 additions and 5 deletions

View File

@ -240,7 +240,16 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
}
private boolean isSubscribed() {
return subscriptionHandle != null && subscriptionHandle.getPointer() != null;
final boolean subscribed = subscriptionHandle != null && subscriptionHandle.getPointer() != null;
final boolean subscriptionFailed = evtSubscribeCallback != null
&& ((EventSubscribeXmlRenderingCallback) evtSubscribeCallback).isSubscriptionFailed();
final boolean subscribing = subscribed && !subscriptionFailed;
getLogger().debug("subscribing? {}, subscribed={}, subscriptionFailed={}", new Object[]{subscribing, subscribed, subscriptionFailed});
if (subscriptionFailed) {
getLogger().info("Canceling a failed subscription.");
unsubscribe();
}
return subscribing;
}
@OnScheduled

View File

@ -46,6 +46,7 @@ public class EventSubscribeXmlRenderingCallback implements WEvtApi.EVT_SUBSCRIBE
private Memory buffer;
private Memory used;
private Memory propertyCount;
private boolean subscriptionFailed;
public EventSubscribeXmlRenderingCallback(ComponentLog logger, Consumer<String> consumer, int maxBufferSize, WEvtApi wEvtApi, Kernel32 kernel32, ErrorLookup errorLookup) {
this.logger = logger;
@ -67,11 +68,20 @@ public class EventSubscribeXmlRenderingCallback implements WEvtApi.EVT_SUBSCRIBE
}
if (evtSubscribeNotifyAction == WEvtApi.EvtSubscribeNotifyAction.ERROR) {
if (eventHandle.getPointer().getInt(0) == WEvtApi.EvtSubscribeErrors.ERROR_EVT_QUERY_RESULT_STALE) {
logger.error(MISSING_EVENT_MESSAGE);
} else {
logger.error(RECEIVED_THE_FOLLOWING_WIN32_ERROR + eventHandle.getPointer().getInt(0));
try {
final int errorCode = eventHandle.getPointer().getInt(0);
if (errorCode == WEvtApi.EvtSubscribeErrors.ERROR_EVT_QUERY_RESULT_STALE) {
logger.error(MISSING_EVENT_MESSAGE);
} else {
logger.error(RECEIVED_THE_FOLLOWING_WIN32_ERROR + errorCode);
}
} catch (final Error e) {
logger.error("Failed to get error code onEvent("
+ evtSubscribeNotifyAction + ", " + userContext + ", " + eventHandle);
} finally {
subscriptionFailed = true;
}
} else if (evtSubscribeNotifyAction == WEvtApi.EvtSubscribeNotifyAction.DELIVER) {
wEvtApi.EvtRender(null, eventHandle, WEvtApi.EvtRenderFlags.EVENT_XML, size, buffer, used, propertyCount);
@ -104,4 +114,8 @@ public class EventSubscribeXmlRenderingCallback implements WEvtApi.EVT_SUBSCRIBE
// Ignored, see https://msdn.microsoft.com/en-us/library/windows/desktop/aa385577(v=vs.85).aspx
return 0;
}
public boolean isSubscriptionFailed() {
return subscriptionFailed;
}
}