NIFI-5645: Auto reconnect ConsumeWindowsEventLog

This commit also contains following refactoring:
- Catch URISyntaxException inside subscribe when constructing provenance
URI as it does not affect the core responsibility of this processor.
Even if it fails to be a proper URI, if the query works for consuming
logs, the processor should proceed forward.

Upgrade JNA version.

Do not update lastActivityTimestamp when subscribe failed.

This closes #3037
This commit is contained in:
Koji Kawamura 2018-09-28 17:37:34 +09:00 committed by Matt Gilman
parent 97afa4e7ba
commit a6f722222a
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 76 additions and 23 deletions

View File

@ -21,8 +21,8 @@ language governing permissions and limitations under the License. -->
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<jna.version>4.2.2</jna.version> <jna.version>4.5.2</jna.version>
<javassist.version>3.20.0-GA</javassist.version> <javassist.version>3.23.1-GA</javassist.version>
</properties> </properties>
<dependencies> <dependencies>

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processors.windows.event.log;
import com.sun.jna.platform.win32.Kernel32; import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.Kernel32Util; import com.sun.jna.platform.win32.Kernel32Util;
import com.sun.jna.platform.win32.WinNT; import com.sun.jna.platform.win32.WinNT;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -32,6 +33,8 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -43,6 +46,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@ -77,6 +81,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.defaultValue(DEFAULT_CHANNEL) .defaultValue(DEFAULT_CHANNEL)
.description("The Windows Event Log Channel to listen to.") .description("The Windows Event Log Channel to listen to.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
@ -86,6 +91,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.defaultValue(DEFAULT_XPATH) .defaultValue(DEFAULT_XPATH)
.description("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)") .description("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
@ -108,7 +114,21 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE)); public static final PropertyDescriptor INACTIVE_DURATION_TO_RECONNECT = new PropertyDescriptor.Builder()
.name("inactiveDurationToReconnect")
.displayName("Inactive duration to reconnect")
.description("If no new event logs are processed for the specified time period," +
" this processor will try reconnecting to recover from a state where any further messages cannot be consumed." +
" Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned." +
" Setting no duration, e.g. '0 ms' disables auto-reconnection.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("10 mins")
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.MILLISECONDS))
.build();
public static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE, INACTIVE_DURATION_TO_RECONNECT));
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -134,6 +154,9 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
private ProcessSessionFactory sessionFactory; private ProcessSessionFactory sessionFactory;
private String provenanceUri; private String provenanceUri;
private long inactiveDurationToReconnect = 0;
private long lastActivityTimestamp = 0;
/** /**
* Framework constructor * Framework constructor
*/ */
@ -182,12 +205,20 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
* *
* @param context the process context * @param context the process context
*/ */
private String subscribe(ProcessContext context) throws URISyntaxException { private String subscribe(ProcessContext context) {
String channel = context.getProperty(CHANNEL).getValue(); final String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions().getValue();
String query = context.getProperty(QUERY).getValue(); final String query = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
renderedXMLs = new LinkedBlockingQueue<>(context.getProperty(MAX_EVENT_QUEUE_SIZE).asInteger()); renderedXMLs = new LinkedBlockingQueue<>(context.getProperty(MAX_EVENT_QUEUE_SIZE).asInteger());
provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
try {
provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
} catch (URISyntaxException e) {
getLogger().debug("Failed to construct detailed provenanceUri from channel={}, query={}, use simpler one.", new Object[]{channel, query});
provenanceUri = String.format("winlog://%s/%s", name, channel);
}
inactiveDurationToReconnect = context.getProperty(INACTIVE_DURATION_TO_RECONNECT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
evtSubscribeCallback = new EventSubscribeXmlRenderingCallback(getLogger(), s -> { evtSubscribeCallback = new EventSubscribeXmlRenderingCallback(getLogger(), s -> {
try { try {
@ -199,9 +230,12 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
subscriptionHandle = wEvtApi.EvtSubscribe(null, null, channel, query, null, null, subscriptionHandle = wEvtApi.EvtSubscribe(null, null, channel, query, null, null,
evtSubscribeCallback, WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE | WEvtApi.EvtSubscribeFlags.EVT_SUBSCRIBE_STRICT); evtSubscribeCallback, WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE | WEvtApi.EvtSubscribeFlags.EVT_SUBSCRIBE_STRICT);
if (!isSubscribed()) { if (!isSubscribed()) {
return UNABLE_TO_SUBSCRIBE + errorLookup.getLastError(); return UNABLE_TO_SUBSCRIBE + errorLookup.getLastError();
} }
lastActivityTimestamp = System.currentTimeMillis();
return null; return null;
} }
@ -210,7 +244,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
} }
@OnScheduled @OnScheduled
public void onScheduled(ProcessContext context) throws AlreadySubscribedException, URISyntaxException { public void onScheduled(ProcessContext context) throws AlreadySubscribedException {
if (isSubscribed()) { if (isSubscribed()) {
throw new AlreadySubscribedException(PROCESSOR_ALREADY_SUBSCRIBED); throw new AlreadySubscribedException(PROCESSOR_ALREADY_SUBSCRIBED);
} }
@ -225,11 +259,8 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
*/ */
@OnStopped @OnStopped
public void stop() { public void stop() {
if (isSubscribed()) { unsubscribe();
wEvtApi.EvtClose(subscriptionHandle);
}
subscriptionHandle = null;
evtSubscribeCallback = null;
if (!renderedXMLs.isEmpty()) { if (!renderedXMLs.isEmpty()) {
if (sessionFactory != null) { if (sessionFactory != null) {
getLogger().info("Finishing processing leftover events"); getLogger().info("Finishing processing leftover events");
@ -246,29 +277,49 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
renderedXMLs = null; renderedXMLs = null;
} }
private void unsubscribe() {
if (isSubscribed()) {
wEvtApi.EvtClose(subscriptionHandle);
}
subscriptionHandle = null;
evtSubscribeCallback = null;
}
@Override @Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
this.sessionFactory = sessionFactory; this.sessionFactory = sessionFactory;
if (!isSubscribed()) { if (!isSubscribed()) {
String errorMessage; String errorMessage = subscribe(context);
try {
errorMessage = subscribe(context);
} catch (URISyntaxException e) {
getLogger().error(e.getMessage(), e);
context.yield();
return;
}
if (errorMessage != null) { if (errorMessage != null) {
context.yield(); context.yield();
getLogger().error(errorMessage); getLogger().error(errorMessage);
return; return;
} }
} }
processQueue(sessionFactory.createSession());
final int flowFileCount = processQueue(sessionFactory.createSession());
final long now = System.currentTimeMillis();
if (flowFileCount > 0) {
lastActivityTimestamp = now;
} else if (inactiveDurationToReconnect > 0) {
if ((now - lastActivityTimestamp) > inactiveDurationToReconnect) {
getLogger().info("Exceeds configured 'inactive duration to reconnect' {} ms. Unsubscribe to reconnect..", new Object[]{inactiveDurationToReconnect});
unsubscribe();
}
}
} }
private void processQueue(ProcessSession session) { /**
* Create FlowFiles from received logs.
* @return the number of created FlowFiles
*/
private int processQueue(ProcessSession session) {
String xml; String xml;
int flowFileCount = 0;
while ((xml = renderedXMLs.peek()) != null) { while ((xml = renderedXMLs.peek()) != null) {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
byte[] xmlBytes = xml.getBytes(StandardCharsets.UTF_8); byte[] xmlBytes = xml.getBytes(StandardCharsets.UTF_8);
@ -277,6 +328,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
session.getProvenanceReporter().receive(flowFile, provenanceUri); session.getProvenanceReporter().receive(flowFile, provenanceUri);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.commit(); session.commit();
flowFileCount++;
if (!renderedXMLs.remove(xml) && getLogger().isWarnEnabled()) { if (!renderedXMLs.remove(xml) && getLogger().isWarnEnabled()) {
getLogger().warn(new StringBuilder("Event ") getLogger().warn(new StringBuilder("Event ")
.append(xml) .append(xml)
@ -286,6 +338,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.toString()); .toString());
} }
} }
return flowFileCount;
} }
@Override @Override