NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)

This closes #4824.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
This commit is contained in:
Timea Barna 2021-02-16 08:40:35 +01:00 committed by Tamas Palfy
parent 14e6dc3dc6
commit bfd964b9c7
4 changed files with 24 additions and 25 deletions

View File

@ -167,8 +167,6 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall {
getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
session.transfer(flowFile, RELATIONSHIP_FAILURE);
} else if (sentAt.get() + ttl < currentTime) {
session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
} else {
undetermined.put(ackId.get(), flowFile);
}
@ -193,14 +191,18 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall {
if (responseMessage.getStatus() == 200) {
final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
splunkResponse.getAcks().entrySet().forEach(result -> {
final FlowFile toTransfer = undetermined.get(result.getKey());
if (result.getValue()) {
splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> {
final FlowFile toTransfer = undetermined.get(flowFileId);
if (isAcknowledged) {
session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED);
} else {
session.penalize(toTransfer);
session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
final Long sentAt = extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
if (sentAt + ttl < currentTime) {
session.transfer(toTransfer, RELATIONSHIP_UNACKNOWLEDGED);
} else {
session.penalize(toTransfer);
session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
}
}
});
} else {

View File

@ -54,6 +54,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
.name("Scheme")
.displayName("Scheme")
.description("The scheme for connecting to Splunk.")
.allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
.defaultValue(HTTPS_SCHEME)
@ -62,6 +63,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.displayName("Hostname")
.description("The ip address or hostname of the Splunk server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
@ -71,15 +73,17 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The HTTP Port Number of the Splunk server.")
.displayName("HTTP Event Collector Port")
.description("The HTTP Event Collector HTTP Port Number.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.defaultValue("9088")
.defaultValue("8088")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
.name("Security Protocol")
.displayName("Security Protocol")
.description("The security protocol to use for communicating with Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
@ -88,6 +92,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.displayName("Owner")
.description("The owner to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@ -96,7 +101,8 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
.name("Token")
.description("The token to pass to Splunk.")
.displayName("HTTP Event Collector Token")
.description("HTTP Event Collector token starting with the string Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -104,6 +110,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.displayName("Username")
.description("The username to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
@ -112,6 +119,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.displayName("Password")
.description("The password to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)

View File

@ -46,8 +46,9 @@
includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
Flow files outside of this time range will be queried as well and be transferred to either "acknowledged" or "unacknowledged"
relationship determined by the Splunk response.
In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
</p>

View File

@ -39,7 +39,6 @@ import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(MockitoJUnitRunner.class)
@ -120,17 +119,6 @@ public class TestQuerySplunkIndexingStatus {
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED, 2);
}
@Test
public void testTimedOutEvents() throws Exception {
// when
testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
testRunner.run();
// then
Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
}
@Test
public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception {
// when