diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java index 9ed5210a2f..747e5e40a3 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java @@ -167,8 +167,6 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall { getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!", new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE}); session.transfer(flowFile, RELATIONSHIP_FAILURE); - } else if (sentAt.get() + ttl < currentTime) { - session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED); } else { undetermined.put(ackId.get(), flowFile); } @@ -193,14 +191,18 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall { if (responseMessage.getStatus() == 200) { final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class); - splunkResponse.getAcks().entrySet().forEach(result -> { - final FlowFile toTransfer = undetermined.get(result.getKey()); - - if (result.getValue()) { + splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> { + final FlowFile toTransfer = undetermined.get(flowFileId); + if (isAcknowledged) { session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED); } else { - session.penalize(toTransfer); - session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED); + final Long sentAt = extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get(); + if (sentAt + ttl < currentTime) { + session.transfer(toTransfer, RELATIONSHIP_UNACKNOWLEDGED); + } else { + session.penalize(toTransfer); + session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED); + } } }); } else { diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java index 46b6de8a7e..8cf4f8afae 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java @@ -54,6 +54,7 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder() .name("Scheme") + .displayName("Scheme") .description("The scheme for connecting to Splunk.") .allowableValues(HTTPS_SCHEME, HTTP_SCHEME) .defaultValue(HTTPS_SCHEME) @@ -62,6 +63,7 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") + .displayName("Hostname") .description("The ip address or hostname of the Splunk server.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("localhost") @@ -71,15 +73,17 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor PORT = new PropertyDescriptor .Builder().name("Port") - .description("The HTTP Port Number of the Splunk server.") + .displayName("HTTP Event Collector Port") + .description("The HTTP Event Collector HTTP Port Number.") .required(true) .addValidator(StandardValidators.PORT_VALIDATOR) - .defaultValue("9088") + .defaultValue("8088") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() .name("Security Protocol") + .displayName("Security Protocol") .description("The security protocol to use for communicating with Splunk.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE) @@ -88,6 +92,7 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() .name("Owner") + .displayName("Owner") .description("The owner to pass to Splunk.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) @@ -96,7 +101,8 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder() .name("Token") - .description("The token to pass to Splunk.") + .displayName("HTTP Event Collector Token") + .description("HTTP Event Collector token starting with the string Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) @@ -104,6 +110,7 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() .name("Username") + .displayName("Username") .description("The username to authenticate to Splunk.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) @@ -112,6 +119,7 @@ abstract class SplunkAPICall extends AbstractProcessor { static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() .name("Password") + .displayName("Password") .description("The password to authenticate to Splunk.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html index 9d81de8614..50f6f876de 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html @@ -46,8 +46,9 @@ includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries, QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship. - Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is - triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original + Flow files outside of this time range will be queried as well and be transferred to either "acknowledged" or "unacknowledged" + relationship determined by the Splunk response. + In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might result some false negative result as in case under higher load, Splunk server might index slower than it is expected.

diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java index 2b91f17f2f..bd33947cb7 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java @@ -39,7 +39,6 @@ import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @RunWith(MockitoJUnitRunner.class) @@ -120,17 +119,6 @@ public class TestQuerySplunkIndexingStatus { testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED, 2); } - @Test - public void testTimedOutEvents() throws Exception { - // when - testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2))); - testRunner.run(); - - // then - Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class)); - testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1); - } - @Test public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception { // when