mirror of https://github.com/apache/nifi.git
NIFI-557: Added sleep statements to address timing issues
This commit is contained in:
parent
ab6794b29e
commit
5aeac2ebf3
|
@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol.impl;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
|
||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolContext;
|
||||
import org.apache.nifi.cluster.protocol.ProtocolException;
|
||||
|
@ -31,13 +32,17 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
|
|||
import org.apache.nifi.io.socket.ServerSocketConfiguration;
|
||||
import org.apache.nifi.io.socket.SocketConfiguration;
|
||||
import org.junit.After;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
|
@ -57,22 +62,27 @@ public class ClusterManagerProtocolSenderImplTest {
|
|||
private ProtocolHandler mockHandler;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
public void setup() throws IOException, InterruptedException {
|
||||
|
||||
address = InetAddress.getLocalHost();
|
||||
ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
|
||||
final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
|
||||
|
||||
mockHandler = mock(ProtocolHandler.class);
|
||||
|
||||
ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
|
||||
final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
|
||||
|
||||
listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
|
||||
listener.addHandler(mockHandler);
|
||||
listener.start();
|
||||
|
||||
// Need to be sure that we give the listener plenty of time to startup. Otherwise, we get intermittent
|
||||
// test failures because the Thread started by listener.start() isn't ready to accept connections
|
||||
// before we make them.
|
||||
Thread.sleep(1000L);
|
||||
|
||||
port = listener.getPort();
|
||||
|
||||
SocketConfiguration socketConfiguration = new SocketConfiguration();
|
||||
final SocketConfiguration socketConfiguration = new SocketConfiguration();
|
||||
sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext);
|
||||
}
|
||||
|
||||
|
@ -88,9 +98,9 @@ public class ClusterManagerProtocolSenderImplTest {
|
|||
|
||||
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
|
||||
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
|
||||
FlowRequestMessage request = new FlowRequestMessage();
|
||||
final FlowRequestMessage request = new FlowRequestMessage();
|
||||
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
|
||||
FlowResponseMessage response = sender.requestFlow(request);
|
||||
final FlowResponseMessage response = sender.requestFlow(request);
|
||||
assertNotNull(response);
|
||||
}
|
||||
|
||||
|
@ -99,12 +109,12 @@ public class ClusterManagerProtocolSenderImplTest {
|
|||
|
||||
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
|
||||
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
|
||||
FlowRequestMessage request = new FlowRequestMessage();
|
||||
final FlowRequestMessage request = new FlowRequestMessage();
|
||||
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
|
||||
try {
|
||||
sender.requestFlow(request);
|
||||
fail("failed to throw exception");
|
||||
} catch (ProtocolException pe) {
|
||||
} catch (final ProtocolException pe) {
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -118,17 +128,17 @@ public class ClusterManagerProtocolSenderImplTest {
|
|||
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
|
||||
when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
|
||||
@Override
|
||||
public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable {
|
||||
public FlowResponseMessage answer(final InvocationOnMock invocation) throws Throwable {
|
||||
Thread.sleep(time * 3);
|
||||
return new FlowResponseMessage();
|
||||
}
|
||||
});
|
||||
FlowRequestMessage request = new FlowRequestMessage();
|
||||
final FlowRequestMessage request = new FlowRequestMessage();
|
||||
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
|
||||
try {
|
||||
sender.requestFlow(request);
|
||||
fail("failed to throw exception");
|
||||
} catch (ProtocolException pe) {
|
||||
} catch (final ProtocolException pe) {
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -42,8 +42,8 @@ import org.junit.Test;
|
|||
|
||||
public class TestHandleHttpRequest {
|
||||
|
||||
@Test
|
||||
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException {
|
||||
@Test(timeout=10000)
|
||||
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
|
||||
runner.setProperty(HandleHttpRequest.PORT, "0");
|
||||
|
||||
|
@ -79,15 +79,11 @@ public class TestHandleHttpRequest {
|
|||
});
|
||||
httpThread.start();
|
||||
|
||||
// give processor a bit to handle the http request
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (final InterruptedException ie) {
|
||||
while ( runner.getFlowFilesForRelationship(HandleHttpRequest.REL_SUCCESS).isEmpty() ) {
|
||||
// process the request.
|
||||
runner.run(1, false);
|
||||
}
|
||||
|
||||
// process the request.
|
||||
runner.run(1, false);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(HandleHttpRequest.REL_SUCCESS, 1);
|
||||
assertEquals(1, contextMap.size());
|
||||
|
||||
|
@ -110,18 +106,18 @@ public class TestHandleHttpRequest {
|
|||
private final ConcurrentMap<String, HttpServletResponse> responseMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context) {
|
||||
public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
|
||||
responseMap.put(identifier, response);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpServletResponse getResponse(String identifier) {
|
||||
public HttpServletResponse getResponse(final String identifier) {
|
||||
return responseMap.get(identifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void complete(String identifier) {
|
||||
public void complete(final String identifier) {
|
||||
responseMap.remove(identifier);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue