NIFI-8352 This closes #4922. Changed assertServerConnections to loop and sleep while evaluating

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2021-03-22 15:14:42 -05:00 committed by Joe Witt
parent 4189a1c1eb
commit e505e8b42d
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 39 additions and 44 deletions

View File

@ -27,7 +27,9 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import javax.net.ServerSocketFactory;
@ -36,9 +38,9 @@ import javax.net.ssl.SSLServerSocketFactory;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -58,13 +60,14 @@ public class TestPutTCP {
private final static int DEFAULT_ITERATIONS = 1;
private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
@Rule
public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
private TCPTestServer server;
private int port;
private ArrayBlockingQueue<List<Byte>> received;
@ -98,7 +101,7 @@ public class TestPutTCP {
runner.assertNotValid();
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccess() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@ -107,7 +110,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessSslContextService() throws Exception {
final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
@ -130,7 +133,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessServerVariableExpression() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
@ -139,7 +142,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessPruneSenders() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@ -156,7 +159,7 @@ public class TestPutTCP {
assertServerConnections(2);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessMultiCharDelimiter() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
@ -165,7 +168,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessConnectionPerFlowFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@ -174,7 +177,7 @@ public class TestPutTCP {
assertServerConnections(VALID_FILES.length);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessConnectionFailure() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@ -193,7 +196,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessEmptyFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
@ -203,7 +206,7 @@ public class TestPutTCP {
assertServerConnections(1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessLargeValidFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
@ -213,7 +216,7 @@ public class TestPutTCP {
assertServerConnections(testData.length);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
@Test
public void testRunSuccessFiveHundredMessages() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
@ -295,10 +298,10 @@ public class TestPutTCP {
assertNull("Unexpected Message Found", received.poll());
}
private void assertServerConnections(final int connections) {
// Shutdown server to get completed number of connections
shutdownServer();
assertEquals("Server Connections not matched", server.getTotalNumConnections(), connections);
private void assertServerConnections(final int connections) throws InterruptedException {
while (server.getTotalConnections() != connections) {
Thread.sleep(10);
}
}
private String[] createContent(final int size) {

View File

@ -16,7 +16,8 @@
*/
package org.apache.nifi.processors.standard.util;
import java.io.IOException;
import org.apache.nifi.io.socket.SocketUtils;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
@ -32,7 +33,7 @@ public class TCPTestServer implements Runnable {
private final InetAddress ipAddress;
private final String messageDelimiter;
private final ArrayBlockingQueue<List<Byte>> queue;
private final AtomicInteger totalNumConnections = new AtomicInteger();
private final AtomicInteger totalConnections = new AtomicInteger();
private final boolean closeOnMessageReceived;
private volatile ServerSocket serverSocket;
@ -65,30 +66,22 @@ public class TCPTestServer implements Runnable {
shutdownServer();
}
public synchronized void shutdownServer() {
if (isServerRunning()) {
try {
serverSocket.close();
} catch (IOException ioe) {
// Do Nothing.
}
}
}
public synchronized void shutdownConnection() {
if (isConnected()) {
try {
connectionSocket.close();
} catch (IOException ioe) {
// Do Nothing.
}
}
}
public int getPort(){
return port;
}
private synchronized void shutdownServer() {
if (isServerRunning()) {
SocketUtils.closeQuietly(serverSocket);
}
}
private synchronized void shutdownConnection() {
if (isConnected()) {
SocketUtils.closeQuietly(connectionSocket);
}
}
private void storeReceivedMessage(final List<Byte> message) {
queue.add(message);
if (closeOnMessageReceived) {
@ -104,8 +97,8 @@ public class TCPTestServer implements Runnable {
return connectionSocket != null && !connectionSocket.isClosed();
}
public int getTotalNumConnections() {
return totalNumConnections.get();
public int getTotalConnections() {
return totalConnections.get();
}
protected boolean isDelimiterPresent(final List<Byte> message) {
@ -140,7 +133,7 @@ public class TCPTestServer implements Runnable {
try {
while (isServerRunning()) {
connectionSocket = serverSocket.accept();
totalNumConnections.incrementAndGet();
totalConnections.incrementAndGet();
final InputStream inputStream = connectionSocket.getInputStream();
while (isConnected()) {
final List<Byte> message = new ArrayList<>();
@ -166,8 +159,7 @@ public class TCPTestServer implements Runnable {
} catch (Exception e) {
// Do Nothing
} finally {
shutdownConnection();
shutdownServer();
shutdown();
}
}