mirror of
https://github.com/apache/nifi.git
synced 2025-02-09 03:25:04 +00:00
NIFI-3685 Azure EventHub processor tests fail
Moved test classes into the proper folder. Fixed outdated mocking and added assertions that actually tests the content. This closes #3346. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
1d06044e3b
commit
d53cefa2a7
@ -17,21 +17,28 @@
|
||||
package org.apache.nifi.processors.azure.eventhub;
|
||||
|
||||
import com.microsoft.azure.eventhubs.EventData;
|
||||
import com.microsoft.azure.eventhubs.EventData.SystemProperties;
|
||||
import com.microsoft.azure.eventhubs.PartitionReceiver;
|
||||
import com.microsoft.azure.servicebus.ServiceBusException;
|
||||
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.powermock.reflect.Whitebox;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
|
||||
public class GetAzureEventHubTest {
|
||||
@ -40,6 +47,10 @@ public class GetAzureEventHubTest {
|
||||
private static final String eventHubName = "get-test";
|
||||
private static final String sasKeyName = "bogus-policy";
|
||||
private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
|
||||
private static final Date ENQUEUED_TIME_VALUE = Date.from(Clock.fixed(Instant.now(), ZoneId.systemDefault()).instant());
|
||||
public static final long SEQUENCE_NUMBER_VALUE = 13L;
|
||||
public static final String OFFSET_VALUE = "100";
|
||||
public static final String PARTITION_KEY_VALUE = "0";
|
||||
|
||||
private TestRunner testRunner;
|
||||
private MockGetAzureEventHub processor;
|
||||
@ -69,13 +80,12 @@ public class GetAzureEventHubTest {
|
||||
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
|
||||
testRunner.assertValid();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void verifyRelationships(){
|
||||
|
||||
assert(1 == processor.getRelationships().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoPartitions(){
|
||||
MockGetAzureEventHubNoPartitions mockProcessor = new MockGetAzureEventHubNoPartitions();
|
||||
@ -85,6 +95,7 @@ public class GetAzureEventHubTest {
|
||||
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullRecieve(){
|
||||
setUpStandardTestConfig();
|
||||
@ -93,6 +104,7 @@ public class GetAzureEventHubTest {
|
||||
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testThrowGetReceiver(){
|
||||
setUpStandardTestConfig();
|
||||
@ -101,12 +113,37 @@ public class GetAzureEventHubTest {
|
||||
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNormalFlow() throws Exception {
|
||||
|
||||
setUpStandardTestConfig();
|
||||
testRunner.run(1, true);
|
||||
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
|
||||
|
||||
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals("test event number: 0");
|
||||
flowFile.assertAttributeEquals("eventhub.enqueued.timestamp", ENQUEUED_TIME_VALUE.toInstant().toString());
|
||||
flowFile.assertAttributeEquals("eventhub.offset", OFFSET_VALUE);
|
||||
flowFile.assertAttributeEquals("eventhub.sequence", String.valueOf(SEQUENCE_NUMBER_VALUE));
|
||||
flowFile.assertAttributeEquals("eventhub.name", eventHubName);
|
||||
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNormalNotReceivedEventsFlow() throws Exception {
|
||||
setUpStandardTestConfig();
|
||||
processor.received = false;
|
||||
testRunner.run(1, true);
|
||||
testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 10);
|
||||
|
||||
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GetAzureEventHub.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals("test event number: 0");
|
||||
flowFile.assertAttributeNotExists("eventhub.enqueued.timestamp");
|
||||
flowFile.assertAttributeNotExists("eventhub.offset");
|
||||
flowFile.assertAttributeNotExists("eventhub.sequence");
|
||||
flowFile.assertAttributeEquals("eventhub.name", eventHubName);
|
||||
|
||||
testRunner.clearTransferState();
|
||||
}
|
||||
|
||||
@ -117,6 +154,7 @@ public class GetAzureEventHubTest {
|
||||
|
||||
boolean nullReceive = false;
|
||||
boolean getReceiverThrow = false;
|
||||
boolean received = true;
|
||||
|
||||
@Override
|
||||
protected void setupReceiver(final String connectionString) throws ProcessException{
|
||||
@ -140,12 +178,17 @@ public class GetAzureEventHubTest {
|
||||
}
|
||||
final LinkedList<EventData> receivedEvents = new LinkedList<>();
|
||||
for(int i = 0; i < 10; i++){
|
||||
final EventData eventData = new EventData(String.format("test event number: %d",i).getBytes());
|
||||
Whitebox.setInternalState(eventData,"isReceivedEvent",true);
|
||||
Whitebox.setInternalState(eventData, "partitionKey","0");
|
||||
Whitebox.setInternalState(eventData, "offset", "100");
|
||||
Whitebox.setInternalState(eventData, "sequenceNumber",13L);
|
||||
Whitebox.setInternalState(eventData, "enqueuedTime",Instant.now().minus(100L, ChronoUnit.SECONDS));
|
||||
EventData eventData = new EventData(String.format("test event number: %d", i).getBytes());
|
||||
if (received) {
|
||||
HashMap<String, Object> properties = new HashMap<>();
|
||||
properties.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, PARTITION_KEY_VALUE);
|
||||
properties.put(AmqpConstants.OFFSET_ANNOTATION_NAME, OFFSET_VALUE);
|
||||
properties.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, SEQUENCE_NUMBER_VALUE);
|
||||
properties.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, ENQUEUED_TIME_VALUE);
|
||||
|
||||
SystemProperties systemProperties = new SystemProperties(properties);
|
||||
Whitebox.setInternalState(eventData, "systemProperties", systemProperties);
|
||||
}
|
||||
receivedEvents.add(eventData);
|
||||
}
|
||||
|
@ -99,7 +99,9 @@ public class PutAzureEventHubTest {
|
||||
private static class MockPutAzureEventHub extends PutAzureEventHub{
|
||||
|
||||
byte[] receivedBuffer = null;
|
||||
byte[] getReceivedBuffer(){return receivedBuffer;}
|
||||
byte[] getReceivedBuffer(){
|
||||
return receivedBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventHubClient createEventHubClient(final String namespace, final String eventHubName, final String policyName, final String policyKey) throws ProcessException {
|
Loading…
x
Reference in New Issue
Block a user