diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusRequest.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusRequest.java new file mode 100644 index 0000000000..e8f8baa037 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dto.splunk; + +import java.util.List; + +/** + * Request object for Splunk event index status query. + */ +public class EventIndexStatusRequest { + private List acks; + + public List getAcks() { + return acks; + } + + public void setAcks(final List acks) { + this.acks = acks; + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusResponse.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusResponse.java new file mode 100644 index 0000000000..ed3de03dac --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/EventIndexStatusResponse.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dto.splunk; + +import java.util.Map; + +/** + * Response object for Splunk event index status query. + */ +public class EventIndexStatusResponse { + private Map acks; + + public Map getAcks() { + return acks; + } + + public void setAcks(final Map acks) { + this.acks = acks; + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/SendRawDataResponse.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/SendRawDataResponse.java new file mode 100644 index 0000000000..c1710ffb4f --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/dto/splunk/SendRawDataResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dto.splunk; + +/** + * Response object for sending raw data directly to HTTP Event Collector in Splunk. + * For details, see {@see https://docs.splunk.com/Documentation/Splunk/LATEST/RESTREF/RESTinput#services.2Fcollector.2Fraw} + */ +public class SendRawDataResponse { + private String text; + private int code; + private long ackId; + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public long getAckId() { + return ackId; + } + + public void setAckId(final long ackId) { + this.ackId = ackId; + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java new file mode 100644 index 0000000000..7a19e6fb97 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.SendRawDataResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http"}) +@CapabilityDescription("Sends flow file content to the specified Splunk server over HTTP or HTTPS. Supports HEC Index Acknowledgement.") +@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP Content-Type header if set.") +@WritesAttributes({ + @WritesAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @WritesAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SystemResourceConsideration(resource = SystemResource.MEMORY) +@SeeAlso(QuerySplunkIndexingStatus.class) +public class PutSplunkHTTP extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/raw"; + + static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder() + .name("source") + .displayName("Source") + .description("User-defined event source. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() + .name("source-type") + .displayName("Source Type") + .description("User-defined event sourcetype. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host") + .description("Specify with the host query string parameter. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("index") + .displayName("Index") + .description("Index name. Specify with the index query string parameter. Sets a default for all events when unspecified.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("character-set") + .displayName("Character Set") + .description("The name of the character set.") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(Charset.defaultCharset().name()) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("content-type") + .displayName("Content Type") + .description( + "The media type of the event sent to Splunk. " + + "If not set, \"mime.type\" flow file attribute will be used. " + + "In case of neither of them is specified, this information will not be sent to the server.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to the destination are sent to this relationship.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to the destination are sent to this relationship.") + .build(); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_SUCCESS, + RELATIONSHIP_FAILURE))); + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List getSupportedPropertyDescriptors() { + final List result = new ArrayList<>(super.getSupportedPropertyDescriptors()); + result.add(SOURCE); + result.add(SOURCE_TYPE); + result.add(HOST); + result.add(INDEX); + result.add(CONTENT_TYPE); + result.add(CHARSET); + return result; + } + + private volatile String endpoint; + private volatile String contentType; + private volatile String charset; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + + if (context.getProperty(CONTENT_TYPE).isSet()) { + contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue(); + } + + charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); + + final Map queryParameters = new HashMap<>(); + + if (context.getProperty(SOURCE_TYPE).isSet()) { + queryParameters.put("sourcetype", context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(SOURCE).isSet()) { + queryParameters.put("source", context.getProperty(SOURCE).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(HOST).isSet()) { + queryParameters.put("host", context.getProperty(HOST).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(INDEX).isSet()) { + queryParameters.put("index", context.getProperty(INDEX).evaluateAttributeExpressions().getValue()); + } + + endpoint = getEndpoint(queryParameters); + } + + private String getEndpoint(final Map queryParameters) { + if (queryParameters.isEmpty()) { + return ENDPOINT; + } + + try { + return URLEncoder.encode(ENDPOINT + '?' + queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&")), "UTF-8"); + } catch (final UnsupportedEncodingException e) { + getLogger().error("Could not be initialized because of: {}", new Object[] {e.getMessage()}, e); + throw new ProcessException(e); + } + } + + @OnStopped + public void onStopped() { + super.onStopped(); + contentType = null; + endpoint = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + ResponseMessage responseMessage = null; + FlowFile flowFile = session.get(); + boolean success = false; + + if (flowFile == null) { + return; + } + + try { + final RequestMessage requestMessage = createRequestMessage(session, flowFile); + responseMessage = call(endpoint, requestMessage); + flowFile = session.putAttribute(flowFile, "splunk.status.code", String.valueOf(responseMessage.getStatus())); + + switch (responseMessage.getStatus()) { + case 200: + final SendRawDataResponse successResponse = unmarshallResult(responseMessage.getContent(), SendRawDataResponse.class); + + if (successResponse.getCode() == 0) { + flowFile = enrichFlowFile(session, flowFile, successResponse.getAckId()); + success = true; + } else { + flowFile = session.putAttribute(flowFile, "splunk.response.code", String.valueOf(successResponse.getCode())); + getLogger().error("Putting data into Splunk was not successful: ({}) {}", new Object[] {successResponse.getCode(), successResponse.getText()}); + } + + break; + case 503 : // HEC is unhealthy, queues are full + context.yield(); + // fall-through + default: + getLogger().error("Putting data into Splunk was not successful. Response with header {} was: {}", + new Object[] {responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8")}); + } + } catch (final Exception e) { + getLogger().error("Error during communication with Splunk: {}", new Object[] {e.getMessage()}, e); + + if (responseMessage != null) { + try { + getLogger().error("The response content is: {}", new Object[]{IOUtils.toString(responseMessage.getContent(), "UTF-8")}); + } catch (final IOException ioException) { + getLogger().error("An error occurred during reading response content!"); + } + } + } finally { + session.transfer(flowFile, success ? RELATIONSHIP_SUCCESS : RELATIONSHIP_FAILURE); + } + } + + private RequestMessage createRequestMessage(final ProcessSession session, final FlowFile flowFile) { + final RequestMessage requestMessage = new RequestMessage("POST"); + final String flowFileContentType = Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type")); + + if (flowFileContentType != null) { + requestMessage.getHeader().put("Content-Type", flowFileContentType); + } + + // The current version of Splunk's {@link com.splunk.Service} class is lack of support for OutputStream as content. + // For further details please visit {@link com.splunk.HttpService#send} which is called internally. + requestMessage.setContent(extractTextMessageBody(flowFile, session, charset)); + return requestMessage; + } + + private String extractTextMessageBody(final FlowFile flowFile, final ProcessSession session, final String charset) { + final StringWriter writer = new StringWriter(); + session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset))); + return writer.toString(); + } + + private FlowFile enrichFlowFile(final ProcessSession session, final FlowFile flowFile, final long ackId) { + final Map attributes = new HashMap<>(); + attributes.put(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE, String.valueOf(ackId)); + attributes.put(SplunkAPICall.RESPONDED_AT_ATTRIBUTE, String.valueOf(System.currentTimeMillis())); + return session.putAllAttributes(flowFile, attributes); + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java new file mode 100644 index 0000000000..9ed5210a2f --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dto.splunk.EventIndexStatusRequest; +import org.apache.nifi.dto.splunk.EventIndexStatusResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"splunk", "logs", "http", "acknowledgement"}) +@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."), + @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")}) +@SeeAlso(PutSplunkHTTP.class) +public class QuerySplunkIndexingStatus extends SplunkAPICall { + private static final String ENDPOINT = "/services/collector/ack"; + + static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship when the acknowledgement was successful.") + .build(); + + static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder() + .name("unacknowledged") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful. " + + "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " + + "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.") + .build(); + + static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder() + .name("undetermined") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " + + "FlowFiles transferred to this relationship might be penalized. " + + "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.") + .build(); + + static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication. " + + "FlowFiles are timing out or unknown by the Splunk server will transferred to \"undetermined\" relationship.") + .build(); + + private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + RELATIONSHIP_ACKNOWLEDGED, + RELATIONSHIP_UNACKNOWLEDGED, + RELATIONSHIP_UNDETERMINED, + RELATIONSHIP_FAILURE + ))); + + static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("ttl") + .displayName("Maximum Waiting Time") + .description( + "The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. " + + "After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.") + .defaultValue("1 hour") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder() + .name("max-query-size") + .displayName("Maximum Query Size") + .description( + "The maximum number of acknowledgement identifiers the outgoing query contains in one batch. " + + "It is recommended not to set it too low in order to reduce network communication.") + .defaultValue("10000") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile Integer maxQuerySize; + private volatile Integer ttl; + + @Override + public List getSupportedPropertyDescriptors() { + final List result = new ArrayList<>(); + final List common = super.getSupportedPropertyDescriptors(); + result.addAll(common); + result.add(TTL); + result.add(MAX_QUERY_SIZE); + return result; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger(); + ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + } + + @OnStopped + public void onStopped() { + super.onStopped(); + maxQuerySize = null; + ttl = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final RequestMessage requestMessage; + final List flowFiles = session.get(maxQuerySize); + + if (flowFiles.isEmpty()) { + return; + } + + final long currentTime = System.currentTimeMillis(); + final Map undetermined = new HashMap<>(); + + for (final FlowFile flowFile : flowFiles) { + final Optional sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)); + final Optional ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE)); + + if (!sentAt.isPresent() || !ackId.isPresent()) { + getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!", + new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE}); + session.transfer(flowFile, RELATIONSHIP_FAILURE); + } else if (sentAt.get() + ttl < currentTime) { + session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED); + } else { + undetermined.put(ackId.get(), flowFile); + } + } + + if (undetermined.isEmpty()) { + getLogger().debug("There was no eligible flow file to send request to Splunk."); + return; + } + + try { + requestMessage = createRequestMessage(undetermined); + } catch (final IOException e) { + getLogger().error("Could not prepare Splunk request!", e); + session.transfer(undetermined.values(), RELATIONSHIP_FAILURE); + return; + } + + try { + final ResponseMessage responseMessage = call(ENDPOINT, requestMessage); + + if (responseMessage.getStatus() == 200) { + final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class); + + splunkResponse.getAcks().entrySet().forEach(result -> { + final FlowFile toTransfer = undetermined.get(result.getKey()); + + if (result.getValue()) { + session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED); + } else { + session.penalize(toTransfer); + session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED); + } + }); + } else { + getLogger().error("Query index status was not successful because of ({}) {}", new Object[] {responseMessage.getStatus(), responseMessage.getContent()}); + context.yield(); + session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED); + } + } catch (final Exception e) { + getLogger().error("Error during communication with Splunk server", e); + session.transfer(undetermined.values(), RELATIONSHIP_FAILURE); + } + } + + private RequestMessage createRequestMessage(Map undetermined) throws IOException { + final RequestMessage requestMessage = new RequestMessage("POST"); + requestMessage.getHeader().put("Content-Type", "application/json"); + requestMessage.setContent(generateContent(undetermined)); + return requestMessage; + } + + private String generateContent(final Map undetermined) throws IOException { + final EventIndexStatusRequest splunkRequest = new EventIndexStatusRequest(); + splunkRequest.setAcks(new ArrayList<>(undetermined.keySet())); + return marshalRequest(splunkRequest); + } + + private static Optional extractLong(final String value) { + try { + return Optional.ofNullable(value).map(Long::valueOf); + } catch (final NumberFormatException e) { + return Optional.empty(); + } + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java new file mode 100644 index 0000000000..46b6de8a7e --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.splunk.HttpException; +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import com.splunk.SSLSecurityProtocol; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; + +abstract class SplunkAPICall extends AbstractProcessor { + private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel"; + + private static final String HTTP_SCHEME = "http"; + private static final String HTTPS_SCHEME = "https"; + + private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name()); + private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name()); + private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name()); + private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name()); + + static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = "splunk.acknowledgement.id"; + static final String RESPONDED_AT_ATTRIBUTE = "splunk.responded.at"; + + static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder() + .name("Scheme") + .description("The scheme for connecting to Splunk.") + .allowableValues(HTTPS_SCHEME, HTTP_SCHEME) + .defaultValue(HTTPS_SCHEME) + .required(true) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The ip address or hostname of the Splunk server.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("localhost") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The HTTP Port Number of the Splunk server.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("9088") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("Security Protocol") + .description("The security protocol to use for communicating with Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE) + .defaultValue(TLS_1_2_VALUE.getValue()) + .build(); + + static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() + .name("Owner") + .description("The owner to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder() + .name("Token") + .description("The token to pass to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("The username to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password to authenticate to Splunk.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .sensitive(true) + .build(); + + static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder() + .name("request-channel") + .displayName("Splunk Request Channel") + .description("Identifier of the used request channel.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + protected static final List PROPERTIES = Arrays.asList( + SCHEME, + HOSTNAME, + PORT, + SECURITY_PROTOCOL, + OWNER, + TOKEN, + USERNAME, + PASSWORD, + REQUEST_CHANNEL + ); + + private final JsonFactory jsonFactory = new JsonFactory(); + private final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory); + + private volatile ServiceArgs splunkServiceArguments; + private volatile Service splunkService; + private volatile String requestChannel; + + @Override + public List getSupportedPropertyDescriptors() { + return SplunkAPICall.PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + splunkServiceArguments = getSplunkServiceArgs(context); + splunkService = getSplunkService(splunkServiceArguments); + requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue(); + } + + private ServiceArgs getSplunkServiceArgs(final ProcessContext context) { + final ServiceArgs splunkServiceArguments = new ServiceArgs(); + + splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue()); + splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue()); + splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger()); + + if (context.getProperty(OWNER).isSet()) { + splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(TOKEN).isSet()) { + splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(USERNAME).isSet()) { + splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue()); + } + + if (context.getProperty(PASSWORD).isSet()) { + splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue()); + } + + if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) { + splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue())); + } + + return splunkServiceArguments; + } + + protected Service getSplunkService(final ServiceArgs splunkServiceArguments) { + return Service.connect(splunkServiceArguments); + } + + @OnStopped + public void onStopped() { + if (splunkService != null) { + splunkService.logout(); + splunkService = null; + } + + requestChannel = null; + splunkServiceArguments = null; + } + + protected ResponseMessage call(final String endpoint, final RequestMessage request) { + request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel); + + try { + return splunkService.send(endpoint, request); + //Catch Stale connection exception, reinitialize, and retry + } catch (final HttpException e) { + getLogger().error("Splunk request status code: {}. Retrying the request.", new Object[] {e.getStatus()}); + splunkService.logout(); + splunkService = getSplunkService(splunkServiceArguments); + return splunkService.send(endpoint, request); + } + } + + protected T unmarshallResult(final InputStream responseBody, final Class type) throws IOException { + return jsonObjectMapper.readValue(responseBody, type); + } + + protected String marshalRequest(final Object request) throws IOException { + return jsonObjectMapper.writeValueAsString(request); + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 46120391fd..dfa9a1f4f7 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.splunk.GetSplunk -org.apache.nifi.processors.splunk.PutSplunk \ No newline at end of file +org.apache.nifi.processors.splunk.PutSplunk +org.apache.nifi.processors.splunk.PutSplunkHTTP +org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html new file mode 100644 index 0000000000..eb0dd821a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html @@ -0,0 +1,75 @@ + + + + + + PutSplunkHTTP + + + + +

PutSplunkHTTP

+ +

+ This processor serves as a counterpart for PutSplunk processor. While the later solves communication using TCP and + UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor + shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are + identical. There are however some aspects unique for this processor: +

+ +

Content details

+ +

+ PutSplunkHTTP allows the user to specify some metadata about the event being sent to the Splunk. These include: the + "Character Set" and the "Content Type" of the flow file content, using the matching properties. If the incoming + flow file has "mime.type" attribute, the processor will use it, unless the "Content Type" property is set, in which + case the property will override the flow file attribute. +

+ +

Event parameters

+ +

+ The "Source", "Source Type", "Host" and "Index" properties are optional and will be set by Splunk if unspecified. If set, + the default values will be overwritten by user specified ones. For more details about the Splunk API, please visit + this documentation. +

+ +

Acknowledgements

+ +

+ HTTP Event Collector (HEC) in Splunk provides the possibility of index acknowledgement, which can be used to monitor + the indexing status of the individual events. PutSplunkHTTP supports this feature by enriching the outgoing flow file + with the necessary information, making it possible for a later processor to poll the status based on. The necessary + information for this is stored within flow file attributes "splunk.acknowledgement.id" and "splunk.responded.at". +

+ +

+ For further steps of acknowledgement handling in NiFi side, please refer to QuerySplunkIndexingStatus processor. For more + details about the index acknowledgement, please visit this documentation. +

+ +

Error information

+ +

+ For more refined processing, flow files are enriched with additional information if possible. The information is stored + in the flow file attribute "splunk.status.code" or "splunk.response.code", depending on the success of the processing. + The attribute "splunk.status.code" is always filled when the Splunk API call is executed and contains the HTTP status code + of the response. In case the flow file transferred into "failure" relationship, the "splunk.response.code" might be + also filled, based on the Splunk response code. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html new file mode 100644 index 0000000000..9d81de8614 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html @@ -0,0 +1,76 @@ + + + + + + QuerySplunkIndexingStatus + + + + +

QuerySplunkIndexingStatus

+ +

+ This processor is responsible for polling Splunk server and determine if a Splunk event is acknowledged at the time of + execution. For more details about the HEC Index Acknowledgement please see + this documentation. +

+ +

Prerequisites

+ +

+ In order to work properly, the incoming flow files need to have the attributes "splunk.acknowledgement.id" and + "splunk.responded.at" filled properly. The flow file attribute "splunk.acknowledgement.id" should continue the "ackId" + contained by the response of the Splunk from the original put call. The flow file attribute "splunk.responded.at" + should contain the Unix Epoch the put call was answered by Splunk. It is suggested to use PutSplunkHTTP processor to execute + the put call and set these attributes. +

+ +

Unacknowledged and undetermined cases

+ +

+ Splunk serves information only about successful acknowledgement. In every other case it will return a value of false. This + includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries, + QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk + within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship. + Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is + triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original + Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might + result some false negative result as in case under higher load, Splunk server might index slower than it is expected. +

+ +

+ Undetermined cases are normal in healthy environment as it is possible that NiFi asks for indexing status before Splunk + finishes and acknowledges it. These cases are safe to retry and it is suggested to loop "undetermined" relationship + back to the processor for later try. Flow files transferred into the "Undetermined" relationship are penalized. +

+ +

Performance

+ +

+ Please keep Splunk channel limitations in mind: there are multiple configuration parameters in Splunk which might have direct + effect on the performance and behaviour of the QuerySplunkIndexingStatus processor. For example "max_number_of_acked_requests_pending_query" + and "max_number_of_acked_requests_pending_query_per_ack_channel" might limit the amount of ackIDs, the Splunk stores. +

+ +

+ Also, it is suggested to execute the query in batches. The "Maximum Query Size" property might be used for fine tune + the maximum number of events the processor will query about in one API request. This serves as an upper limit for the + batch but the processor might execute the query with less number of undetermined events. +

+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java new file mode 100644 index 0000000000..8e30424b21 --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunkHTTP.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class TestPutSplunkHTTP { + private static final String ACK_ID = "1234"; + private static final String EVENT = "{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}"; + private static final String SUCCESS_RESPONSE = + "{\n" + + " \"text\": \"Success\",\n" + + " \"code\": 0,\n" + + " \"ackId\": " + ACK_ID + "\n" + + "}"; + private static final String FAILURE_RESPONSE = "{\n" + + " \"text\": \"Failure\",\n" + + " \"code\": 13\n" + + "}"; + + @Mock + private Service service; + + @Mock + private ResponseMessage response; + + private MockedPutSplunkHTTP processor; + private TestRunner testRunner; + + private ArgumentCaptor path; + private ArgumentCaptor request; + + @Before + public void setUp() { + processor = new MockedPutSplunkHTTP(service); + testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(SplunkAPICall.SCHEME, "http"); + testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 888c5a81-8777-49a0-a3af-f76e050ab5d9"); + testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, "22bd7414-0d77-4c73-936d-c8f5d1b21862"); + + path = ArgumentCaptor.forClass(String.class); + request = ArgumentCaptor.forClass(RequestMessage.class); + Mockito.when(service.send(path.capture(), request.capture())).thenReturn(response); + } + + @After + public void tearDown() { + testRunner.shutdown(); + } + + @Test + public void testRunSuccess() throws Exception { + // given + givenSplunkReturnsWithSuccess(); + + // when + testRunner.enqueue(givenFlowFile()); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1); + final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_SUCCESS).get(0); + + Assert.assertEquals(EVENT, outgoingFlowFile.getContent()); + Assert.assertEquals(ACK_ID, outgoingFlowFile.getAttribute("splunk.acknowledgement.id")); + Assert.assertNotNull(outgoingFlowFile.getAttribute("splunk.responded.at")); + Assert.assertEquals("200", outgoingFlowFile.getAttribute("splunk.status.code")); + Assert.assertEquals("application/json", request.getValue().getHeader().get("Content-Type")); + } + + @Test + public void testHappyPathWithCustomQueryParameters() throws Exception { + // given + testRunner.setProperty(PutSplunkHTTP.SOURCE, "test_source"); + testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test_source_type"); + givenSplunkReturnsWithSuccess(); + + // when + testRunner.enqueue(EVENT); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1); + Assert.assertEquals("%2Fservices%2Fcollector%2Fraw%3Fsourcetype%3Dtest_source_type%26source%3Dtest_source", path.getValue()); + } + + @Test + public void testHappyPathWithContentType() throws Exception { + // given + testRunner.setProperty(PutSplunkHTTP.CONTENT_TYPE, "text/xml"); + givenSplunkReturnsWithSuccess(); + + // when + testRunner.enqueue(givenFlowFile()); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1); + Assert.assertEquals("text/xml", request.getValue().getHeader().get("Content-Type")); + } + + @Test + public void testSplunkCallFailure() throws Exception { + // given + givenSplunkReturnsWithFailure(); + + // when + testRunner.enqueue(givenFlowFile()); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_FAILURE, 1); + final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_FAILURE).get(0); + + Assert.assertEquals(EVENT, outgoingFlowFile.getContent()); + Assert.assertNull(outgoingFlowFile.getAttribute("splunk.acknowledgement.id")); + Assert.assertNull(outgoingFlowFile.getAttribute("splunk.responded.at")); + Assert.assertEquals("200", outgoingFlowFile.getAttribute("splunk.status.code")); + Assert.assertEquals("13", outgoingFlowFile.getAttribute("splunk.response.code")); + } + + @Test + public void testSplunkApplicationFailure() throws Exception { + // given + givenSplunkReturnsWithApplicationFailure(403); + + // when + testRunner.enqueue(givenFlowFile()); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_FAILURE, 1); + final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_FAILURE).get(0); + + Assert.assertEquals(EVENT, outgoingFlowFile.getContent()); + Assert.assertNull(outgoingFlowFile.getAttribute("splunk.acknowledgement.id")); + Assert.assertNull(outgoingFlowFile.getAttribute("splunk.responded.at")); + Assert.assertNull(outgoingFlowFile.getAttribute("splunk.response.code")); + Assert.assertEquals("403", outgoingFlowFile.getAttribute("splunk.status.code")); + } + + + private MockFlowFile givenFlowFile() throws UnsupportedEncodingException { + final MockFlowFile result = new MockFlowFile(System.currentTimeMillis()); + result.setData(EVENT.getBytes("UTF-8")); + result.putAttributes(Collections.singletonMap("mime.type", "application/json")); + return result; + } + + private void givenSplunkReturnsWithSuccess() throws Exception { + final InputStream inputStream = new ByteArrayInputStream(SUCCESS_RESPONSE.getBytes("UTF-8")); + Mockito.when(response.getStatus()).thenReturn(200); + Mockito.when(response.getContent()).thenReturn(inputStream); + } + + private void givenSplunkReturnsWithFailure() throws Exception { + final InputStream inputStream = new ByteArrayInputStream(FAILURE_RESPONSE.getBytes("UTF-8")); + Mockito.when(response.getStatus()).thenReturn(200); + Mockito.when(response.getContent()).thenReturn(inputStream); + } + + private void givenSplunkReturnsWithApplicationFailure(int code) throws Exception { + final InputStream inputStream = new ByteArrayInputStream("non-json-content".getBytes("UTF-8")); + Mockito.when(response.getStatus()).thenReturn(code); + Mockito.when(response.getContent()).thenReturn(inputStream); + } + + public static class MockedPutSplunkHTTP extends PutSplunkHTTP { + final Service serviceMock; + + public MockedPutSplunkHTTP(final Service serviceMock) { + this.serviceMock = serviceMock; + } + + @Override + protected Service getSplunkService(final ServiceArgs splunkServiceArguments) { + return serviceMock; + } + } +} diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java new file mode 100644 index 0000000000..2b91f17f2f --- /dev/null +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.splunk; + +import com.splunk.RequestMessage; +import com.splunk.ResponseMessage; +import com.splunk.Service; +import com.splunk.ServiceArgs; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@RunWith(MockitoJUnitRunner.class) +public class TestQuerySplunkIndexingStatus { + private static final String EVENT = "{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}"; + + @Mock + private Service service; + + @Mock + private ResponseMessage response; + + private MockedQuerySplunkIndexingStatus processor; + private TestRunner testRunner; + + private ArgumentCaptor path; + private ArgumentCaptor request; + + @Before + public void setUp() { + processor = new MockedQuerySplunkIndexingStatus(service); + testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(SplunkAPICall.SCHEME, "http"); + testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 888c5a81-8777-49a0-a3af-f76e050ab5d9"); + testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, "22bd7414-0d77-4c73-936d-c8f5d1b21862"); + + path = ArgumentCaptor.forClass(String.class); + request = ArgumentCaptor.forClass(RequestMessage.class); + Mockito.when(service.send(path.capture(), request.capture())).thenReturn(response); + } + + @After + public void tearDown() { + testRunner.shutdown(); + } + + @Test + public void testRunSuccess() throws Exception { + // given + final Map acks = new HashMap<>(); + acks.put(1, true); + acks.put(2, false); + givenSplunkReturns(acks); + + // when + testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis())); + testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis())); + testRunner.run(); + + // then + final List acknowledged = testRunner.getFlowFilesForRelationship(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED); + final List undetermined = testRunner.getFlowFilesForRelationship(QuerySplunkIndexingStatus.RELATIONSHIP_UNDETERMINED); + + Assert.assertEquals(1, acknowledged.size()); + Assert.assertEquals(1, undetermined.size()); + Assert.assertFalse(acknowledged.get(0).isPenalized()); + Assert.assertTrue(undetermined.get(0).isPenalized()); + } + + @Test + public void testMoreIncomingFlowFileThanQueryLimit() throws Exception { + // given + testRunner.setProperty(QuerySplunkIndexingStatus.MAX_QUERY_SIZE, "2"); + final Map acks = new HashMap<>(); + acks.put(1, true); + acks.put(2, true); + givenSplunkReturns(acks); + + // when + testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis())); + testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis())); + testRunner.enqueue(givenFlowFile(3, System.currentTimeMillis())); + testRunner.run(); + + // then + Assert.assertEquals("{\"acks\":[1,2]}", request.getValue().getContent()); + Assert.assertEquals(1, testRunner.getQueueSize().getObjectCount()); + testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED, 2); + } + + @Test + public void testTimedOutEvents() throws Exception { + // when + testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2))); + testRunner.run(); + + // then + Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class)); + testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1); + } + + @Test + public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception { + // when + testRunner.enqueue(EVENT); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_FAILURE, 1); + } + + @Test + public void testWhenSplunkReturnsWithError() throws Exception { + // given + givenSplunkReturnsWithFailure(); + + // when + testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis())); + testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis())); + testRunner.enqueue(givenFlowFile(3, System.currentTimeMillis())); + testRunner.run(); + + // then + testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNDETERMINED, 3); + } + + private void givenSplunkReturns(final Map acks) throws Exception { + final StringBuilder responseContent = new StringBuilder("{\"acks\":{") + .append(acks.entrySet().stream().map(e -> "\"" + e.getKey() + "\": " + e.getValue()).collect(Collectors.joining(", "))) + .append("}}"); + + final InputStream inputStream = new ByteArrayInputStream(responseContent.toString().getBytes("UTF-8")); + Mockito.when(response.getStatus()).thenReturn(200); + Mockito.when(response.getContent()).thenReturn(inputStream); + } + + private void givenSplunkReturnsWithFailure() { + Mockito.when(response.getStatus()).thenReturn(403); + } + + private MockFlowFile givenFlowFile(final int ackId, final long sentAt) throws UnsupportedEncodingException { + final MockFlowFile result = new MockFlowFile(ackId); + result.setData(EVENT.getBytes("UTF-8")); + Map attributes = new HashMap<>(); + attributes.put("splunk.acknowledgement.id", String.valueOf(ackId)); + attributes.put("splunk.responded.at", String.valueOf(sentAt)); + result.putAttributes(attributes); + return result; + } + + public static class MockedQuerySplunkIndexingStatus extends QuerySplunkIndexingStatus { + final Service serviceMock; + + public MockedQuerySplunkIndexingStatus(final Service serviceMock) { + this.serviceMock = serviceMock; + } + + @Override + protected Service getSplunkService(final ServiceArgs splunkServiceArguments) { + return serviceMock; + } + } +}