NIFI-12500: Add dynamic target for Get/Set/SendTrapSNMP

This closes #8160.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
lehelb 2023-12-14 19:07:00 +01:00 committed by tpalfy
parent 2e3f83eb54
commit 7fc27651a4
19 changed files with 317 additions and 509 deletions

View File

@ -17,20 +17,12 @@
package org.apache.nifi.snmp.factory.core;
import org.apache.nifi.snmp.configuration.SNMPConfiguration;
import org.apache.nifi.snmp.operations.SNMPResourceHandler;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
import org.snmp4j.smi.UdpAddress;
public interface SNMPContext {
default SNMPResourceHandler createSNMPResourceHandler(final SNMPConfiguration snmpConfiguration) {
return new SNMPResourceHandler(
createSnmpManagerInstance(snmpConfiguration),
createTargetInstance(snmpConfiguration)
);
}
default void setupTargetBasicProperties(final Target target, final SNMPConfiguration configuration) {
final int snmpVersion = configuration.getVersion();
final String host = configuration.getTargetHost();

View File

@ -38,7 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.snmp.operations.SNMPResourceHandler.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
import static org.apache.nifi.snmp.processors.AbstractSNMPProcessor.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
public class GetSNMPHandler {
@ -56,29 +56,15 @@ public class GetSNMPHandler {
"associated with this %s OID does not contain child OIDs. Please check if the OID exists in the agent " +
"MIB or specify a parent OID with at least one child element";
private final SNMPResourceHandler snmpResourceHandler;
private Snmp snmpManager;
private TreeUtils treeUtils;
public GetSNMPHandler(final SNMPResourceHandler snmpResourceHandler) {
this.snmpResourceHandler = snmpResourceHandler;
this.treeUtils = new TreeUtils(snmpResourceHandler.getSnmpManager(), getPduFactory);
public GetSNMPHandler(final Snmp snmpManager) {
this.snmpManager = snmpManager;
this.treeUtils = new TreeUtils(snmpManager, getPduFactory);
}
public SNMPSingleResponse get(final String oid) throws IOException {
final Target target = snmpResourceHandler.getTarget();
final Snmp snmpManager = snmpResourceHandler.getSnmpManager();
final PDU pdu = getPduFactory.createPDU(target);
pdu.add(new VariableBinding(new OID(oid)));
final PDU responsePdu = getResponsePdu(target, snmpManager, pdu);
return new SNMPSingleResponse(target, responsePdu);
}
public Optional<SNMPSingleResponse> get(final Map<String, String> flowFileAttributes) throws IOException {
final Target target = snmpResourceHandler.getTarget();
final Snmp snmpManager = snmpResourceHandler.getSnmpManager();
public Optional<SNMPSingleResponse> get(final Map<String, String> flowFileAttributes, final Target target) throws IOException {
final PDU pdu = getPduFactory.createPDU(target);
VariableBinding[] variableBindings = SNMPUtils.addGetVariables(flowFileAttributes);
if (variableBindings.length == 0) {
@ -86,21 +72,11 @@ public class GetSNMPHandler {
}
pdu.addAll(variableBindings);
final PDU responsePdu = getResponsePdu(target, snmpManager, pdu);
final PDU responsePdu = getResponsePdu(target, pdu);
return Optional.of(new SNMPSingleResponse(target, responsePdu));
}
public SNMPTreeResponse walk(final String oid) {
final Target target = snmpResourceHandler.getTarget();
final List<TreeEvent> subtree = treeUtils.getSubtree(target, new OID(oid));
evaluateSubtreeErrors(oid, subtree);
return new SNMPTreeResponse(target, subtree);
}
public Optional<SNMPTreeResponse> walk(final Map<String, String> flowFileAttributes) {
final Target target = snmpResourceHandler.getTarget();
public Optional<SNMPTreeResponse> walk(final Map<String, String> flowFileAttributes, final Target target) {
final List<TreeEvent> subtree;
final OID[] oids = SNMPUtils.addWalkVariables(flowFileAttributes);
@ -114,7 +90,7 @@ public class GetSNMPHandler {
return Optional.of(new SNMPTreeResponse(target, subtree));
}
private PDU getResponsePdu(Target target, Snmp snmpManager, PDU pdu) throws IOException {
private PDU getResponsePdu(final Target target, final PDU pdu) throws IOException {
final ResponseEvent response = snmpManager.get(pdu, target);
final PDU responsePdu = response.getResponse();
if (responsePdu == null) {

View File

@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snmp.operations;
import org.apache.nifi.processor.exception.ProcessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
import org.snmp4j.security.SecurityModels;
import org.snmp4j.smi.Integer32;
import java.io.IOException;
public class SNMPResourceHandler {
public static final String INVALID_FLOWFILE_EXCEPTION_MESSAGE = "Could not read the variable bindings from the " +
"flowfile. Please, add the OIDs to set in separate properties. E.g. Property name: snmp$1.3.6.1.2.1.1.1.0 " +
"Value: Example value. ";
public static final String REQUEST_TIMEOUT_EXCEPTION_TEMPLATE = "Request timed out. Please check if (1). the " +
"agent host and port is correctly set, (2). the agent is running, (3). the agent SNMP version corresponds" +
" with the processor's one, (4) the community string is correct and has %1$s access, (5) In case of SNMPv3" +
" check if the user credentials are valid and the user in a group with %1$s access.";
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Snmp snmpManager;
private final Target target;
public SNMPResourceHandler(final Snmp snmpManager, final Target target) {
this.snmpManager = snmpManager;
this.target = target;
}
public Snmp getSnmpManager() {
return snmpManager;
}
public Target getTarget() {
return target;
}
public void close() {
try {
if (snmpManager.getUSM() != null) {
snmpManager.getUSM().removeAllUsers();
SecurityModels.getInstance().removeSecurityModel(new Integer32(snmpManager.getUSM().getID()));
}
snmpManager.close();
} catch (IOException e) {
final String errorMessage = "Could not close SNMP manager.";
logger.error(errorMessage, e);
throw new ProcessException(errorMessage);
}
}
}

View File

@ -31,32 +31,27 @@ import java.time.Instant;
import java.util.Map;
public class SendTrapSNMPHandler {
private final SNMPResourceHandler snmpResourceHandler;
private final ComponentLog logger;
private final V1TrapPDUFactory v1TrapPDUFactory;
private final V2TrapPDUFactory v2TrapPDUFactory;
private final Snmp snmpManager;
private final Instant startTime;
public SendTrapSNMPHandler(final SNMPResourceHandler snmpResourceHandler, final Instant startTime, final ComponentLog logger) {
this.snmpResourceHandler = snmpResourceHandler;
public SendTrapSNMPHandler(final Snmp snmpManager, final Instant startTime, final ComponentLog logger) {
this.snmpManager = snmpManager;
this.logger = logger;
v1TrapPDUFactory = createV1TrapPduFactory(startTime);
v2TrapPDUFactory = createV2TrapPduFactory(startTime);
this.startTime = startTime;
}
public void sendTrap(final Map<String, String> flowFileAttributes, final V1TrapConfiguration trapConfiguration) throws IOException {
final PDU pdu = v1TrapPDUFactory.get(trapConfiguration);
sendTrap(flowFileAttributes, pdu);
public void sendTrap(final Map<String, String> flowFileAttributes, final V1TrapConfiguration trapConfiguration, final Target target) throws IOException {
final PDU pdu = createV1TrapPduFactory(target, startTime).get(trapConfiguration);
sendTrap(flowFileAttributes, pdu, target);
}
public void sendTrap(final Map<String, String> flowFileAttributes, final V2TrapConfiguration trapConfiguration) throws IOException {
final PDU pdu = v2TrapPDUFactory.get(trapConfiguration);
sendTrap(flowFileAttributes, pdu);
public void sendTrap(final Map<String, String> flowFileAttributes, final V2TrapConfiguration trapConfiguration, final Target target) throws IOException {
final PDU pdu = createV2TrapPduFactory(target, startTime).get(trapConfiguration);
sendTrap(flowFileAttributes, pdu, target);
}
private void sendTrap(Map<String, String> flowFileAttributes, PDU pdu) throws IOException {
final Target target = snmpResourceHandler.getTarget();
final Snmp snmpManager = snmpResourceHandler.getSnmpManager();
private void sendTrap(final Map<String, String> flowFileAttributes, final PDU pdu, final Target target) throws IOException {
final boolean isAnyVariableAdded = SNMPUtils.addVariables(pdu, flowFileAttributes);
if (!isAnyVariableAdded) {
logger.debug("No optional SNMP specific variables found in flowfile.");
@ -65,11 +60,11 @@ public class SendTrapSNMPHandler {
snmpManager.send(pdu, target);
}
V1TrapPDUFactory createV1TrapPduFactory(final Instant startTime) {
return new V1TrapPDUFactory(snmpResourceHandler.getTarget(), startTime);
V1TrapPDUFactory createV1TrapPduFactory(final Target target, final Instant startTime) {
return new V1TrapPDUFactory(target, startTime);
}
V2TrapPDUFactory createV2TrapPduFactory(final Instant startTime) {
return new V2TrapPDUFactory(snmpResourceHandler.getTarget(), startTime);
V2TrapPDUFactory createV2TrapPduFactory(final Target target, final Instant startTime) {
return new V2TrapPDUFactory(target, startTime);
}
}

View File

@ -30,21 +30,18 @@ import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.snmp.operations.SNMPResourceHandler.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
import static org.apache.nifi.snmp.processors.AbstractSNMPProcessor.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
public class SetSNMPHandler {
private static PDUFactory setPduFactory = new DefaultPDUFactory(PDU.SET);
private final SNMPResourceHandler snmpResourceHandler;
private final Snmp snmpManager;
public SetSNMPHandler(final SNMPResourceHandler snmpResourceHandler) {
this.snmpResourceHandler = snmpResourceHandler;
public SetSNMPHandler(final Snmp snmpManager) {
this.snmpManager = snmpManager;
}
public Optional<SNMPSingleResponse> set(final Map<String, String> flowFileAttributes) throws IOException {
Target target = snmpResourceHandler.getTarget();
Snmp snmpManager = snmpResourceHandler.getSnmpManager();
public Optional<SNMPSingleResponse> set(final Map<String, String> flowFileAttributes, final Target target) throws IOException {
final PDU pdu = setPduFactory.createPDU(target);
final boolean isAnySnmpVariableInFlowFile = SNMPUtils.addVariables(pdu, flowFileAttributes);
if (isAnySnmpVariableInFlowFile) {

View File

@ -20,26 +20,34 @@ import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.snmp.configuration.SNMPConfiguration;
import org.apache.nifi.snmp.dto.ErrorStatus;
import org.apache.nifi.snmp.dto.SNMPResponseStatus;
import org.apache.nifi.snmp.dto.SNMPSingleResponse;
import org.apache.nifi.snmp.dto.SNMPValue;
import org.apache.nifi.snmp.factory.core.SNMPContext;
import org.apache.nifi.snmp.factory.core.SNMPFactoryProvider;
import org.apache.nifi.snmp.logging.SLF4JLogFactory;
import org.apache.nifi.snmp.operations.SNMPResourceHandler;
import org.apache.nifi.snmp.processors.properties.BasicProperties;
import org.apache.nifi.snmp.processors.properties.V3SecurityProperties;
import org.apache.nifi.snmp.utils.SNMPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.snmp4j.Snmp;
import org.snmp4j.log.LogFactory;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.SecurityModels;
import org.snmp4j.smi.Integer32;
import java.io.IOException;
import java.util.Optional;
/**
@ -47,12 +55,19 @@ import java.util.Optional;
* (http://www.snmp4j.org/)
*/
@RequiresInstanceClassLoading
abstract class AbstractSNMPProcessor extends AbstractProcessor {
public abstract class AbstractSNMPProcessor extends AbstractProcessor {
private static final Logger logger = LoggerFactory.getLogger(AbstractSNMPProcessor.class);
static {
LogFactory.setLogFactory(new SLF4JLogFactory());
}
public static final String REQUEST_TIMEOUT_EXCEPTION_TEMPLATE = "Request timed out. Please check if (1). the " +
"agent host and port is correctly set, (2). the agent is running, (3). the agent SNMP version corresponds" +
" with the processor's one, (4) the community string is correct and has %1$s access, (5) In case of SNMPv3" +
" check if the user credentials are valid and the user in a group with %1$s access.";
private static final String NO_SUCH_OBJECT = "noSuchObject";
public static final PropertyDescriptor AGENT_HOST = new PropertyDescriptor.Builder()
@ -61,7 +76,8 @@ abstract class AbstractSNMPProcessor extends AbstractProcessor {
.description("Hostname or network address of the SNMP Agent.")
.required(true)
.defaultValue("localhost")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor AGENT_PORT = new PropertyDescriptor.Builder()
@ -70,34 +86,45 @@ abstract class AbstractSNMPProcessor extends AbstractProcessor {
.description("Port of the SNMP Agent.")
.required(true)
.defaultValue("161")
.addValidator(StandardValidators.PORT_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
protected volatile SNMPResourceHandler snmpResourceHandler;
protected volatile Snmp snmpManager;
protected volatile SNMPContext factory;
@OnScheduled
public void initSnmpManager(final ProcessContext context) {
final int version = SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
final SNMPConfiguration configuration;
final String targetHost = getTargetHost(context);
final String targetPort = getTargetPort(context);
configuration = SNMPConfiguration.builder()
.setTargetHost(targetHost)
.setTargetPort(targetPort)
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
.setVersion(version)
.setAuthProtocol(context.getProperty(V3SecurityProperties.SNMP_AUTH_PROTOCOL).getValue())
.setAuthPassphrase(context.getProperty(V3SecurityProperties.SNMP_AUTH_PASSWORD).getValue())
.setPrivacyProtocol(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PROTOCOL).getValue())
.setPrivacyPassphrase(context.getProperty(V3SecurityProperties.SNMP_PRIVACY_PASSWORD).getValue())
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
.build();
factory = SNMPFactoryProvider.getFactory(version);
snmpManager = factory.createSnmpManagerInstance(configuration);
}
protected SNMPConfiguration getTargetConfiguration(final ProcessContext context, final FlowFile flowFile) {
final int version = SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
final String targetHost = getTargetHost(context, flowFile);
final String targetPort = getTargetPort(context, flowFile);
return SNMPConfiguration.builder()
.setVersion(version)
.setTargetHost(targetHost)
.setTargetPort(targetPort)
.setRetries(context.getProperty(BasicProperties.SNMP_RETRIES).asInteger())
.setTimeoutInMs(context.getProperty(BasicProperties.SNMP_TIMEOUT).asInteger())
.setSecurityName(context.getProperty(V3SecurityProperties.SNMP_SECURITY_NAME).getValue())
.setSecurityLevel(context.getProperty(V3SecurityProperties.SNMP_SECURITY_LEVEL).getValue())
.setCommunityString(context.getProperty(BasicProperties.SNMP_COMMUNITY).getValue())
.build();
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(configuration);
}
/**
@ -105,23 +132,33 @@ abstract class AbstractSNMPProcessor extends AbstractProcessor {
*/
@OnStopped
public void close() {
if (snmpResourceHandler != null) {
snmpResourceHandler.close();
snmpResourceHandler = null;
try {
if (snmpManager.getUSM() != null) {
snmpManager.getUSM().removeAllUsers();
SecurityModels.getInstance().removeSecurityModel(new Integer32(snmpManager.getUSM().getID()));
}
snmpManager.close();
} catch (IOException e) {
final String errorMessage = "Could not close SNMP manager.";
logger.error(errorMessage, e);
throw new ProcessException(errorMessage);
}
}
protected void handleResponse(final ProcessContext context, final ProcessSession processSession, final FlowFile flowFile, final SNMPSingleResponse response,
final Relationship success, final Relationship failure, final String provenanceAddress) {
final Relationship success, final Relationship failure, final String provenanceAddress, final boolean isNewFlowFileCreated) {
final SNMPResponseStatus snmpResponseStatus = processResponse(response);
processSession.putAllAttributes(flowFile, response.getAttributes());
if (snmpResponseStatus.getErrorStatus() == ErrorStatus.FAILURE) {
getLogger().error("SNMP request failed, response error: " + snmpResponseStatus.getErrorMessage());
processSession.getProvenanceReporter().modifyAttributes(flowFile, response.getTargetAddress() + provenanceAddress);
processSession.transfer(flowFile, failure);
context.yield();
} else {
processSession.getProvenanceReporter().modifyAttributes(flowFile, response.getTargetAddress() + provenanceAddress);
if (isNewFlowFileCreated) {
processSession.getProvenanceReporter().receive(flowFile, response.getTargetAddress() + provenanceAddress);
} else {
processSession.getProvenanceReporter().fetch(flowFile, response.getTargetAddress() + provenanceAddress);
}
processSession.transfer(flowFile, success);
}
}
@ -159,7 +196,7 @@ abstract class AbstractSNMPProcessor extends AbstractProcessor {
return new SNMPResponseStatus("Successful SNMP Response", ErrorStatus.SUCCESS);
}
protected abstract String getTargetHost(ProcessContext processContext);
protected abstract String getTargetHost(final ProcessContext processContext, final FlowFile flowFile);
protected abstract String getTargetPort(ProcessContext processContext);
protected abstract String getTargetPort(final ProcessContext processContext, final FlowFile flowFile);
}

View File

@ -38,6 +38,7 @@ import org.apache.nifi.snmp.processors.properties.BasicProperties;
import org.apache.nifi.snmp.processors.properties.V3SecurityProperties;
import org.apache.nifi.snmp.utils.SNMPUtils;
import org.apache.nifi.snmp.validators.OIDValidator;
import org.snmp4j.Target;
import java.io.IOException;
import java.util.Arrays;
@ -87,7 +88,6 @@ public class GetSNMP extends AbstractSNMPProcessor {
.description("Each OID (object identifier) identifies a variable that can be read or set via SNMP." +
" This value is not taken into account for an input flowfile and will be omitted. Can be set to empty" +
"string when the OIDs are provided through flowfile.")
.required(true)
.addValidator(new OIDValidator())
.build();
@ -147,29 +147,53 @@ public class GetSNMP extends AbstractSNMPProcessor {
@OnScheduled
public void init(final ProcessContext context) {
initSnmpManager(context);
snmpHandler = new GetSNMPHandler(snmpResourceHandler);
snmpHandler = new GetSNMPHandler(snmpManager);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession processSession) {
final SNMPStrategy snmpStrategy = SNMPStrategy.valueOf(context.getProperty(SNMP_STRATEGY).getValue());
final String oid = context.getProperty(OID).getValue();
final FlowFile flowfile = processSession.get();
final boolean isNewFlowFileCreated;
FlowFile flowfile = processSession.get();
if (flowfile == null) {
isNewFlowFileCreated = true;
flowfile = processSession.create();
} else {
isNewFlowFileCreated = false;
}
final Target target = factory.createTargetInstance(getTargetConfiguration(context, flowfile));
if (SNMPStrategy.GET == snmpStrategy) {
performSnmpGet(context, processSession, oid, flowfile);
performSnmpGet(context, processSession, oid, target, flowfile, isNewFlowFileCreated);
} else if (SNMPStrategy.WALK == snmpStrategy) {
performSnmpWalk(context, processSession, oid, flowfile);
performSnmpWalk(context, processSession, oid, target, flowfile, isNewFlowFileCreated);
}
}
void performSnmpWalk(final ProcessContext context, final ProcessSession processSession, final String oid,
final FlowFile flowFile) {
final Target target, FlowFile flowFile, final boolean isNewFlowFileCreated) {
if (oid != null) {
String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
}
try {
if (flowFile != null) {
performSnmpWalkWithFlowFile(processSession, flowFile);
final Optional<SNMPTreeResponse> optionalResponse = snmpHandler.walk(flowFile.getAttributes(), target);
if (optionalResponse.isPresent()) {
final SNMPTreeResponse response = optionalResponse.get();
response.logErrors(getLogger());
flowFile = processSession.putAllAttributes(flowFile, response.getAttributes());
if (isNewFlowFileCreated) {
processSession.getProvenanceReporter().receive(flowFile, "/walk");
} else {
processSession.getProvenanceReporter().fetch(flowFile, "/walk");
}
processSession.transfer(flowFile, response.isError() ? REL_FAILURE : REL_SUCCESS);
} else {
performSnmpWalkWithoutFlowFile(processSession, oid);
getLogger().warn("No SNMP specific attributes found in flowfile.");
processSession.transfer(flowFile, REL_FAILURE);
}
} catch (SNMPWalkException e) {
getLogger().error(e.getMessage());
@ -177,67 +201,34 @@ public class GetSNMP extends AbstractSNMPProcessor {
}
}
private void performSnmpWalkWithFlowFile(ProcessSession processSession, FlowFile flowFile) {
final Optional<SNMPTreeResponse> optionalResponse = snmpHandler.walk(flowFile.getAttributes());
if (optionalResponse.isPresent()) {
final SNMPTreeResponse response = optionalResponse.get();
response.logErrors(getLogger());
processSession.putAllAttributes(flowFile, response.getAttributes());
processSession.getProvenanceReporter().modifyAttributes(flowFile, response.getTargetAddress() + "/walk");
processSession.transfer(flowFile, response.isError() ? REL_FAILURE : REL_SUCCESS);
} else {
getLogger().warn("No SNMP specific attributes found in flowfile.");
processSession.getProvenanceReporter().receive(flowFile, "/walk");
processSession.transfer(flowFile, REL_FAILURE);
}
}
private void performSnmpWalkWithoutFlowFile(ProcessSession processSession, String oid) {
final SNMPTreeResponse response = snmpHandler.walk(oid);
response.logErrors(getLogger());
final FlowFile outgoingFlowFile = processSession.create();
processSession.putAllAttributes(outgoingFlowFile, response.getAttributes());
processSession.getProvenanceReporter().create(outgoingFlowFile, response.getTargetAddress() + "/walk");
processSession.transfer(outgoingFlowFile, REL_SUCCESS);
}
void performSnmpGet(final ProcessContext context, final ProcessSession processSession, final String oid,
final FlowFile flowFile) {
final Target target, FlowFile flowFile, final boolean isNewFlowFileCreated) {
final String textualOidKey = SNMPUtils.SNMP_PROP_PREFIX + "textualOid";
final Map<String, String> textualOidMap = Collections.singletonMap(textualOidKey, context.getProperty(TEXTUAL_OID).getValue());
if (oid != null) {
String prefixedOid = SNMPUtils.SNMP_PROP_PREFIX + oid;
flowFile = processSession.putAttribute(flowFile, prefixedOid, "");
}
try {
if (flowFile != null) {
performSnmpGetWithFlowFile(context, processSession, flowFile, textualOidMap);
final Optional<SNMPSingleResponse> optionalResponse = snmpHandler.get(flowFile.getAttributes(), target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
flowFile = processSession.putAllAttributes(flowFile, textualOidMap);
handleResponse(context, processSession, flowFile, response, REL_SUCCESS, REL_FAILURE, "/get", isNewFlowFileCreated);
} else {
performSnmpGetWithoutFlowFile(context, processSession, oid, textualOidMap);
getLogger().warn("No SNMP specific attributes found in flowfile.");
processSession.transfer(flowFile, REL_FAILURE);
context.yield();
}
} catch (IOException e) {
getLogger().error("Failed to send request to the agent. Check if the agent supports the used version.", e);
context.yield();
}
}
private void performSnmpGetWithoutFlowFile(ProcessContext context, ProcessSession processSession, String oid, Map<String, String> textualOidMap) throws IOException {
final SNMPSingleResponse response = snmpHandler.get(oid);
final FlowFile outgoingFlowFile = processSession.create();
processSession.putAllAttributes(outgoingFlowFile, textualOidMap);
processSession.getProvenanceReporter().receive(outgoingFlowFile, response.getTargetAddress() + "/get");
handleResponse(context, processSession, outgoingFlowFile, response, REL_SUCCESS, REL_FAILURE, "/get");
}
private void performSnmpGetWithFlowFile(ProcessContext context, ProcessSession processSession, FlowFile flowFile, Map<String, String> textualOidMap) throws IOException {
final Optional<SNMPSingleResponse> optionalResponse = snmpHandler.get(flowFile.getAttributes());
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
processSession.putAllAttributes(flowFile, textualOidMap);
handleResponse(context, processSession, flowFile, response, REL_SUCCESS, REL_FAILURE, "/get");
} else {
getLogger().warn("No SNMP specific attributes found in flowfile.");
processSession.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
@ -248,13 +239,13 @@ public class GetSNMP extends AbstractSNMPProcessor {
return PROPERTY_DESCRIPTORS;
}
protected String getTargetHost(ProcessContext processContext) {
return processContext.getProperty(AGENT_HOST).getValue();
protected String getTargetHost(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(AGENT_HOST).evaluateAttributeExpressions(flowFile).getValue();
}
@Override
protected String getTargetPort(ProcessContext processContext) {
return processContext.getProperty(AGENT_PORT).getValue();
protected String getTargetPort(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(AGENT_PORT).evaluateAttributeExpressions(flowFile).getValue();
}
private enum SNMPStrategy {

View File

@ -22,6 +22,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -35,6 +36,7 @@ import org.apache.nifi.snmp.processors.properties.V1TrapProperties;
import org.apache.nifi.snmp.processors.properties.V2TrapProperties;
import org.apache.nifi.snmp.processors.properties.V3SecurityProperties;
import org.apache.nifi.snmp.utils.SNMPUtils;
import org.snmp4j.Target;
import org.snmp4j.mp.SnmpConstants;
import java.io.IOException;
@ -62,18 +64,20 @@ public class SendTrapSNMP extends AbstractSNMPProcessor {
public static final PropertyDescriptor SNMP_MANAGER_HOST = new PropertyDescriptor.Builder()
.name("snmp-trap-manager-host")
.displayName("SNMP Manager Host")
.description("The host where the SNMP Manager sends the trap.")
.description("The host of the SNMP Manager where the trap is sent.")
.required(true)
.defaultValue("localhost")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SNMP_MANAGER_PORT = new PropertyDescriptor.Builder()
.name("snmp-trap-manager-port")
.displayName("SNMP Manager Port")
.description("The port where the SNMP Manager listens to the incoming traps.")
.description("The port of the SNMP Manager where the trap is sent.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -116,15 +120,16 @@ public class SendTrapSNMP extends AbstractSNMPProcessor {
public void init(ProcessContext context) {
Instant startTime = Instant.now();
initSnmpManager(context);
snmpHandler = new SendTrapSNMPHandler(snmpResourceHandler, startTime, getLogger());
snmpHandler = new SendTrapSNMPHandler(snmpManager, startTime, getLogger());
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession processSession) {
final FlowFile flowFile = Optional.ofNullable(processSession.get()).orElseGet(processSession::create);
FlowFile flowFile = Optional.ofNullable(processSession.get()).orElseGet(processSession::create);
final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
try {
final Target target = factory.createTargetInstance(getTargetConfiguration(context, flowFile));
final int snmpVersion = SNMPUtils.getVersion(context.getProperty(BasicProperties.SNMP_VERSION).getValue());
if (SnmpConstants.version1 == snmpVersion) {
@ -147,17 +152,15 @@ public class SendTrapSNMP extends AbstractSNMPProcessor {
attributes.put("enterpriseOid", enterpriseOid);
attributes.put("genericTrapType", genericTrapType);
attributes.put("specificTrapType", specificTrapType);
snmpHandler.sendTrap(attributes, v1TrapConfiguration);
snmpHandler.sendTrap(attributes, v1TrapConfiguration, target);
} else {
final String trapOidValue = context.getProperty(V2TrapProperties.TRAP_OID_VALUE).evaluateAttributeExpressions(flowFile).getValue();
V2TrapConfiguration v2TrapConfiguration = new V2TrapConfiguration(trapOidValue);
attributes.put("trapOidValue", trapOidValue);
snmpHandler.sendTrap(attributes, v2TrapConfiguration);
snmpHandler.sendTrap(attributes, v2TrapConfiguration, target);
}
processSession.putAllAttributes(flowFile, attributes);
flowFile = processSession.putAllAttributes(flowFile, attributes);
processSession.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
getLogger().error("Failed to send request to the agent. Check if the agent supports the used version.", e);
processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
@ -179,12 +182,12 @@ public class SendTrapSNMP extends AbstractSNMPProcessor {
}
@Override
protected String getTargetHost(ProcessContext processContext) {
return processContext.getProperty(SNMP_MANAGER_HOST).getValue();
protected String getTargetHost(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(SNMP_MANAGER_HOST).evaluateAttributeExpressions(flowFile).getValue();
}
@Override
protected String getTargetPort(ProcessContext processContext) {
return processContext.getProperty(SNMP_MANAGER_PORT).getValue();
protected String getTargetPort(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(SNMP_MANAGER_PORT).evaluateAttributeExpressions(flowFile).getValue();
}
}

View File

@ -33,6 +33,7 @@ import org.apache.nifi.snmp.operations.SetSNMPHandler;
import org.apache.nifi.snmp.processors.properties.BasicProperties;
import org.apache.nifi.snmp.processors.properties.V3SecurityProperties;
import org.apache.nifi.snmp.utils.SNMPUtils;
import org.snmp4j.Target;
import java.io.IOException;
import java.util.Arrays;
@ -101,7 +102,7 @@ public class SetSNMP extends AbstractSNMPProcessor {
@OnScheduled
public void init(final ProcessContext context) {
initSnmpManager(context);
snmpHandler = new SetSNMPHandler(snmpResourceHandler);
snmpHandler = new SetSNMPHandler(snmpManager);
}
@Override
@ -109,16 +110,17 @@ public class SetSNMP extends AbstractSNMPProcessor {
final FlowFile flowFile = processSession.get();
if (flowFile != null) {
try {
final Optional<SNMPSingleResponse> optionalResponse = snmpHandler.set(flowFile.getAttributes());
final Target target = factory.createTargetInstance(getTargetConfiguration(context, flowFile));
final Optional<SNMPSingleResponse> optionalResponse = snmpHandler.set(flowFile.getAttributes(), target);
if (optionalResponse.isPresent()) {
processSession.remove(flowFile);
final FlowFile outgoingFlowFile = processSession.create();
final SNMPSingleResponse response = optionalResponse.get();
processSession.getProvenanceReporter().receive(outgoingFlowFile, "/set");
handleResponse(context, processSession, outgoingFlowFile, response, REL_SUCCESS, REL_FAILURE, "/set");
handleResponse(context, processSession, outgoingFlowFile, response, REL_SUCCESS, REL_FAILURE, "/set", true);
} else {
getLogger().warn("No SNMP specific attributes found in flowfile.");
processSession.transfer(flowFile, REL_FAILURE);
processSession.getProvenanceReporter().receive(flowFile, "/set");
}
} catch (IOException e) {
getLogger().error("Failed to send request to the agent. Check if the agent supports the used version.");
@ -138,13 +140,12 @@ public class SetSNMP extends AbstractSNMPProcessor {
return RELATIONSHIPS;
}
@Override
protected String getTargetHost(ProcessContext processContext) {
return processContext.getProperty(AGENT_HOST).getValue();
protected String getTargetHost(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(AGENT_HOST).evaluateAttributeExpressions(flowFile).getValue();
}
@Override
protected String getTargetPort(ProcessContext processContext) {
return processContext.getProperty(AGENT_PORT).getValue();
protected String getTargetPort(final ProcessContext processContext, final FlowFile flowFile) {
return processContext.getProperty(AGENT_PORT).evaluateAttributeExpressions(flowFile).getValue();
}
}

View File

@ -47,9 +47,11 @@ import org.snmp4j.smi.VariableBinding;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Vector;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -197,7 +199,7 @@ public final class SNMPUtils {
}
public static VariableBinding[] addGetVariables(final Map<String, String> attributes) {
List<VariableBinding> variableBindings = new ArrayList<>();
Set<VariableBinding> variableBindings = new HashSet<>();
try {
for (Map.Entry<String, String> attributeEntry : attributes.entrySet()) {
if (attributeEntry.getKey().startsWith(SNMPUtils.SNMP_PROP_PREFIX)) {

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snmp.factory.core;
import org.apache.nifi.snmp.configuration.SNMPConfiguration;
import org.junit.jupiter.api.Test;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class SNMPContextTest {
@Test
void testCreateSNMPContext() {
final SNMPContext snmpContext = spy(SNMPContext.class);
final Snmp mockSnmpManager = mock(Snmp.class);
final Target mockTarget = mock(Target.class);
final SNMPConfiguration snmpConfiguration = mock(SNMPConfiguration.class);
when(snmpContext.createSnmpManagerInstance(snmpConfiguration)).thenReturn(mockSnmpManager);
when(snmpContext.createTargetInstance(snmpConfiguration)).thenReturn(mockTarget);
snmpContext.createSNMPResourceHandler(snmpConfiguration);
verify(snmpContext).createSnmpManagerInstance(snmpConfiguration);
verify(snmpContext).createTargetInstance(snmpConfiguration);
}
}

View File

@ -29,8 +29,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
class V1V2cSNMPFactoryTest extends SNMPSocketSupport {
@ -56,17 +54,6 @@ class V1V2cSNMPFactoryTest extends SNMPSocketSupport {
assertNotNull(address);
}
@Test
void testFactoryCreatesResourceHandler() {
final V1V2cSNMPFactory snmpFactory = spy(V1V2cSNMPFactory.class);
final SNMPConfiguration snmpConfiguration = getSnmpConfiguration(0, "48");
snmpFactory.createSNMPResourceHandler(snmpConfiguration);
verify(snmpFactory).createTargetInstance(snmpConfiguration);
verify(snmpFactory).createSnmpManagerInstance(snmpConfiguration);
}
@Override
protected SNMPConfiguration getSnmpConfiguration(int managerPort, String targetPort) {
return new SNMPConfiguration.Builder()

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.snmp.factory.core;
import org.apache.nifi.snmp.configuration.SNMPConfiguration;
import org.junit.jupiter.api.Test;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
@ -32,8 +31,6 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
class V3SNMPFactoryTest extends SNMPSocketSupport {
@ -59,14 +56,4 @@ class V3SNMPFactoryTest extends SNMPSocketSupport {
assertNotNull(address);
assertTrue(usm.hasUser(null, new OctetString("SHAAES128")));
}
@Test
void testFactoryCreatesResourceHandler() {
final V3SNMPFactory snmpFactory = spy(V3SNMPFactory.class);
final SNMPConfiguration snmpConfiguration = getSnmpConfiguration(0, "48");
snmpFactory.createSNMPResourceHandler(snmpConfiguration);
verify(snmpFactory).createTargetInstance(snmpConfiguration);
verify(snmpFactory).createSnmpManagerInstance(snmpConfiguration);
}
}

View File

@ -20,6 +20,7 @@ import org.apache.nifi.snmp.dto.SNMPSingleResponse;
import org.apache.nifi.snmp.dto.SNMPTreeResponse;
import org.apache.nifi.snmp.exception.RequestTimeoutException;
import org.apache.nifi.snmp.exception.SNMPWalkException;
import org.apache.nifi.snmp.processors.AbstractSNMPProcessor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@ -56,31 +57,11 @@ class GetSNMPHandlerTest {
private Target mockTarget;
private Snmp mockSnmpManager;
private SNMPResourceHandler snmpResourceHandler;
@BeforeEach
public void init() {
mockTarget = mock(Target.class);
mockSnmpManager = mock(Snmp.class);
snmpResourceHandler = new SNMPResourceHandler(mockSnmpManager, mockTarget);
}
@Test
void testGetSnmpWithEmptyFlowFile() throws IOException {
final ResponseEvent mockResponseEvent = mock(ResponseEvent.class);
final PDU mockPdu = mock(PDU.class);
when(mockResponseEvent.getResponse()).thenReturn(mockPdu);
when(mockSnmpManager.get(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
getSNMPHandler.get(OID);
ArgumentCaptor<PDU> captor = ArgumentCaptor.forClass(PDU.class);
Mockito.verify(mockSnmpManager).get(captor.capture(), any(Target.class));
final PDU pdu = captor.getValue();
assertEquals(1, pdu.getVariableBindings().size());
assertEquals(OID, pdu.getVariableBindings().get(0).getOid().toString());
}
@Test
@ -93,55 +74,55 @@ class GetSNMPHandlerTest {
when(mockResponseEvent.getResponse()).thenReturn(mockPdu);
when(mockSnmpManager.get(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(invalidFlowFileAttributes);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(invalidFlowFileAttributes, mockTarget);
assertFalse(optionalResponse.isPresent());
}
@Test
void testGetSnmpWithValidFlowFile() throws IOException {
final String flowFileOid = "1.3.6.1.2.1.1.1.0";
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("snmp$" + flowFileOid, "OID value");
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final ResponseEvent mockResponseEvent = mock(ResponseEvent.class);
final PDU mockPdu = mock(PDU.class);
when(mockResponseEvent.getResponse()).thenReturn(mockPdu);
when(mockSnmpManager.get(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
getSNMPHandler.get(flowFileAttributes);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.get(flowFileAttributes, mockTarget);
ArgumentCaptor<PDU> captor = ArgumentCaptor.forClass(PDU.class);
Mockito.verify(mockSnmpManager).get(captor.capture(), any(Target.class));
final PDU pdu = captor.getValue();
assertEquals(1, pdu.getVariableBindings().size());
assertEquals(flowFileOid, pdu.getVariableBindings().get(0).getOid().toString());
assertEquals(OID, pdu.getVariableBindings().get(0).getOid().toString());
assertEquals("Null", pdu.getVariableBindings().get(0).getVariable().toString());
}
@Test
void testGetSnmpWhenTimeout() throws IOException {
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final ResponseEvent mockResponseEvent = mock(ResponseEvent.class);
when(mockResponseEvent.getResponse()).thenReturn(null);
when(mockSnmpManager.get(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
final RequestTimeoutException requestTimeoutException = assertThrows(
RequestTimeoutException.class,
() -> getSNMPHandler.get(OID)
() -> getSNMPHandler.get(flowFileAttributes, mockTarget)
);
assertEquals(String.format(SNMPResourceHandler.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE, "read"),
assertEquals(String.format(AbstractSNMPProcessor.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE, "read"),
requestTimeoutException.getMessage());
}
@SuppressWarnings("unchecked")
@Test
void testWalkSnmpWithEmptyFlowFile() {
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final TreeUtils mockTreeUtils = mock(TreeUtils.class);
final TreeEvent mockTreeEvent = mock(TreeEvent.class);
final List<TreeEvent> mockSubtree = (List<TreeEvent>) mock(List.class);
@ -150,16 +131,16 @@ class GetSNMPHandlerTest {
variableBindings[0] = new VariableBinding(new OID(OID), new OctetString("OID value"));
when(mockTreeEvent.getVariableBindings()).thenReturn(variableBindings);
when(mockSubtree.get(0)).thenReturn(mockTreeEvent);
when(mockTreeUtils.getSubtree(mockTarget, new OID(OID))).thenReturn(mockSubtree);
when(mockTreeUtils.walk(mockTarget, new OID[] {new OID(OID)})).thenReturn(mockSubtree);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.setTreeUtils(mockTreeUtils);
getSNMPHandler.walk(OID);
getSNMPHandler.walk(flowFileAttributes, mockTarget);
ArgumentCaptor<OID> captor = ArgumentCaptor.forClass(OID.class);
Mockito.verify(mockTreeUtils).getSubtree(any(Target.class), captor.capture());
ArgumentCaptor<OID[]> captor = ArgumentCaptor.forClass(OID[].class);
Mockito.verify(mockTreeUtils).walk(any(Target.class), captor.capture());
assertEquals(OID, captor.getValue().toString());
assertEquals(OID, captor.getValue()[0].toString());
}
@Test
@ -167,8 +148,8 @@ class GetSNMPHandlerTest {
final Map<String, String> invalidFlowFileAttributes = new HashMap<>();
invalidFlowFileAttributes.put("invalid", "flowfile attribute");
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final Optional<SNMPTreeResponse> optionalResponse = getSNMPHandler.walk(invalidFlowFileAttributes);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
final Optional<SNMPTreeResponse> optionalResponse = getSNMPHandler.walk(invalidFlowFileAttributes, mockTarget);
assertFalse(optionalResponse.isPresent());
}
@ -176,9 +157,7 @@ class GetSNMPHandlerTest {
@SuppressWarnings("unchecked")
@Test
void testWalkSnmpWithValidFlowFile() {
final String flowFileOid = "1.3.6.1.2.1.1.1.0";
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("snmp$" + flowFileOid, "OID value");
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final TreeUtils mockTreeUtils = mock(TreeUtils.class);
final TreeEvent mockTreeEvent = mock(TreeEvent.class);
@ -191,39 +170,42 @@ class GetSNMPHandlerTest {
when(mockSubtree.isEmpty()).thenReturn(false);
when(mockTreeUtils.walk(any(Target.class), any(OID[].class))).thenReturn(mockSubtree);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.setTreeUtils(mockTreeUtils);
getSNMPHandler.walk(flowFileAttributes);
getSNMPHandler.walk(flowFileAttributes, mockTarget);
ArgumentCaptor<OID[]> captor = ArgumentCaptor.forClass(OID[].class);
Mockito.verify(mockTreeUtils).walk(any(Target.class), captor.capture());
assertEquals(flowFileOid, captor.getValue()[0].toString());
assertEquals(OID, captor.getValue()[0].toString());
}
@SuppressWarnings("unchecked")
@Test
void testWalkSnmpWithEmptySubtreeThrowsException() {
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("snmp$" + OID, "");
final TreeUtils mockTreeUtils = mock(TreeUtils.class);
final List<TreeEvent> mockSubtree = (List<TreeEvent>) mock(List.class);
when(mockSubtree.isEmpty()).thenReturn(true);
when(mockTreeUtils.getSubtree(any(Target.class), any(org.snmp4j.smi.OID.class))).thenReturn(mockSubtree);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.setTreeUtils(mockTreeUtils);
final SNMPWalkException snmpWalkException = assertThrows(
SNMPWalkException.class,
() -> getSNMPHandler.walk(OID)
() -> getSNMPHandler.walk(flowFileAttributes, mockTarget)
);
assertEquals(String.format(EMPTY_SUBTREE_EXCEPTION_MESSAGE, OID), snmpWalkException.getMessage());
assertEquals(String.format(EMPTY_SUBTREE_EXCEPTION_MESSAGE, "[" + OID + "]"), snmpWalkException.getMessage());
}
@SuppressWarnings("unchecked")
@Test
void testWalkSnmpWithSubtreeErrorThrowsException() {
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final TreeUtils mockTreeUtils = mock(TreeUtils.class);
final TreeEvent mockTreeEvent = mock(TreeEvent.class);
final List<TreeEvent> mockSubtree = (List<TreeEvent>) mock(List.class);
@ -231,14 +213,14 @@ class GetSNMPHandlerTest {
when(mockSubtree.get(0)).thenReturn(mockTreeEvent);
when(mockSubtree.isEmpty()).thenReturn(false);
when(mockSubtree.size()).thenReturn(1);
when(mockTreeUtils.getSubtree(any(Target.class), any(org.snmp4j.smi.OID.class))).thenReturn(mockSubtree);
when(mockTreeUtils.walk(any(Target.class), any(org.snmp4j.smi.OID[].class))).thenReturn(mockSubtree);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.setTreeUtils(mockTreeUtils);
final SNMPWalkException snmpWalkException = assertThrows(
SNMPWalkException.class,
() -> getSNMPHandler.walk(OID)
() -> getSNMPHandler.walk(flowFileAttributes, mockTarget)
);
assertEquals(SNMP_ERROR_EXCEPTION_MESSAGE, snmpWalkException.getMessage());
@ -247,6 +229,7 @@ class GetSNMPHandlerTest {
@SuppressWarnings("unchecked")
@Test
void testWalkSnmpWithLeafElementSubtreeThrowsException() {
final Map<String, String> flowFileAttributes = getFlowFileAttributesWithSingleOID();
final TreeUtils mockTreeUtils = mock(TreeUtils.class);
final TreeEvent mockTreeEvent = mock(TreeEvent.class);
final List<TreeEvent> mockSubtree = (List<TreeEvent>) mock(List.class);
@ -256,16 +239,22 @@ class GetSNMPHandlerTest {
when(mockSubtree.get(0)).thenReturn(mockTreeEvent);
when(mockSubtree.isEmpty()).thenReturn(false);
when(mockSubtree.size()).thenReturn(1);
when(mockTreeUtils.getSubtree(any(Target.class), any(org.snmp4j.smi.OID.class))).thenReturn(mockSubtree);
when(mockTreeUtils.walk(any(Target.class), any(org.snmp4j.smi.OID[].class))).thenReturn(mockSubtree);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(mockSnmpManager);
getSNMPHandler.setTreeUtils(mockTreeUtils);
final SNMPWalkException snmpWalkException = assertThrows(
SNMPWalkException.class,
() -> getSNMPHandler.walk(OID)
() -> getSNMPHandler.walk(flowFileAttributes, mockTarget)
);
assertEquals(String.format(LEAF_ELEMENT_EXCEPTION_MESSAGE, OID), snmpWalkException.getMessage());
assertEquals(String.format(LEAF_ELEMENT_EXCEPTION_MESSAGE, "[" + OID + "]"), snmpWalkException.getMessage());
}
private static Map<String, String> getFlowFileAttributesWithSingleOID() {
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("snmp$" + OID, "OID value");
return flowFileAttributes;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.snmp.dto.SNMPSingleResponse;
import org.apache.nifi.snmp.dto.SNMPTreeResponse;
import org.apache.nifi.snmp.dto.SNMPValue;
import org.apache.nifi.snmp.exception.RequestTimeoutException;
import org.apache.nifi.snmp.factory.core.SNMPContext;
import org.apache.nifi.snmp.factory.core.SNMPFactoryProvider;
import org.apache.nifi.snmp.helper.configurations.SNMPConfigurationFactory;
import org.apache.nifi.snmp.helper.configurations.SNMPV1V2cConfigurationFactory;
@ -29,10 +30,12 @@ import org.apache.nifi.snmp.testagents.TestAgent;
import org.apache.nifi.snmp.testagents.TestSNMPV1Agent;
import org.apache.nifi.snmp.testagents.TestSNMPV2cAgent;
import org.apache.nifi.snmp.testagents.TestSNMPV3Agent;
import org.junit.jupiter.api.AfterEach;
import org.apache.nifi.util.EqualsWrapper;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.snmp4j.Snmp;
import org.snmp4j.Target;
import org.snmp4j.agent.mo.DefaultMOFactory;
import org.snmp4j.agent.mo.MOAccessImpl;
import org.snmp4j.mp.SnmpConstants;
@ -44,10 +47,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -93,7 +98,6 @@ class SNMPRequestIT {
private static final TestAgent v2cTestAgent = new TestSNMPV2cAgent(LOCALHOST);
private static final TestAgent v3TestAgent = new TestSNMPV3Agent(LOCALHOST);
private SNMPResourceHandler snmpResourceHandler;
static {
registerManagedObjects(v1TestAgent);
@ -101,11 +105,6 @@ class SNMPRequestIT {
registerManagedObjects(v3TestAgent);
}
@AfterEach
public void tearDown() {
snmpResourceHandler.close();
}
private static Stream<Arguments> provideBasicArguments() {
return Stream.of(
Arguments.of(SnmpConstants.version1, snmpV1ConfigurationFactory, v1TestAgent),
@ -152,9 +151,12 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final SNMPSingleResponse response = getSNMPHandler.get(READ_ONLY_OID_1);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Map<String, String> attributes = Collections.singletonMap("snmp$" + READ_ONLY_OID_1, "");
final SNMPSingleResponse response = getSNMPHandler.get(attributes, target).orElseThrow();
assertEquals(READ_ONLY_OID_VALUE_1, response.getVariableBindings().get(0).getVariable());
assertEquals(SUCCESS, response.getErrorStatusText());
} catch (Exception e) {
@ -171,9 +173,11 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(getFlowFileAttributesForSnmpGet(READ_ONLY_OID_1, READ_ONLY_OID_2));
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(getFlowFileAttributesForSnmpGet(READ_ONLY_OID_1, READ_ONLY_OID_2), target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
Set<String> expectedVariables = new HashSet<>(Arrays.asList(READ_ONLY_OID_VALUE_1, READ_ONLY_OID_VALUE_2));
@ -198,11 +202,14 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final SNMPTreeResponse response = getSNMPHandler.walk(WALK_OID);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Map<String, String> attributes = Collections.singletonMap("snmp$" + WALK_OID, "");
final Optional<SNMPTreeResponse> response = getSNMPHandler.walk(attributes, target);
assertSubTreeContainsOids(response);
assertSubTreeContainsOids(response.orElseThrow());
} catch (Exception e) {
fail(e);
} finally {
@ -217,11 +224,13 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfigWithCustomHost(INVALID_HOST, agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final Map<String, String> attributes = Collections.singletonMap("snmp$" + READ_ONLY_OID_1, "");
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
assertThrows(RequestTimeoutException.class, () ->
getSNMPHandler.get(READ_ONLY_OID_1)
getSNMPHandler.get(attributes, target)
);
} catch (Exception e) {
fail(e);
@ -237,20 +246,38 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(getFlowFileAttributesForSnmpGet(INVALID_OID, READ_ONLY_OID_2));
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Optional<SNMPSingleResponse> optionalResponse = getSNMPHandler.get(getFlowFileAttributesForSnmpGet(INVALID_OID, READ_ONLY_OID_2), target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
final List<SNMPValue> actualVariableBindings = response.getVariableBindings();
final List<SNMPValue> expectedVariableBindings;
final List<Function<SNMPValue, Object>> equalsProperties = Arrays.asList(
SNMPValue::getOid,
SNMPValue::getVariable
);
if (version == SnmpConstants.version1) {
assertEquals("Null", response.getVariableBindings().get(1).getVariable());
assertEquals(READ_ONLY_OID_VALUE_2, response.getVariableBindings().get(0).getVariable());
expectedVariableBindings = Arrays.asList(
new SNMPValue(INVALID_OID, "Null"),
new SNMPValue(READ_ONLY_OID_2, READ_ONLY_OID_VALUE_2)
);
assertEquals(NO_SUCH_NAME, response.getErrorStatusText());
} else {
assertEquals(NO_SUCH_OBJECT, response.getVariableBindings().get(1).getVariable());
assertEquals(READ_ONLY_OID_VALUE_2, response.getVariableBindings().get(0).getVariable());
expectedVariableBindings = Arrays.asList(
new SNMPValue(INVALID_OID, NO_SUCH_OBJECT),
new SNMPValue(READ_ONLY_OID_2, READ_ONLY_OID_VALUE_2)
);
assertEquals(SUCCESS, response.getErrorStatusText());
}
assertEquals(
new HashSet<>(EqualsWrapper.wrapList(actualVariableBindings, equalsProperties)),
new HashSet<>(EqualsWrapper.wrapList(expectedVariableBindings, equalsProperties))
);
} else {
fail("Response is not present.");
}
@ -269,9 +296,11 @@ class SNMPRequestIT {
try {
final Map<String, String> flowFileAttributes = getFlowFileAttributes(WRITE_ONLY_OID);
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpManager);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes, target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
assertEquals(TEST_OID_VALUE, response.getVariableBindings().get(0).getVariable());
@ -295,9 +324,11 @@ class SNMPRequestIT {
try {
final Map<String, String> flowFileAttributes = getFlowFileAttributes(READ_ONLY_OID_1);
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpManager);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes, target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
assertEquals(cannotSetReadOnlyOidStatusMessage, response.getErrorStatusText());
@ -319,11 +350,14 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final SNMPSingleResponse response = getSNMPHandler.get(WRITE_ONLY_OID);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Map<String, String> attributes = Collections.singletonMap("snmp$" + WRITE_ONLY_OID, "");
final Optional<SNMPSingleResponse> response = getSNMPHandler.get(attributes, target);
assertEquals(cannotModifyOidStatusMessage, response.getErrorStatusText());
assertEquals(cannotModifyOidStatusMessage, response.map(SNMPSingleResponse::getErrorStatusText).orElseThrow());
} catch (Exception e) {
fail(e);
} finally {
@ -339,9 +373,12 @@ class SNMPRequestIT {
agent.start();
try {
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpResourceHandler);
final SNMPSingleResponse response = getSNMPHandler.get(INVALID_OID);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final GetSNMPHandler getSNMPHandler = new GetSNMPHandler(snmpManager);
final Map<String, String> attributes = Collections.singletonMap("snmp$" + INVALID_OID, "");
final SNMPSingleResponse response = getSNMPHandler.get(attributes, target).orElseThrow();
if (version == SnmpConstants.version1) {
assertEquals(getInvalidOidStatusMessage, response.getErrorStatusText());
} else {
@ -364,9 +401,11 @@ class SNMPRequestIT {
try {
final Map<String, String> flowFileAttributes = getFlowFileAttributes(INVALID_OID);
final SNMPConfiguration snmpConfiguration = snmpConfigurationFactory.createSnmpGetSetConfiguration(agent.getPort());
snmpResourceHandler = SNMPFactoryProvider.getFactory(version).createSNMPResourceHandler(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpResourceHandler);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes);
final SNMPContext factory = SNMPFactoryProvider.getFactory(version);
final Target target = factory.createTargetInstance(snmpConfiguration);
final Snmp snmpManager = factory.createSnmpManagerInstance(snmpConfiguration);
final SetSNMPHandler setSNMPHandler = new SetSNMPHandler(snmpManager);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes, target);
if (optionalResponse.isPresent()) {
final SNMPSingleResponse response = optionalResponse.get();
assertEquals(setInvalidOidStatusMessage, response.getErrorStatusText());

View File

@ -21,7 +21,6 @@ import org.apache.nifi.snmp.configuration.V2TrapConfiguration;
import org.apache.nifi.snmp.factory.trap.V1TrapPDUFactory;
import org.apache.nifi.snmp.factory.trap.V2TrapPDUFactory;
import org.apache.nifi.util.MockComponentLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.snmp4j.PDU;
@ -47,7 +46,6 @@ class SendTrapSNMPHandlerTest {
private MockComponentLog mockComponentLog;
private V1TrapConfiguration mockV1TrapConfiguration;
private V2TrapConfiguration mockV2TrapConfiguration;
private SNMPResourceHandler snmpResourceHandler;
private SendTrapSNMPHandler sendTrapSNMPHandler;
@BeforeEach
@ -66,30 +64,23 @@ class SendTrapSNMPHandlerTest {
when(mockSnmpManager.send(mockPdu, mockTarget)).thenReturn(mockResponseEvent);
snmpResourceHandler = new SNMPResourceHandler(mockSnmpManager, mockTarget);
sendTrapSNMPHandler = new SendTrapSNMPHandler(snmpResourceHandler, Instant.now(), mockComponentLog) {
sendTrapSNMPHandler = new SendTrapSNMPHandler(mockSnmpManager, Instant.now(), mockComponentLog) {
@Override
V1TrapPDUFactory createV1TrapPduFactory(final Instant startTime) {
V1TrapPDUFactory createV1TrapPduFactory(final Target target, final Instant startTime) {
return mockV1TrapPDUFactory;
}
@Override
V2TrapPDUFactory createV2TrapPduFactory(final Instant startTime) {
V2TrapPDUFactory createV2TrapPduFactory(final Target target, final Instant startTime) {
return mockV2TrapPDUFactory;
}
};
}
@AfterEach
public void tearDown() {
snmpResourceHandler.close();
}
@Test
void testSendV1TrapWithValidFlowfile() throws IOException {
final String flowFileOid = "1.3.6.1.2.1.1.1.0";
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("snmp$" + flowFileOid, "OID value"), mockV1TrapConfiguration);
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("snmp$" + flowFileOid, "OID value"), mockV1TrapConfiguration, mockTarget);
verify(mockSnmpManager).send(mockPdu, mockTarget);
}
@ -97,14 +88,14 @@ class SendTrapSNMPHandlerTest {
@Test
void testSendV2TrapWithValidFlowfile() throws IOException {
final String flowFileOid = "1.3.6.1.2.1.1.1.0";
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("snmp$" + flowFileOid, "OID value"), mockV2TrapConfiguration);
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("snmp$" + flowFileOid, "OID value"), mockV2TrapConfiguration, mockTarget);
verify(mockSnmpManager).send(mockPdu, mockTarget);
}
@Test
void testSendV1TrapWithFlowfileWithoutOptionalSnmpAttributes() throws IOException {
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("invalid key", "invalid value"), mockV1TrapConfiguration);
sendTrapSNMPHandler.sendTrap(Collections.singletonMap("invalid key", "invalid value"), mockV1TrapConfiguration, mockTarget);
verify(mockSnmpManager).send(mockPdu, mockTarget);

View File

@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.snmp.operations.SNMPResourceHandler.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
import static org.apache.nifi.snmp.processors.AbstractSNMPProcessor.REQUEST_TIMEOUT_EXCEPTION_TEMPLATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -65,8 +65,7 @@ class SetSNMPHandlerTest {
when(mockPduFactory.createPDU(mockTarget)).thenReturn(mockPdu);
SNMPResourceHandler snmpResourceHandler = new SNMPResourceHandler(mockSnmpManager, mockTarget);
setSNMPHandler = new SetSNMPHandler(snmpResourceHandler);
setSNMPHandler = new SetSNMPHandler(mockSnmpManager);
SetSNMPHandler.setSetPduFactory(mockPduFactory);
}
@ -84,7 +83,7 @@ class SetSNMPHandlerTest {
when(mockResponseEvent.getResponse()).thenReturn(mockResponsePdu);
when(mockSnmpManager.set(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
setSNMPHandler.set(flowFileAttributes);
setSNMPHandler.set(flowFileAttributes, mockTarget);
verify(mockSnmpManager).set(mockPdu, mockTarget);
}
@ -99,7 +98,7 @@ class SetSNMPHandlerTest {
final RequestTimeoutException requestTimeoutException = assertThrows(
RequestTimeoutException.class,
() -> setSNMPHandler.set(flowFileAttributes)
() -> setSNMPHandler.set(flowFileAttributes, mockTarget)
);
assertEquals(String.format(REQUEST_TIMEOUT_EXCEPTION_TEMPLATE, "write"), requestTimeoutException.getMessage());
@ -112,7 +111,7 @@ class SetSNMPHandlerTest {
when(mockSnmpManager.set(any(PDU.class), any(Target.class))).thenReturn(mockResponseEvent);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes);
final Optional<SNMPSingleResponse> optionalResponse = setSNMPHandler.set(flowFileAttributes, mockTarget);
assertFalse(optionalResponse.isPresent());
}

View File

@ -66,7 +66,7 @@ class AbstractSNMPProcessorTest {
when(mockResponse.getErrorStatusText()).thenReturn(errorStatus);
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final String actualLogMessage = getTestRunner.getLogger().getErrorMessages().get(0).getMsg();
final String expectedLogMessage = String.format("SNMP request failed, response error: %s", errorStatus);
@ -82,7 +82,7 @@ class AbstractSNMPProcessorTest {
when(mockResponse.getVariableBindings()).thenReturn(vbs);
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final String actualLogMessage = getTestRunner.getLogger().getErrorMessages().get(0).getMsg();
@ -97,7 +97,7 @@ class AbstractSNMPProcessorTest {
when(mockResponse.getVariableBindings()).thenReturn(Collections.emptyList());
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final String actualLogMessage = getTestRunner.getLogger().getErrorMessages().get(0).getMsg();
@ -116,7 +116,7 @@ class AbstractSNMPProcessorTest {
final Map<String, String> attributes = Collections.singletonMap(TEST_OID, "testOIDValue");
when(mockResponse.getAttributes()).thenReturn(attributes);
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final List<MockFlowFile> flowFilesForRelationship = mockProcessSession.getFlowFilesForRelationship(GetSNMP.REL_SUCCESS);
assertEquals("testOIDValue", flowFilesForRelationship.get(0).getAttribute(TEST_OID));
@ -131,7 +131,7 @@ class AbstractSNMPProcessorTest {
when(mockResponse.getVariableBindings()).thenReturn(vbs);
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final String actualLogMessage = getTestRunner.getLogger().getErrorMessages().get(0).getMsg();
@ -150,7 +150,7 @@ class AbstractSNMPProcessorTest {
when(mockResponse.getVariableBindings()).thenReturn(vbs);
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress");
getSNMP.handleResponse(mockProcessContext, mockProcessSession, mockFlowFile, mockResponse, GetSNMP.REL_SUCCESS, GetSNMP.REL_FAILURE, "provenanceAddress", true);
final String actualLogMessage = getTestRunner.getLogger().getErrorMessages().get(0).getMsg();

View File

@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snmp.processors;
import org.apache.nifi.snmp.helper.testrunners.SNMPV1TestRunnerFactory;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
class GetSNMPTest {
private static final String OID = "1.3.6.1.4.1.32437.1.5.1.4.2.0";
@Test
void testOnTriggerWithGetStrategyPerformsSnmpGet() {
final TestRunner getSnmpTestRunner = new SNMPV1TestRunnerFactory().createSnmpGetTestRunner(0, OID, "GET");
final GetSNMP spyGetSNMP = spy((GetSNMP) getSnmpTestRunner.getProcessor());
final MockProcessSession mockProcessSession = new MockProcessSession(new SharedSessionState(spyGetSNMP, new AtomicLong(0L)), spyGetSNMP);
doNothing().when(spyGetSNMP).performSnmpGet(any(), any(), any(), any());
spyGetSNMP.onTrigger(getSnmpTestRunner.getProcessContext(), mockProcessSession);
verify(spyGetSNMP).performSnmpGet(any(), any(), any(), any());
}
@Test
void testOnTriggerWithWalkStrategyPerformsSnmpWalk() {
final TestRunner getSnmpTestRunner = new SNMPV1TestRunnerFactory().createSnmpGetTestRunner(0, OID, "WALK");
final GetSNMP spyGetSNMP = spy((GetSNMP) getSnmpTestRunner.getProcessor());
final MockProcessSession mockProcessSession = new MockProcessSession(new SharedSessionState(spyGetSNMP, new AtomicLong(0L)), spyGetSNMP);
doNothing().when(spyGetSNMP).performSnmpWalk(any(), any(), any(), any());
spyGetSNMP.onTrigger(getSnmpTestRunner.getProcessContext(), mockProcessSession);
verify(spyGetSNMP).performSnmpWalk(any(), any(), any(), any());
}
}