diff --git a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java index d2eaf19528..c93bb83e46 100644 --- a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java +++ b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java @@ -25,6 +25,7 @@ import org.snmp4j.CommandResponder; import org.snmp4j.CommandResponderEvent; import org.snmp4j.PDU; import org.snmp4j.PDUv1; +import org.snmp4j.smi.Address; import java.util.Map; @@ -46,7 +47,7 @@ public class SNMPTrapReceiver implements CommandResponder { final PDU pdu = event.getPDU(); if (isValidTrapPdu(pdu)) { final ProcessSession processSession = processSessionFactory.createSession(); - final FlowFile flowFile = createFlowFile(processSession, pdu); + final FlowFile flowFile = createFlowFile(processSession,event); processSession.getProvenanceReporter().create(flowFile, event.getPeerAddress() + "/" + pdu.getRequestID()); if (pdu.getErrorStatus() == PDU.noError) { processSession.transfer(flowFile, REL_SUCCESS); @@ -59,14 +60,19 @@ public class SNMPTrapReceiver implements CommandResponder { } } - private FlowFile createFlowFile(final ProcessSession processSession, final PDU pdu) { + private FlowFile createFlowFile(final ProcessSession processSession, final CommandResponderEvent event) { FlowFile flowFile = processSession.create(); final Map attributes; + final PDU pdu = event.getPDU(); + final Address peerAddress = event.getPeerAddress(); if (pdu instanceof PDUv1) { attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu); } else { attributes = SNMPUtils.getPduAttributeMap(pdu); } + if (peerAddress.isValid()) { + processSession.putAttribute(flowFile, SNMPUtils.SNMP_PROP_PREFIX + "peerAddress", peerAddress.toString()); + } flowFile = processSession.putAllAttributes(flowFile, attributes); return flowFile; } diff --git a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java index eeca40eefe..0d273c3921 100644 --- a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java +++ b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.snmp4j.CommandResponderEvent; import org.snmp4j.PDU; import org.snmp4j.PDUv1; +import org.snmp4j.smi.Address; import org.snmp4j.smi.OID; import org.snmp4j.smi.VariableBinding; @@ -95,9 +96,15 @@ class SNMPTrapReceiverTest { when(mockV1Pdu.getType()).thenReturn(PDU.V1TRAP); when(mockV1Pdu.getEnterprise()).thenReturn(new OID("1.3.6.1.2.1.1.1.0")); when(mockV1Pdu.getSpecificTrap()).thenReturn(4); + + final Address mockAddress = mock(Address.class); + when(mockAddress.toString()).thenReturn("127.0.0.1/62"); + when(mockAddress.isValid()).thenReturn(true); + final Vector vbs = new Vector<>(); doReturn(vbs).when(mockV1Pdu).getVariableBindings(); when(mockEvent.getPDU()).thenReturn(mockV1Pdu); + when(mockEvent.getPeerAddress()).thenReturn(mockAddress); when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession); snmpTrapReceiver.processPdu(mockEvent); @@ -107,6 +114,8 @@ class SNMPTrapReceiverTest { assertEquals("1.3.6.1.2.1.1.1.0", flowFile.getAttribute("snmp$enterprise")); assertEquals(String.valueOf(4), flowFile.getAttribute("snmp$specificTrapType")); + assertEquals("127.0.0.1/62", flowFile.getAttribute("snmp$peerAddress")); + } @Test @@ -117,8 +126,15 @@ class SNMPTrapReceiverTest { when(mockPdu.getErrorIndex()).thenReturn(123); when(mockPdu.getErrorStatusText()).thenReturn("test error status text"); final Vector vbs = new Vector<>(); + + final Address mockAddress = mock(Address.class); + when(mockAddress.toString()).thenReturn("127.0.0.1/62"); + when(mockAddress.isValid()).thenReturn(true); + doReturn(vbs).when(mockPdu).getVariableBindings(); when(mockEvent.getPDU()).thenReturn(mockPdu); + when(mockEvent.getPeerAddress()).thenReturn(mockAddress); + when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession); snmpTrapReceiver.processPdu(mockEvent); @@ -128,6 +144,7 @@ class SNMPTrapReceiverTest { assertEquals(String.valueOf(123), flowFile.getAttribute("snmp$errorIndex")); assertEquals("test error status text", flowFile.getAttribute("snmp$errorStatusText")); + assertEquals("127.0.0.1/62", flowFile.getAttribute("snmp$peerAddress")); } @Test @@ -136,9 +153,14 @@ class SNMPTrapReceiverTest { when(mockPdu.getType()).thenReturn(PDU.TRAP); when(mockPdu.getErrorStatus()).thenReturn(PDU.badValue); + + final Address mockAddress = mock(Address.class); + when(mockAddress.isValid()).thenReturn(false); + final Vector vbs = new Vector<>(); doReturn(vbs).when(mockPdu).getVariableBindings(); when(mockEvent.getPDU()).thenReturn(mockPdu); + when(mockEvent.getPeerAddress()).thenReturn(mockAddress); when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession); snmpTrapReceiver.processPdu(mockEvent); @@ -149,5 +171,6 @@ class SNMPTrapReceiverTest { final FlowFile flowFile = flowFiles.get(0); assertEquals(String.valueOf(PDU.badValue), flowFile.getAttribute("snmp$errorStatus")); + assertEquals(null, flowFile.getAttribute("snmp$peerAddress")); } }