This commit is contained in:
Matt Gilman 2015-05-02 16:41:42 -04:00
commit b6b8c95bf4
9 changed files with 5 additions and 882 deletions

View File

@ -41,9 +41,6 @@ import static org.mockito.Mockito.when;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* @author unattributed
*/
public class ClusterManagerProtocolSenderImplTest {
private InetAddress address;
@ -90,7 +87,7 @@ public class ClusterManagerProtocolSenderImplTest {
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
FlowResponseMessage response = sender.requestFlow(request);
assertNotNull(response);
}
@ -101,7 +98,7 @@ public class ClusterManagerProtocolSenderImplTest {
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
try {
sender.requestFlow(request);
fail("failed to throw exception");
@ -125,7 +122,7 @@ public class ClusterManagerProtocolSenderImplTest {
}
});
FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
try {
sender.requestFlow(request);
fail("failed to throw exception");

View File

@ -27,8 +27,8 @@ import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
import org.apache.nifi.cluster.protocol.impl.testutils.DelayedProtocolHandler;
import org.apache.nifi.cluster.protocol.impl.testutils.ReflexiveProtocolHandler;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;

View File

@ -1,144 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.impl;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.junit.After;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* @author unattributed
*/
public class ClusterManagerProtocolSenderImplTest {
private InetAddress address;
private int port;
private SocketProtocolListener listener;
private ClusterManagerProtocolSenderImpl sender;
private ProtocolHandler mockHandler;
@Before
public void setup() throws IOException, InterruptedException {
address = InetAddress.getLocalHost();
final ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
mockHandler = mock(ProtocolHandler.class);
final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
listener.addHandler(mockHandler);
listener.start();
// Need to be sure that we give the listener plenty of time to startup. Otherwise, we get intermittent
// test failures because the Thread started by listener.start() isn't ready to accept connections
// before we make them.
Thread.sleep(1000L);
port = listener.getPort();
final SocketConfiguration socketConfiguration = new SocketConfiguration();
sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext);
}
@After
public void teardown() throws IOException {
if (listener.isRunning()) {
listener.stop();
}
}
@Test
public void testRequestFlow() throws Exception {
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
final FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
final FlowResponseMessage response = sender.requestFlow(request);
assertNotNull(response);
}
@Test
public void testRequestFlowWithBadResponseMessage() throws Exception {
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
final FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
try {
sender.requestFlow(request);
fail("failed to throw exception");
} catch (final ProtocolException pe) {
}
}
@Test
public void testRequestFlowDelayedResponse() throws Exception {
final int time = 250;
sender.getSocketConfiguration().setSocketTimeout(time);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
@Override
public FlowResponseMessage answer(final InvocationOnMock invocation) throws Throwable {
Thread.sleep(time * 3);
return new FlowResponseMessage();
}
});
final FlowRequestMessage request = new FlowRequestMessage();
request.setNodeId(new NodeIdentifier("id", "api-address", 1, "localhost", port));
try {
sender.requestFlow(request);
fail("failed to throw exception");
} catch (final ProtocolException pe) {
}
}
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.impl;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.mockito.stubbing.OngoingStubbing;
public class ClusterServiceLocatorTest {
private ClusterServiceDiscovery mockServiceDiscovery;
private int fixedPort;
private DiscoverableService fixedService;
private ClusterServiceLocator serviceDiscoveryLocator;
private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
private ClusterServiceLocator fixedServiceLocator;
@Before
public void setup() throws Exception {
fixedPort = 1;
mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));
serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
fixedServiceLocator = new ClusterServiceLocator(fixedService);
}
@Test
public void getServiceWhenServiceDiscoveryNotStarted() {
assertNull(serviceDiscoveryLocator.getService());
}
@Test
public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
assertNull(serviceDiscoveryLocator.getService());
}
@Test
public void getServiceWhenFixedServiceNotStarted() {
assertEquals(fixedService, fixedServiceLocator.getService());
}
@Test
public void getServiceNotOnFirstAttempt() {
ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
config.setNumAttempts(2);
config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
config.setTimeBetweenAttempts(1);
serviceDiscoveryLocator.setAttemptsConfig(config);
OngoingStubbing<DiscoverableService> stubbing = null;
for (int i = 0; i < config.getNumAttempts() - 1; i++) {
if (stubbing == null) {
stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
} else {
stubbing.thenReturn(null);
}
}
stubbing.thenReturn(fixedService);
assertEquals(fixedService, serviceDiscoveryLocator.getService());
}
@Test
public void getServiceNotOnFirstAttemptWithFixedPort() {
ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
config.setNumAttempts(2);
config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
config.setTimeBetweenAttempts(1);
serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
OngoingStubbing<DiscoverableService> stubbing = null;
for (int i = 0; i < config.getNumAttempts() - 1; i++) {
if (stubbing == null) {
stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
} else {
stubbing.thenReturn(null);
}
}
stubbing.thenReturn(fixedService);
InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
}
}

View File

@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.impl;
import java.net.InetSocketAddress;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* @author unattributed
*/
public class ClusterServicesBroadcasterTest {
private ClusterServicesBroadcaster broadcaster;
private MulticastProtocolListener listener;
private DummyProtocolHandler handler;
private InetSocketAddress multicastAddress;
private DiscoverableService broadcastedService;
private ProtocolContext protocolContext;
private MulticastConfiguration configuration;
@Before
public void setup() throws Exception {
broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
configuration = new MulticastConfiguration();
protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
broadcaster.addService(broadcastedService);
handler = new DummyProtocolHandler();
listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
listener.addHandler(handler);
}
@After
public void teardown() {
if (broadcaster.isRunning()) {
broadcaster.stop();
}
try {
if (listener.isRunning()) {
listener.stop();
}
} catch (Exception ex) {
ex.printStackTrace(System.out);
}
}
@Test
@Ignore
public void testBroadcastReceived() throws Exception {
broadcaster.start();
listener.start();
Thread.sleep(1000);
listener.stop();
assertNotNull(handler.getProtocolMessage());
assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
assertEquals(broadcastedService.getServiceName(), msg.getServiceName());
assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
}
private class DummyProtocolHandler implements ProtocolHandler {
private ProtocolMessage protocolMessage;
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
}
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
this.protocolMessage = msg;
return null;
}
public ProtocolMessage getProtocolMessage() {
return protocolMessage;
}
}
}

View File

@ -1,171 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.impl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
/**
* @author unattributed
*/
public class MulticastProtocolListenerTest {
private MulticastProtocolListener listener;
private MulticastSocket socket;
private InetSocketAddress address;
private MulticastConfiguration configuration;
private ProtocolContext protocolContext;
@Before
public void setup() throws Exception {
address = new InetSocketAddress("226.1.1.1", 60000);
configuration = new MulticastConfiguration();
protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
listener.start();
socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
}
@After
public void teardown() throws IOException {
try {
if (listener.isRunning()) {
listener.stop();
}
} finally {
MulticastUtils.closeQuietly(socket);
}
}
@Ignore("This test must be reworked. Requires an active network connection")
@Test
public void testBadRequest() throws Exception {
DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
listener.addHandler(handler);
DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
socket.send(packet);
Thread.sleep(250);
assertEquals(0, handler.getMessages().size());
}
@Test
@Ignore
public void testRequest() throws Exception {
ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
listener.addHandler(handler);
ProtocolMessage msg = new PingMessage();
MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
// marshal message to output stream
ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
marshaller.marshal(multicastMsg, baos);
byte[] requestPacketBytes = baos.toByteArray();
DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
socket.send(packet);
Thread.sleep(250);
assertEquals(1, handler.getMessages().size());
assertEquals(msg.getType(), handler.getMessages().get(0).getType());
}
private class ReflexiveProtocolHandler implements ProtocolHandler {
private List<ProtocolMessage> messages = new ArrayList<>();
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
messages.add(msg);
return msg;
}
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
}
public List<ProtocolMessage> getMessages() {
return messages;
}
}
private class DelayedProtocolHandler implements ProtocolHandler {
private int delay = 0;
private List<ProtocolMessage> messages = new ArrayList<>();
public DelayedProtocolHandler(int delay) {
this.delay = delay;
}
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
try {
messages.add(msg);
Thread.sleep(delay);
return null;
} catch (final InterruptedException ie) {
throw new ProtocolException(ie);
}
}
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
}
public List<ProtocolMessage> getMessages() {
return messages;
}
}
}

View File

@ -1,202 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.impl;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* @author unattributed
*/
public class NodeProtocolSenderImplTest {
private SocketProtocolListener listener;
private NodeProtocolSenderImpl sender;
private DiscoverableService service;
private ServerSocketConfiguration serverSocketConfiguration;
private ClusterServiceLocator mockServiceLocator;
private ProtocolHandler mockHandler;
private NodeIdentifier nodeIdentifier;
@Before
public void setup() throws IOException {
serverSocketConfiguration = new ServerSocketConfiguration();
mockServiceLocator = mock(ClusterServiceLocator.class);
mockHandler = mock(ProtocolHandler.class);
nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
listener.setShutdownListenerSeconds(3);
listener.addHandler(mockHandler);
listener.start();
service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort()));
SocketConfiguration socketConfiguration = new SocketConfiguration();
socketConfiguration.setReuseAddress(true);
sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext);
}
@After
public void teardown() throws IOException {
if (listener.isRunning()) {
listener.stop();
}
}
@Test
public void testConnect() throws Exception {
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
ConnectionResponseMessage response = sender.requestConnection(request);
assertNotNull(response);
}
@Test(expected = UnknownServiceAddressException.class)
public void testConnectNoClusterManagerAddress() throws Exception {
when(mockServiceLocator.getService()).thenReturn(null);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage());
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
sender.requestConnection(request);
fail("failed to throw exception");
}
@Test(expected = ProtocolException.class)
public void testConnectBadResponse() throws Exception {
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
sender.requestConnection(request);
fail("failed to throw exception");
}
@Test(expected = ProtocolException.class)
public void testConnectDelayedResponse() throws Exception {
final int time = 250;
sender.getSocketConfiguration().setSocketTimeout(time);
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() {
@Override
public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(time * 3);
return new ConnectionResponseMessage();
}
});
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
sender.requestConnection(request);
fail("failed to throw exception");
}
@Test
public void testHeartbeat() throws Exception {
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
HeartbeatMessage msg = new HeartbeatMessage();
HeartbeatPayload hbPayload = new HeartbeatPayload();
Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, hbPayload.marshal());
msg.setHeartbeat(hb);
sender.heartbeat(msg);
}
@Test
public void testNotifyControllerStartupFailure() throws Exception {
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
msg.setExceptionMessage("some exception");
sender.notifyControllerStartupFailure(msg);
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.testutils;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
/**
* @author unattributed
*/
public class DelayedProtocolHandler implements ProtocolHandler {
private int delay = 0;
private List<ProtocolMessage> messages = new ArrayList<>();
public DelayedProtocolHandler(int delay) {
this.delay = delay;
}
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
try {
messages.add(msg);
Thread.sleep(delay);
return null;
} catch (final InterruptedException ie) {
throw new ProtocolException(ie);
}
}
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
}
public List<ProtocolMessage> getMessages() {
return messages;
}
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.protocol.testutils;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
/**
* @author unattributed
*/
public class ReflexiveProtocolHandler implements ProtocolHandler {
private List<ProtocolMessage> messages = new ArrayList<>();
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
messages.add(msg);
return msg;
}
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
}
public List<ProtocolMessage> getMessages() {
return messages;
}
}