diff --git a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
index a504a3bd21..4b7d063628 100644
--- a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/pom.xml
@@ -21,8 +21,8 @@ language governing permissions and limitations under the License. -->
jar
- 4.2.2
- 3.20.0-GA
+ 4.5.2
+ 3.23.1-GA
diff --git a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
index 87aa07f191..7dc4236921 100644
--- a/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
+++ b/nifi-nar-bundles/nifi-windows-event-log-bundle/nifi-windows-event-log-processors/src/main/java/org/apache/nifi/processors/windows/event/log/ConsumeWindowsEventLog.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.windows.event.log;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.Kernel32Util;
import com.sun.jna.platform.win32.WinNT;
+
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
@@ -32,6 +33,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -43,6 +46,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -77,6 +81,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.defaultValue(DEFAULT_CHANNEL)
.description("The Windows Event Log Channel to listen to.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
@@ -86,6 +91,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.defaultValue(DEFAULT_XPATH)
.description("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
@@ -108,7 +114,21 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
- public static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE));
+ public static final PropertyDescriptor INACTIVE_DURATION_TO_RECONNECT = new PropertyDescriptor.Builder()
+ .name("inactiveDurationToReconnect")
+ .displayName("Inactive duration to reconnect")
+ .description("If no new event logs are processed for the specified time period," +
+ " this processor will try reconnecting to recover from a state where any further messages cannot be consumed." +
+ " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned." +
+ " Setting no duration, e.g. '0 ms' disables auto-reconnection.")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("10 mins")
+ .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.MILLISECONDS))
+ .build();
+
+ public static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+ Arrays.asList(CHANNEL, QUERY, MAX_BUFFER_SIZE, MAX_EVENT_QUEUE_SIZE, INACTIVE_DURATION_TO_RECONNECT));
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -134,6 +154,9 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
private ProcessSessionFactory sessionFactory;
private String provenanceUri;
+ private long inactiveDurationToReconnect = 0;
+ private long lastActivityTimestamp = 0;
+
/**
* Framework constructor
*/
@@ -182,12 +205,20 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
*
* @param context the process context
*/
- private String subscribe(ProcessContext context) throws URISyntaxException {
- String channel = context.getProperty(CHANNEL).getValue();
- String query = context.getProperty(QUERY).getValue();
+ private String subscribe(ProcessContext context) {
+ final String channel = context.getProperty(CHANNEL).evaluateAttributeExpressions().getValue();
+ final String query = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
renderedXMLs = new LinkedBlockingQueue<>(context.getProperty(MAX_EVENT_QUEUE_SIZE).asInteger());
- provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
+
+ try {
+ provenanceUri = new URI("winlog", name, "/" + channel, query, null).toASCIIString();
+ } catch (URISyntaxException e) {
+ getLogger().debug("Failed to construct detailed provenanceUri from channel={}, query={}, use simpler one.", new Object[]{channel, query});
+ provenanceUri = String.format("winlog://%s/%s", name, channel);
+ }
+
+ inactiveDurationToReconnect = context.getProperty(INACTIVE_DURATION_TO_RECONNECT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
evtSubscribeCallback = new EventSubscribeXmlRenderingCallback(getLogger(), s -> {
try {
@@ -199,9 +230,12 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
subscriptionHandle = wEvtApi.EvtSubscribe(null, null, channel, query, null, null,
evtSubscribeCallback, WEvtApi.EvtSubscribeFlags.SUBSCRIBE_TO_FUTURE | WEvtApi.EvtSubscribeFlags.EVT_SUBSCRIBE_STRICT);
+
if (!isSubscribed()) {
return UNABLE_TO_SUBSCRIBE + errorLookup.getLastError();
}
+
+ lastActivityTimestamp = System.currentTimeMillis();
return null;
}
@@ -210,7 +244,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
}
@OnScheduled
- public void onScheduled(ProcessContext context) throws AlreadySubscribedException, URISyntaxException {
+ public void onScheduled(ProcessContext context) throws AlreadySubscribedException {
if (isSubscribed()) {
throw new AlreadySubscribedException(PROCESSOR_ALREADY_SUBSCRIBED);
}
@@ -225,11 +259,8 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
*/
@OnStopped
public void stop() {
- if (isSubscribed()) {
- wEvtApi.EvtClose(subscriptionHandle);
- }
- subscriptionHandle = null;
- evtSubscribeCallback = null;
+ unsubscribe();
+
if (!renderedXMLs.isEmpty()) {
if (sessionFactory != null) {
getLogger().info("Finishing processing leftover events");
@@ -246,29 +277,49 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
renderedXMLs = null;
}
+ private void unsubscribe() {
+ if (isSubscribed()) {
+ wEvtApi.EvtClose(subscriptionHandle);
+ }
+ subscriptionHandle = null;
+ evtSubscribeCallback = null;
+ }
+
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
this.sessionFactory = sessionFactory;
+
if (!isSubscribed()) {
- String errorMessage;
- try {
- errorMessage = subscribe(context);
- } catch (URISyntaxException e) {
- getLogger().error(e.getMessage(), e);
- context.yield();
- return;
- }
+ String errorMessage = subscribe(context);
if (errorMessage != null) {
context.yield();
getLogger().error(errorMessage);
return;
}
}
- processQueue(sessionFactory.createSession());
+
+ final int flowFileCount = processQueue(sessionFactory.createSession());
+
+ final long now = System.currentTimeMillis();
+ if (flowFileCount > 0) {
+ lastActivityTimestamp = now;
+
+ } else if (inactiveDurationToReconnect > 0) {
+ if ((now - lastActivityTimestamp) > inactiveDurationToReconnect) {
+ getLogger().info("Exceeds configured 'inactive duration to reconnect' {} ms. Unsubscribe to reconnect..", new Object[]{inactiveDurationToReconnect});
+ unsubscribe();
+ }
+ }
}
- private void processQueue(ProcessSession session) {
+ /**
+ * Create FlowFiles from received logs.
+ * @return the number of created FlowFiles
+ */
+ private int processQueue(ProcessSession session) {
String xml;
+ int flowFileCount = 0;
+
while ((xml = renderedXMLs.peek()) != null) {
FlowFile flowFile = session.create();
byte[] xmlBytes = xml.getBytes(StandardCharsets.UTF_8);
@@ -277,6 +328,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
session.getProvenanceReporter().receive(flowFile, provenanceUri);
session.transfer(flowFile, REL_SUCCESS);
session.commit();
+ flowFileCount++;
if (!renderedXMLs.remove(xml) && getLogger().isWarnEnabled()) {
getLogger().warn(new StringBuilder("Event ")
.append(xml)
@@ -286,6 +338,7 @@ public class ConsumeWindowsEventLog extends AbstractSessionFactoryProcessor {
.toString());
}
}
+ return flowFileCount;
}
@Override