NIFI-12944 - Add PeerAddress as Attribute into the flowfile

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8557.
This commit is contained in:
Ricardo Ferreira 2024-03-21 16:34:21 +00:00 committed by Pierre Villard
parent dd9d1c978f
commit 258715539e
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 31 additions and 2 deletions

View File

@ -25,6 +25,7 @@ import org.snmp4j.CommandResponder;
import org.snmp4j.CommandResponderEvent;
import org.snmp4j.PDU;
import org.snmp4j.PDUv1;
import org.snmp4j.smi.Address;
import java.util.Map;
@ -46,7 +47,7 @@ public class SNMPTrapReceiver implements CommandResponder {
final PDU pdu = event.getPDU();
if (isValidTrapPdu(pdu)) {
final ProcessSession processSession = processSessionFactory.createSession();
final FlowFile flowFile = createFlowFile(processSession, pdu);
final FlowFile flowFile = createFlowFile(processSession,event);
processSession.getProvenanceReporter().create(flowFile, event.getPeerAddress() + "/" + pdu.getRequestID());
if (pdu.getErrorStatus() == PDU.noError) {
processSession.transfer(flowFile, REL_SUCCESS);
@ -59,14 +60,19 @@ public class SNMPTrapReceiver implements CommandResponder {
}
}
private FlowFile createFlowFile(final ProcessSession processSession, final PDU pdu) {
private FlowFile createFlowFile(final ProcessSession processSession, final CommandResponderEvent event) {
FlowFile flowFile = processSession.create();
final Map<String, String> attributes;
final PDU pdu = event.getPDU();
final Address peerAddress = event.getPeerAddress();
if (pdu instanceof PDUv1) {
attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu);
} else {
attributes = SNMPUtils.getPduAttributeMap(pdu);
}
if (peerAddress.isValid()) {
processSession.putAttribute(flowFile, SNMPUtils.SNMP_PROP_PREFIX + "peerAddress", peerAddress.toString());
}
flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile;
}

View File

@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.snmp4j.CommandResponderEvent;
import org.snmp4j.PDU;
import org.snmp4j.PDUv1;
import org.snmp4j.smi.Address;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.VariableBinding;
@ -95,9 +96,15 @@ class SNMPTrapReceiverTest {
when(mockV1Pdu.getType()).thenReturn(PDU.V1TRAP);
when(mockV1Pdu.getEnterprise()).thenReturn(new OID("1.3.6.1.2.1.1.1.0"));
when(mockV1Pdu.getSpecificTrap()).thenReturn(4);
final Address mockAddress = mock(Address.class);
when(mockAddress.toString()).thenReturn("127.0.0.1/62");
when(mockAddress.isValid()).thenReturn(true);
final Vector<VariableBinding> vbs = new Vector<>();
doReturn(vbs).when(mockV1Pdu).getVariableBindings();
when(mockEvent.getPDU()).thenReturn(mockV1Pdu);
when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
snmpTrapReceiver.processPdu(mockEvent);
@ -107,6 +114,8 @@ class SNMPTrapReceiverTest {
assertEquals("1.3.6.1.2.1.1.1.0", flowFile.getAttribute("snmp$enterprise"));
assertEquals(String.valueOf(4), flowFile.getAttribute("snmp$specificTrapType"));
assertEquals("127.0.0.1/62", flowFile.getAttribute("snmp$peerAddress"));
}
@Test
@ -117,8 +126,15 @@ class SNMPTrapReceiverTest {
when(mockPdu.getErrorIndex()).thenReturn(123);
when(mockPdu.getErrorStatusText()).thenReturn("test error status text");
final Vector<VariableBinding> vbs = new Vector<>();
final Address mockAddress = mock(Address.class);
when(mockAddress.toString()).thenReturn("127.0.0.1/62");
when(mockAddress.isValid()).thenReturn(true);
doReturn(vbs).when(mockPdu).getVariableBindings();
when(mockEvent.getPDU()).thenReturn(mockPdu);
when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
snmpTrapReceiver.processPdu(mockEvent);
@ -128,6 +144,7 @@ class SNMPTrapReceiverTest {
assertEquals(String.valueOf(123), flowFile.getAttribute("snmp$errorIndex"));
assertEquals("test error status text", flowFile.getAttribute("snmp$errorStatusText"));
assertEquals("127.0.0.1/62", flowFile.getAttribute("snmp$peerAddress"));
}
@Test
@ -136,9 +153,14 @@ class SNMPTrapReceiverTest {
when(mockPdu.getType()).thenReturn(PDU.TRAP);
when(mockPdu.getErrorStatus()).thenReturn(PDU.badValue);
final Address mockAddress = mock(Address.class);
when(mockAddress.isValid()).thenReturn(false);
final Vector<VariableBinding> vbs = new Vector<>();
doReturn(vbs).when(mockPdu).getVariableBindings();
when(mockEvent.getPDU()).thenReturn(mockPdu);
when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
snmpTrapReceiver.processPdu(mockEvent);
@ -149,5 +171,6 @@ class SNMPTrapReceiverTest {
final FlowFile flowFile = flowFiles.get(0);
assertEquals(String.valueOf(PDU.badValue), flowFile.getAttribute("snmp$errorStatus"));
assertEquals(null, flowFile.getAttribute("snmp$peerAddress"));
}
}