NIFI-8347 Set Thread Context ClassLoader for provided jetty-server classes

- Updated unit test with WebSocket connect method

NIFI-8347 Replaced init() method with ServletContextHandler.setClassLoader()

This closes #4918.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
exceptionfactory 2021-03-19 16:08:57 -05:00 committed by Peter Turcsanyi
parent 9df717f69b
commit 36e0187648
2 changed files with 92 additions and 29 deletions

View File

@ -263,6 +263,8 @@ public class JettyWebSocketServer extends AbstractJettyWebSocketService implemen
final ContextHandlerCollection handlerCollection = new ContextHandlerCollection(); final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
final ServletContextHandler contextHandler = new ServletContextHandler(); final ServletContextHandler contextHandler = new ServletContextHandler();
// Set ClassLoader so that jetty-server classes are available to WebSocketServletFactory.Loader
contextHandler.setClassLoader(getClass().getClassLoader());
// Add basic auth. // Add basic auth.
if (context.getProperty(BASIC_AUTH).asBoolean()) { if (context.getProperty(BASIC_AUTH).asBoolean()) {

View File

@ -16,49 +16,110 @@
*/ */
package org.apache.nifi.websocket.jetty; package org.apache.nifi.websocket.jetty;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Collection; import java.net.URI;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class TestJettyWebSocketServer { public class TestJettyWebSocketServer {
private static final long TIMEOUT_SECONDS = 5;
@Test private static final String ROOT_ENDPOINT_ID = "/";
public void testValidationRequiredProperties() throws Exception {
final JettyWebSocketServer service = new JettyWebSocketServer(); private static final String IDENTIFIER = JettyWebSocketServer.class.getSimpleName();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
service.initialize(context.getInitializationContext()); private static final int MAX_PORT = 65535;
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size()); private TestRunner runner;
final ValidationResult result = results.iterator().next();
assertEquals(JettyWebSocketServer.LISTEN_PORT.getDisplayName(), result.getSubject()); @Before
public void setRunner() {
final Processor processor = mock(Processor.class);
runner = TestRunners.newTestRunner(processor);
}
@After
public void shutdown() {
runner.shutdown();
} }
@Test @Test
public void testValidationHashLoginService() throws Exception { public void testValidationHashLoginService() throws Exception {
final JettyWebSocketServer service = new JettyWebSocketServer(); final JettyWebSocketServer server = new JettyWebSocketServer();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); runner.addControllerService(IDENTIFIER, server);
context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001"); runner.setProperty(server, JettyWebSocketServer.LISTEN_PORT, Integer.toString(MAX_PORT));
context.setCustomValue(JettyWebSocketServer.LOGIN_SERVICE, "hash"); runner.setProperty(server, JettyWebSocketServer.LOGIN_SERVICE, JettyWebSocketServer.LOGIN_SERVICE_HASH.getValue());
context.setCustomValue(JettyWebSocketServer.BASIC_AUTH, "true"); runner.setProperty(server, JettyWebSocketServer.BASIC_AUTH, Boolean.TRUE.toString());
service.initialize(context.getInitializationContext()); runner.assertNotValid();
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertEquals(JettyWebSocketServer.USERS_PROPERTIES_FILE.getDisplayName(), result.getSubject());
} }
@Test @Test
public void testValidationSuccess() throws Exception { public void testValidationSuccess() throws Exception {
final JettyWebSocketServer service = new JettyWebSocketServer(); final JettyWebSocketServer server = new JettyWebSocketServer();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id"); runner.addControllerService(IDENTIFIER, server);
context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001"); runner.setProperty(server, JettyWebSocketServer.LISTEN_PORT, Integer.toString(MAX_PORT));
service.initialize(context.getInitializationContext()); runner.assertValid(server);
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(0, results.size());
} }
@Test
public void testWebSocketConnect() throws Exception {
final int port = NetworkUtils.availablePort();
final String identifier = JettyWebSocketServer.class.getSimpleName();
final JettyWebSocketServer server = new JettyWebSocketServer();
runner.addControllerService(identifier, server);
runner.setProperty(server, JettyWebSocketServer.LISTEN_PORT, Integer.toString(port));
runner.enableControllerService(server);
server.registerProcessor(ROOT_ENDPOINT_ID, runner.getProcessor());
final String command = String.class.getName();
final AtomicBoolean connected = new AtomicBoolean();
final WebSocketClient client = new WebSocketClient();
final WebSocketAdapter adapter = new WebSocketAdapter() {
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
connected.set(true);
}
@Override
public void onWebSocketText(final String message) {
}
};
try {
client.start();
final URI uri = getWebSocketUri(port);
final Future<Session> connectSession = client.connect(adapter, uri);
final Session session = connectSession.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
session.getRemote().sendString(command);
session.close();
assertTrue("Connection not found", connected.get());
} finally {
client.stop();
runner.disableControllerService(server);
}
}
private URI getWebSocketUri(final int port) {
return URI.create(String.format("ws://localhost:%d", port));
}
} }