NIFI-7801 Adding support for HTTP based Splunk put and indexed acknowledgement

This closes #4714.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Bence Simon 2020-12-07 22:27:47 +01:00 committed by Mark Payne
parent 0ff4367781
commit 0a10557dd5
11 changed files with 1438 additions and 1 deletions

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dto.splunk;
import java.util.List;
/**
* Request object for Splunk event index status query.
*/
public class EventIndexStatusRequest {
private List<Long> acks;
public List<Long> getAcks() {
return acks;
}
public void setAcks(final List<Long> acks) {
this.acks = acks;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dto.splunk;
import java.util.Map;
/**
* Response object for Splunk event index status query.
*/
public class EventIndexStatusResponse {
private Map<Long, Boolean> acks;
public Map<Long, Boolean> getAcks() {
return acks;
}
public void setAcks(final Map<Long, Boolean> acks) {
this.acks = acks;
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dto.splunk;
/**
* Response object for sending raw data directly to HTTP Event Collector in Splunk.
* For details, see {@see https://docs.splunk.com/Documentation/Splunk/LATEST/RESTREF/RESTinput#services.2Fcollector.2Fraw}
*/
public class SendRawDataResponse {
private String text;
private int code;
private long ackId;
public String getText() {
return text;
}
public void setText(String text) {
this.text = text;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public long getAckId() {
return ackId;
}
public void setAckId(final long ackId) {
this.ackId = ackId;
}
}

View File

@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dto.splunk.SendRawDataResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"splunk", "logs", "http"})
@CapabilityDescription("Sends flow file content to the specified Splunk server over HTTP or HTTPS. Supports HEC Index Acknowledgement.")
@ReadsAttribute(attribute = "mime.type", description = "Uses as value for HTTP Content-Type header if set.")
@WritesAttributes({
@WritesAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
@WritesAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SeeAlso(QuerySplunkIndexingStatus.class)
public class PutSplunkHTTP extends SplunkAPICall {
private static final String ENDPOINT = "/services/collector/raw";
static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder()
.name("source")
.displayName("Source")
.description("User-defined event source. Sets a default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
.name("source-type")
.displayName("Source Type")
.description("User-defined event sourcetype. Sets a default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
.name("host")
.displayName("Host")
.description("Specify with the host query string parameter. Sets a default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("index")
.displayName("Index")
.description("Index name. Specify with the index query string parameter. Sets a default for all events when unspecified.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("character-set")
.displayName("Character Set")
.description("The name of the character set.")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(Charset.defaultCharset().name())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("content-type")
.displayName("Content Type")
.description(
"The media type of the event sent to Splunk. " +
"If not set, \"mime.type\" flow file attribute will be used. " +
"In case of neither of them is specified, this information will not be sent to the server.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent to this relationship.")
.build();
static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are sent to this relationship.")
.build();
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
RELATIONSHIP_SUCCESS,
RELATIONSHIP_FAILURE)));
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> result = new ArrayList<>(super.getSupportedPropertyDescriptors());
result.add(SOURCE);
result.add(SOURCE_TYPE);
result.add(HOST);
result.add(INDEX);
result.add(CONTENT_TYPE);
result.add(CHARSET);
return result;
}
private volatile String endpoint;
private volatile String contentType;
private volatile String charset;
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
if (context.getProperty(CONTENT_TYPE).isSet()) {
contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
}
charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
final Map<String, String> queryParameters = new HashMap<>();
if (context.getProperty(SOURCE_TYPE).isSet()) {
queryParameters.put("sourcetype", context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(SOURCE).isSet()) {
queryParameters.put("source", context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(HOST).isSet()) {
queryParameters.put("host", context.getProperty(HOST).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(INDEX).isSet()) {
queryParameters.put("index", context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
}
endpoint = getEndpoint(queryParameters);
}
private String getEndpoint(final Map<String, String> queryParameters) {
if (queryParameters.isEmpty()) {
return ENDPOINT;
}
try {
return URLEncoder.encode(ENDPOINT + '?' + queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&")), "UTF-8");
} catch (final UnsupportedEncodingException e) {
getLogger().error("Could not be initialized because of: {}", new Object[] {e.getMessage()}, e);
throw new ProcessException(e);
}
}
@OnStopped
public void onStopped() {
super.onStopped();
contentType = null;
endpoint = null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
ResponseMessage responseMessage = null;
FlowFile flowFile = session.get();
boolean success = false;
if (flowFile == null) {
return;
}
try {
final RequestMessage requestMessage = createRequestMessage(session, flowFile);
responseMessage = call(endpoint, requestMessage);
flowFile = session.putAttribute(flowFile, "splunk.status.code", String.valueOf(responseMessage.getStatus()));
switch (responseMessage.getStatus()) {
case 200:
final SendRawDataResponse successResponse = unmarshallResult(responseMessage.getContent(), SendRawDataResponse.class);
if (successResponse.getCode() == 0) {
flowFile = enrichFlowFile(session, flowFile, successResponse.getAckId());
success = true;
} else {
flowFile = session.putAttribute(flowFile, "splunk.response.code", String.valueOf(successResponse.getCode()));
getLogger().error("Putting data into Splunk was not successful: ({}) {}", new Object[] {successResponse.getCode(), successResponse.getText()});
}
break;
case 503 : // HEC is unhealthy, queues are full
context.yield();
// fall-through
default:
getLogger().error("Putting data into Splunk was not successful. Response with header {} was: {}",
new Object[] {responseMessage.getStatus(), IOUtils.toString(responseMessage.getContent(), "UTF-8")});
}
} catch (final Exception e) {
getLogger().error("Error during communication with Splunk: {}", new Object[] {e.getMessage()}, e);
if (responseMessage != null) {
try {
getLogger().error("The response content is: {}", new Object[]{IOUtils.toString(responseMessage.getContent(), "UTF-8")});
} catch (final IOException ioException) {
getLogger().error("An error occurred during reading response content!");
}
}
} finally {
session.transfer(flowFile, success ? RELATIONSHIP_SUCCESS : RELATIONSHIP_FAILURE);
}
}
private RequestMessage createRequestMessage(final ProcessSession session, final FlowFile flowFile) {
final RequestMessage requestMessage = new RequestMessage("POST");
final String flowFileContentType = Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type"));
if (flowFileContentType != null) {
requestMessage.getHeader().put("Content-Type", flowFileContentType);
}
// The current version of Splunk's {@link com.splunk.Service} class is lack of support for OutputStream as content.
// For further details please visit {@link com.splunk.HttpService#send} which is called internally.
requestMessage.setContent(extractTextMessageBody(flowFile, session, charset));
return requestMessage;
}
private String extractTextMessageBody(final FlowFile flowFile, final ProcessSession session, final String charset) {
final StringWriter writer = new StringWriter();
session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset)));
return writer.toString();
}
private FlowFile enrichFlowFile(final ProcessSession session, final FlowFile flowFile, final long ackId) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE, String.valueOf(ackId));
attributes.put(SplunkAPICall.RESPONDED_AT_ATTRIBUTE, String.valueOf(System.currentTimeMillis()));
return session.putAllAttributes(flowFile, attributes);
}
}

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"splunk", "logs", "http", "acknowledgement"})
@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
@ReadsAttributes({
@ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
@ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
@SeeAlso(PutSplunkHTTP.class)
public class QuerySplunkIndexingStatus extends SplunkAPICall {
private static final String ENDPOINT = "/services/collector/ack";
static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
.name("success")
.description("A FlowFile is transferred to this relationship when the acknowledgement was successful.")
.build();
static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
.name("unacknowledged")
.description(
"A FlowFile is transferred to this relationship when the acknowledgement was not successful. " +
"This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. " +
"FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.")
.build();
static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
.name("undetermined")
.description(
"A FlowFile is transferred to this relationship when the acknowledgement state is not determined. " +
"FlowFiles transferred to this relationship might be penalized. " +
"This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.")
.build();
static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
.name("failure")
.description(
"A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication. " +
"FlowFiles are timing out or unknown by the Splunk server will transferred to \"undetermined\" relationship.")
.build();
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
RELATIONSHIP_ACKNOWLEDGED,
RELATIONSHIP_UNACKNOWLEDGED,
RELATIONSHIP_UNDETERMINED,
RELATIONSHIP_FAILURE
)));
static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("ttl")
.displayName("Maximum Waiting Time")
.description(
"The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
"After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.")
.defaultValue("1 hour")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
.name("max-query-size")
.displayName("Maximum Query Size")
.description(
"The maximum number of acknowledgement identifiers the outgoing query contains in one batch. " +
"It is recommended not to set it too low in order to reduce network communication.")
.defaultValue("10000")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private volatile Integer maxQuerySize;
private volatile Integer ttl;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> result = new ArrayList<>();
final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
result.addAll(common);
result.add(TTL);
result.add(MAX_QUERY_SIZE);
return result;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
}
@OnStopped
public void onStopped() {
super.onStopped();
maxQuerySize = null;
ttl = null;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final RequestMessage requestMessage;
final List<FlowFile> flowFiles = session.get(maxQuerySize);
if (flowFiles.isEmpty()) {
return;
}
final long currentTime = System.currentTimeMillis();
final Map<Long, FlowFile> undetermined = new HashMap<>();
for (final FlowFile flowFile : flowFiles) {
final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
if (!sentAt.isPresent() || !ackId.isPresent()) {
getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
session.transfer(flowFile, RELATIONSHIP_FAILURE);
} else if (sentAt.get() + ttl < currentTime) {
session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
} else {
undetermined.put(ackId.get(), flowFile);
}
}
if (undetermined.isEmpty()) {
getLogger().debug("There was no eligible flow file to send request to Splunk.");
return;
}
try {
requestMessage = createRequestMessage(undetermined);
} catch (final IOException e) {
getLogger().error("Could not prepare Splunk request!", e);
session.transfer(undetermined.values(), RELATIONSHIP_FAILURE);
return;
}
try {
final ResponseMessage responseMessage = call(ENDPOINT, requestMessage);
if (responseMessage.getStatus() == 200) {
final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
splunkResponse.getAcks().entrySet().forEach(result -> {
final FlowFile toTransfer = undetermined.get(result.getKey());
if (result.getValue()) {
session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED);
} else {
session.penalize(toTransfer);
session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
}
});
} else {
getLogger().error("Query index status was not successful because of ({}) {}", new Object[] {responseMessage.getStatus(), responseMessage.getContent()});
context.yield();
session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED);
}
} catch (final Exception e) {
getLogger().error("Error during communication with Splunk server", e);
session.transfer(undetermined.values(), RELATIONSHIP_FAILURE);
}
}
private RequestMessage createRequestMessage(Map<Long, FlowFile> undetermined) throws IOException {
final RequestMessage requestMessage = new RequestMessage("POST");
requestMessage.getHeader().put("Content-Type", "application/json");
requestMessage.setContent(generateContent(undetermined));
return requestMessage;
}
private String generateContent(final Map<Long, FlowFile> undetermined) throws IOException {
final EventIndexStatusRequest splunkRequest = new EventIndexStatusRequest();
splunkRequest.setAcks(new ArrayList<>(undetermined.keySet()));
return marshalRequest(splunkRequest);
}
private static Optional<Long> extractLong(final String value) {
try {
return Optional.ofNullable(value).map(Long::valueOf);
} catch (final NumberFormatException e) {
return Optional.empty();
}
}
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.splunk.HttpException;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import com.splunk.SSLSecurityProtocol;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
abstract class SplunkAPICall extends AbstractProcessor {
private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
private static final String HTTP_SCHEME = "http";
private static final String HTTPS_SCHEME = "https";
private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = "splunk.acknowledgement.id";
static final String RESPONDED_AT_ATTRIBUTE = "splunk.responded.at";
static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
.name("Scheme")
.description("The scheme for connecting to Splunk.")
.allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
.defaultValue(HTTPS_SCHEME)
.required(true)
.build();
static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the Splunk server.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The HTTP Port Number of the Splunk server.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.defaultValue("9088")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
.name("Security Protocol")
.description("The security protocol to use for communicating with Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
.defaultValue(TLS_1_2_VALUE.getValue())
.build();
static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
.name("Owner")
.description("The owner to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
.name("Token")
.description("The token to pass to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("The username to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password to authenticate to Splunk.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
.name("request-channel")
.displayName("Splunk Request Channel")
.description("Identifier of the used request channel.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
SCHEME,
HOSTNAME,
PORT,
SECURITY_PROTOCOL,
OWNER,
TOKEN,
USERNAME,
PASSWORD,
REQUEST_CHANNEL
);
private final JsonFactory jsonFactory = new JsonFactory();
private final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
private volatile ServiceArgs splunkServiceArguments;
private volatile Service splunkService;
private volatile String requestChannel;
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return SplunkAPICall.PROPERTIES;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
splunkServiceArguments = getSplunkServiceArgs(context);
splunkService = getSplunkService(splunkServiceArguments);
requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
}
private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
final ServiceArgs splunkServiceArguments = new ServiceArgs();
splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
if (context.getProperty(OWNER).isSet()) {
splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(TOKEN).isSet()) {
splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(USERNAME).isSet()) {
splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(PASSWORD).isSet()) {
splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
}
if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
}
return splunkServiceArguments;
}
protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
return Service.connect(splunkServiceArguments);
}
@OnStopped
public void onStopped() {
if (splunkService != null) {
splunkService.logout();
splunkService = null;
}
requestChannel = null;
splunkServiceArguments = null;
}
protected ResponseMessage call(final String endpoint, final RequestMessage request) {
request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
try {
return splunkService.send(endpoint, request);
//Catch Stale connection exception, reinitialize, and retry
} catch (final HttpException e) {
getLogger().error("Splunk request status code: {}. Retrying the request.", new Object[] {e.getStatus()});
splunkService.logout();
splunkService = getSplunkService(splunkServiceArguments);
return splunkService.send(endpoint, request);
}
}
protected <T> T unmarshallResult(final InputStream responseBody, final Class<T> type) throws IOException {
return jsonObjectMapper.readValue(responseBody, type);
}
protected String marshalRequest(final Object request) throws IOException {
return jsonObjectMapper.writeValueAsString(request);
}
}

View File

@ -13,4 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.splunk.GetSplunk
org.apache.nifi.processors.splunk.PutSplunk
org.apache.nifi.processors.splunk.PutSplunk
org.apache.nifi.processors.splunk.PutSplunkHTTP
org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus

View File

@ -0,0 +1,75 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutSplunkHTTP</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>PutSplunkHTTP</h2>
<p>
This processor serves as a counterpart for PutSplunk processor. While the later solves communication using TCP and
UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor
shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are
identical. There are however some aspects unique for this processor:
</p>
<h3>Content details</h3>
<p>
PutSplunkHTTP allows the user to specify some metadata about the event being sent to the Splunk. These include: the
"Character Set" and the "Content Type" of the flow file content, using the matching properties. If the incoming
flow file has "mime.type" attribute, the processor will use it, unless the "Content Type" property is set, in which
case the property will override the flow file attribute.
</p>
<h3>Event parameters</h3>
<p>
The "Source", "Source Type", "Host" and "Index" properties are optional and will be set by Splunk if unspecified. If set,
the default values will be overwritten by user specified ones. For more details about the Splunk API, please visit
<a href="https://docs.splunk.com/Documentation/Splunk/LATEST/RESTREF/RESTinput#services.2Fcollector.2Fraw">this documentation</a>.
</p>
<h3>Acknowledgements</h3>
<p>
HTTP Event Collector (HEC) in Splunk provides the possibility of index acknowledgement, which can be used to monitor
the indexing status of the individual events. PutSplunkHTTP supports this feature by enriching the outgoing flow file
with the necessary information, making it possible for a later processor to poll the status based on. The necessary
information for this is stored within flow file attributes "splunk.acknowledgement.id" and "splunk.responded.at".
</p>
<p>
For further steps of acknowledgement handling in NiFi side, please refer to QuerySplunkIndexingStatus processor. For more
details about the index acknowledgement, please visit <a href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck">this documentation</a>.
</p>
<h3>Error information</h3>
<p>
For more refined processing, flow files are enriched with additional information if possible. The information is stored
in the flow file attribute "splunk.status.code" or "splunk.response.code", depending on the success of the processing.
The attribute "splunk.status.code" is always filled when the Splunk API call is executed and contains the HTTP status code
of the response. In case the flow file transferred into "failure" relationship, the "splunk.response.code" might be
also filled, based on the Splunk response code.
</p>
</body>
</html>

View File

@ -0,0 +1,76 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>QuerySplunkIndexingStatus</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>
<body>
<h2>QuerySplunkIndexingStatus</h2>
<p>
This processor is responsible for polling Splunk server and determine if a Splunk event is acknowledged at the time of
execution. For more details about the HEC Index Acknowledgement please see
<a href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck">this documentation.</a>
</p>
<h3>Prerequisites</h3>
<p>
In order to work properly, the incoming flow files need to have the attributes "splunk.acknowledgement.id" and
"splunk.responded.at" filled properly. The flow file attribute "splunk.acknowledgement.id" should continue the "ackId"
contained by the response of the Splunk from the original put call. The flow file attribute "splunk.responded.at"
should contain the Unix Epoch the put call was answered by Splunk. It is suggested to use PutSplunkHTTP processor to execute
the put call and set these attributes.
</p>
<h3>Unacknowledged and undetermined cases</h3>
<p>
Splunk serves information only about successful acknowledgement. In every other case it will return a value of false. This
includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
</p>
<p>
Undetermined cases are normal in healthy environment as it is possible that NiFi asks for indexing status before Splunk
finishes and acknowledges it. These cases are safe to retry and it is suggested to loop "undetermined" relationship
back to the processor for later try. Flow files transferred into the "Undetermined" relationship are penalized.
</p>
<h3>Performance</h3>
<p>
Please keep Splunk channel limitations in mind: there are multiple configuration parameters in Splunk which might have direct
effect on the performance and behaviour of the QuerySplunkIndexingStatus processor. For example "max_number_of_acked_requests_pending_query"
and "max_number_of_acked_requests_pending_query_per_ack_channel" might limit the amount of ackIDs, the Splunk stores.
</p>
<p>
Also, it is suggested to execute the query in batches. The "Maximum Query Size" property might be used for fine tune
the maximum number of events the processor will query about in one API request. This serves as an upper limit for the
batch but the processor might execute the query with less number of undetermined events.
</p>
</body>
</html>

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
@RunWith(MockitoJUnitRunner.class)
public class TestPutSplunkHTTP {
private static final String ACK_ID = "1234";
private static final String EVENT = "{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
private static final String SUCCESS_RESPONSE =
"{\n" +
" \"text\": \"Success\",\n" +
" \"code\": 0,\n" +
" \"ackId\": " + ACK_ID + "\n" +
"}";
private static final String FAILURE_RESPONSE = "{\n" +
" \"text\": \"Failure\",\n" +
" \"code\": 13\n" +
"}";
@Mock
private Service service;
@Mock
private ResponseMessage response;
private MockedPutSplunkHTTP processor;
private TestRunner testRunner;
private ArgumentCaptor<String> path;
private ArgumentCaptor<RequestMessage> request;
@Before
public void setUp() {
processor = new MockedPutSplunkHTTP(service);
testRunner = TestRunners.newTestRunner(processor);
testRunner.setProperty(SplunkAPICall.SCHEME, "http");
testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 888c5a81-8777-49a0-a3af-f76e050ab5d9");
testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, "22bd7414-0d77-4c73-936d-c8f5d1b21862");
path = ArgumentCaptor.forClass(String.class);
request = ArgumentCaptor.forClass(RequestMessage.class);
Mockito.when(service.send(path.capture(), request.capture())).thenReturn(response);
}
@After
public void tearDown() {
testRunner.shutdown();
}
@Test
public void testRunSuccess() throws Exception {
// given
givenSplunkReturnsWithSuccess();
// when
testRunner.enqueue(givenFlowFile());
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_SUCCESS).get(0);
Assert.assertEquals(EVENT, outgoingFlowFile.getContent());
Assert.assertEquals(ACK_ID, outgoingFlowFile.getAttribute("splunk.acknowledgement.id"));
Assert.assertNotNull(outgoingFlowFile.getAttribute("splunk.responded.at"));
Assert.assertEquals("200", outgoingFlowFile.getAttribute("splunk.status.code"));
Assert.assertEquals("application/json", request.getValue().getHeader().get("Content-Type"));
}
@Test
public void testHappyPathWithCustomQueryParameters() throws Exception {
// given
testRunner.setProperty(PutSplunkHTTP.SOURCE, "test_source");
testRunner.setProperty(PutSplunkHTTP.SOURCE_TYPE, "test_source_type");
givenSplunkReturnsWithSuccess();
// when
testRunner.enqueue(EVENT);
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
Assert.assertEquals("%2Fservices%2Fcollector%2Fraw%3Fsourcetype%3Dtest_source_type%26source%3Dtest_source", path.getValue());
}
@Test
public void testHappyPathWithContentType() throws Exception {
// given
testRunner.setProperty(PutSplunkHTTP.CONTENT_TYPE, "text/xml");
givenSplunkReturnsWithSuccess();
// when
testRunner.enqueue(givenFlowFile());
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_SUCCESS, 1);
Assert.assertEquals("text/xml", request.getValue().getHeader().get("Content-Type"));
}
@Test
public void testSplunkCallFailure() throws Exception {
// given
givenSplunkReturnsWithFailure();
// when
testRunner.enqueue(givenFlowFile());
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_FAILURE, 1);
final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_FAILURE).get(0);
Assert.assertEquals(EVENT, outgoingFlowFile.getContent());
Assert.assertNull(outgoingFlowFile.getAttribute("splunk.acknowledgement.id"));
Assert.assertNull(outgoingFlowFile.getAttribute("splunk.responded.at"));
Assert.assertEquals("200", outgoingFlowFile.getAttribute("splunk.status.code"));
Assert.assertEquals("13", outgoingFlowFile.getAttribute("splunk.response.code"));
}
@Test
public void testSplunkApplicationFailure() throws Exception {
// given
givenSplunkReturnsWithApplicationFailure(403);
// when
testRunner.enqueue(givenFlowFile());
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(PutSplunkHTTP.RELATIONSHIP_FAILURE, 1);
final MockFlowFile outgoingFlowFile = testRunner.getFlowFilesForRelationship(PutSplunkHTTP.RELATIONSHIP_FAILURE).get(0);
Assert.assertEquals(EVENT, outgoingFlowFile.getContent());
Assert.assertNull(outgoingFlowFile.getAttribute("splunk.acknowledgement.id"));
Assert.assertNull(outgoingFlowFile.getAttribute("splunk.responded.at"));
Assert.assertNull(outgoingFlowFile.getAttribute("splunk.response.code"));
Assert.assertEquals("403", outgoingFlowFile.getAttribute("splunk.status.code"));
}
private MockFlowFile givenFlowFile() throws UnsupportedEncodingException {
final MockFlowFile result = new MockFlowFile(System.currentTimeMillis());
result.setData(EVENT.getBytes("UTF-8"));
result.putAttributes(Collections.singletonMap("mime.type", "application/json"));
return result;
}
private void givenSplunkReturnsWithSuccess() throws Exception {
final InputStream inputStream = new ByteArrayInputStream(SUCCESS_RESPONSE.getBytes("UTF-8"));
Mockito.when(response.getStatus()).thenReturn(200);
Mockito.when(response.getContent()).thenReturn(inputStream);
}
private void givenSplunkReturnsWithFailure() throws Exception {
final InputStream inputStream = new ByteArrayInputStream(FAILURE_RESPONSE.getBytes("UTF-8"));
Mockito.when(response.getStatus()).thenReturn(200);
Mockito.when(response.getContent()).thenReturn(inputStream);
}
private void givenSplunkReturnsWithApplicationFailure(int code) throws Exception {
final InputStream inputStream = new ByteArrayInputStream("non-json-content".getBytes("UTF-8"));
Mockito.when(response.getStatus()).thenReturn(code);
Mockito.when(response.getContent()).thenReturn(inputStream);
}
public static class MockedPutSplunkHTTP extends PutSplunkHTTP {
final Service serviceMock;
public MockedPutSplunkHTTP(final Service serviceMock) {
this.serviceMock = serviceMock;
}
@Override
protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
return serviceMock;
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.splunk;
import com.splunk.RequestMessage;
import com.splunk.ResponseMessage;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(MockitoJUnitRunner.class)
public class TestQuerySplunkIndexingStatus {
private static final String EVENT = "{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
@Mock
private Service service;
@Mock
private ResponseMessage response;
private MockedQuerySplunkIndexingStatus processor;
private TestRunner testRunner;
private ArgumentCaptor<String> path;
private ArgumentCaptor<RequestMessage> request;
@Before
public void setUp() {
processor = new MockedQuerySplunkIndexingStatus(service);
testRunner = TestRunners.newTestRunner(processor);
testRunner.setProperty(SplunkAPICall.SCHEME, "http");
testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 888c5a81-8777-49a0-a3af-f76e050ab5d9");
testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, "22bd7414-0d77-4c73-936d-c8f5d1b21862");
path = ArgumentCaptor.forClass(String.class);
request = ArgumentCaptor.forClass(RequestMessage.class);
Mockito.when(service.send(path.capture(), request.capture())).thenReturn(response);
}
@After
public void tearDown() {
testRunner.shutdown();
}
@Test
public void testRunSuccess() throws Exception {
// given
final Map<Integer, Boolean> acks = new HashMap<>();
acks.put(1, true);
acks.put(2, false);
givenSplunkReturns(acks);
// when
testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis()));
testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis()));
testRunner.run();
// then
final List<MockFlowFile> acknowledged = testRunner.getFlowFilesForRelationship(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED);
final List<MockFlowFile> undetermined = testRunner.getFlowFilesForRelationship(QuerySplunkIndexingStatus.RELATIONSHIP_UNDETERMINED);
Assert.assertEquals(1, acknowledged.size());
Assert.assertEquals(1, undetermined.size());
Assert.assertFalse(acknowledged.get(0).isPenalized());
Assert.assertTrue(undetermined.get(0).isPenalized());
}
@Test
public void testMoreIncomingFlowFileThanQueryLimit() throws Exception {
// given
testRunner.setProperty(QuerySplunkIndexingStatus.MAX_QUERY_SIZE, "2");
final Map<Integer, Boolean> acks = new HashMap<>();
acks.put(1, true);
acks.put(2, true);
givenSplunkReturns(acks);
// when
testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis()));
testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis()));
testRunner.enqueue(givenFlowFile(3, System.currentTimeMillis()));
testRunner.run();
// then
Assert.assertEquals("{\"acks\":[1,2]}", request.getValue().getContent());
Assert.assertEquals(1, testRunner.getQueueSize().getObjectCount());
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_ACKNOWLEDGED, 2);
}
@Test
public void testTimedOutEvents() throws Exception {
// when
testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
testRunner.run();
// then
Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
}
@Test
public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception {
// when
testRunner.enqueue(EVENT);
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_FAILURE, 1);
}
@Test
public void testWhenSplunkReturnsWithError() throws Exception {
// given
givenSplunkReturnsWithFailure();
// when
testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis()));
testRunner.enqueue(givenFlowFile(2, System.currentTimeMillis()));
testRunner.enqueue(givenFlowFile(3, System.currentTimeMillis()));
testRunner.run();
// then
testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNDETERMINED, 3);
}
private void givenSplunkReturns(final Map<Integer, Boolean> acks) throws Exception {
final StringBuilder responseContent = new StringBuilder("{\"acks\":{")
.append(acks.entrySet().stream().map(e -> "\"" + e.getKey() + "\": " + e.getValue()).collect(Collectors.joining(", ")))
.append("}}");
final InputStream inputStream = new ByteArrayInputStream(responseContent.toString().getBytes("UTF-8"));
Mockito.when(response.getStatus()).thenReturn(200);
Mockito.when(response.getContent()).thenReturn(inputStream);
}
private void givenSplunkReturnsWithFailure() {
Mockito.when(response.getStatus()).thenReturn(403);
}
private MockFlowFile givenFlowFile(final int ackId, final long sentAt) throws UnsupportedEncodingException {
final MockFlowFile result = new MockFlowFile(ackId);
result.setData(EVENT.getBytes("UTF-8"));
Map<String, String> attributes = new HashMap<>();
attributes.put("splunk.acknowledgement.id", String.valueOf(ackId));
attributes.put("splunk.responded.at", String.valueOf(sentAt));
result.putAttributes(attributes);
return result;
}
public static class MockedQuerySplunkIndexingStatus extends QuerySplunkIndexingStatus {
final Service serviceMock;
public MockedQuerySplunkIndexingStatus(final Service serviceMock) {
this.serviceMock = serviceMock;
}
@Override
protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
return serviceMock;
}
}
}