NIFI-1002: Added WebSocket support.

NIFI-1002: Added WebSocket support.

- Reflecting review comments
- Added displayName to peroperty descriptors

NIFI-1002: Added WebSocket support.

This closes #1184

- Reflecting review comments:
  - Removed unnecessary use of NarCloseable.withComponentNarLoader.
  - Removed intermediate on memory queue to make it simpler and more
    robust. Received messages in WebSocket layer now will be transferred
to downstream relationships directly.
This commit is contained in:
Koji Kawamura 2016-10-18 10:43:42 +09:00 committed by Oleg Zhurakousky
parent b026f0bebe
commit 26a5881d21
54 changed files with 4019 additions and 0 deletions

View File

@ -400,6 +400,21 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-windows-event-log-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-jetty-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-processors-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<profiles>
<profile>

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-websocket-processors-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-processors</artifactId>
<version>1.1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-websocket-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.websocket.BinaryMessageConsumer;
import org.apache.nifi.websocket.ConnectedListener;
import org.apache.nifi.websocket.TextMessageConsumer;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketConnectedMessage;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
@TriggerSerially
public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionFactoryProcessor implements ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
protected volatile ComponentLog logger;
protected volatile ProcessSessionFactory processSessionFactory;
protected WebSocketService webSocketService;
protected String endpointId;
public static final Relationship REL_CONNECTED = new Relationship.Builder()
.name("connected")
.description("The WebSocket session is established")
.build();
public static final Relationship REL_MESSAGE_TEXT = new Relationship.Builder()
.name("text message")
.description("The WebSocket text message output")
.build();
public static final Relationship REL_MESSAGE_BINARY = new Relationship.Builder()
.name("binary message")
.description("The WebSocket binary message output")
.build();
static Set<Relationship> getAbstractRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_CONNECTED);
relationships.add(REL_MESSAGE_TEXT);
relationships.add(REL_MESSAGE_BINARY);
return relationships;
}
@Override
protected void init(final ProcessorInitializationContext context) {
logger = getLogger();
}
public interface WebSocketFunction {
void execute(final WebSocketService webSocketService) throws IOException, WebSocketConfigurationException;
}
@Override
public void connected(WebSocketSessionInfo sessionInfo) {
final WebSocketMessage message = new WebSocketConnectedMessage(sessionInfo);
sessionInfo.setTransitUri(getTransitUri(sessionInfo));
enqueueMessage(message);
}
@Override
public void consume(WebSocketSessionInfo sessionInfo, String messageStr) {
final WebSocketMessage message = new WebSocketMessage(sessionInfo);
sessionInfo.setTransitUri(getTransitUri(sessionInfo));
message.setPayload(messageStr);
enqueueMessage(message);
}
@Override
public void consume(WebSocketSessionInfo sessionInfo, byte[] payload, int offset, int length) {
final WebSocketMessage message = new WebSocketMessage(sessionInfo);
sessionInfo.setTransitUri(getTransitUri(sessionInfo));
message.setPayload(payload, offset, length);
enqueueMessage(message);
}
// @OnScheduled can not report error messages well on bulletin since it's an async method.
// So, let's do it in onTrigger().
public void onWebSocketServiceReady(final WebSocketService webSocketService) throws IOException {
if (webSocketService instanceof WebSocketClientService) {
// If it's a ws client, then connect to the remote here.
// Otherwise, ws server is already started at WebSocketServerService
((WebSocketClientService) webSocketService).connect(endpointId);
}
}
protected void registerProcessorToService(final ProcessContext context, final WebSocketFunction afterRegistration) throws IOException, WebSocketConfigurationException {
webSocketService = getWebSocketService(context);
endpointId = getEndpointId(context);
webSocketService.registerProcessor(endpointId, this);
afterRegistration.execute(webSocketService);
}
protected abstract WebSocketService getWebSocketService(final ProcessContext context);
protected abstract String getEndpointId(final ProcessContext context);
protected boolean isProcessorRegisteredToService() {
return webSocketService != null
&& !StringUtils.isEmpty(endpointId)
&& webSocketService.isProcessorRegistered(endpointId, this);
}
@OnStopped
public void onStopped(final ProcessContext context) throws IOException {
if (webSocketService == null) {
return;
}
try {
// Deregister processor, so that it won't receive messages anymore.
webSocketService.deregisterProcessor(endpointId, this);
webSocketService = null;
} catch (WebSocketConfigurationException e) {
logger.warn("Failed to deregister processor {} due to: {}", new Object[]{this, e}, e);
}
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
if (processSessionFactory == null) {
processSessionFactory = sessionFactory;
}
if (!isProcessorRegisteredToService()) {
try {
registerProcessorToService(context, webSocketService -> onWebSocketServiceReady(webSocketService));
} catch (IOException|WebSocketConfigurationException e) {
throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
}
}
context.yield();//nothing really to do here since threading managed by smtp server sessions
}
private void enqueueMessage(final WebSocketMessage incomingMessage){
final ProcessSession session = processSessionFactory.createSession();
try {
FlowFile messageFlowFile = session.create();
final Map<String, String> attrs = new HashMap<>();
attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier());
final WebSocketSessionInfo sessionInfo = incomingMessage.getSessionInfo();
attrs.put(ATTR_WS_SESSION_ID, sessionInfo.getSessionId());
attrs.put(ATTR_WS_ENDPOINT_ID, endpointId);
attrs.put(ATTR_WS_LOCAL_ADDRESS, sessionInfo.getLocalAddress().toString());
attrs.put(ATTR_WS_REMOTE_ADDRESS, sessionInfo.getRemoteAddress().toString());
final WebSocketMessage.Type messageType = incomingMessage.getType();
if (messageType != null) {
attrs.put(ATTR_WS_MESSAGE_TYPE, messageType.name());
}
messageFlowFile = session.putAllAttributes(messageFlowFile, attrs);
final byte[] payload = incomingMessage.getPayload();
if (payload != null) {
messageFlowFile = session.write(messageFlowFile, out -> {
out.write(payload, incomingMessage.getOffset(), incomingMessage.getLength());
});
}
session.getProvenanceReporter().receive(messageFlowFile, getTransitUri(sessionInfo));
if (incomingMessage instanceof WebSocketConnectedMessage) {
session.transfer(messageFlowFile, REL_CONNECTED);
} else {
switch (messageType) {
case TEXT:
session.transfer(messageFlowFile, REL_MESSAGE_TEXT);
break;
case BINARY:
session.transfer(messageFlowFile, REL_MESSAGE_BINARY);
break;
}
}
session.commit();
} catch (Exception e) {
logger.error("Unable to fully process input due to " + e, e);
session.rollback();
}
}
abstract protected String getTransitUri(final WebSocketSessionInfo sessionInfo);
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
@Tags({"subscribe", "WebSocket", "consume", "listen"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@CapabilityDescription("Acts as a WebSocket client endpoint to interact with a remote WebSocket server." +
" FlowFiles are transferred to downstream relationships according to received message types" +
" as WebSocket client configured with this processor receives messages from remote WebSocket server.")
@WritesAttributes({
@WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."),
@WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."),
@WritesAttribute(attribute = ATTR_WS_ENDPOINT_ID, description = "WebSocket endpoint id."),
@WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket client address."),
@WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket server address."),
@WritesAttribute(attribute = ATTR_WS_MESSAGE_TYPE, description = "TEXT or BINARY."),
})
public class ConnectWebSocket extends AbstractWebSocketGatewayProcessor {
public static final PropertyDescriptor PROP_WEBSOCKET_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("websocket-client-controller-service")
.displayName("WebSocket Client ControllerService")
.description("A WebSocket CLIENT Controller Service which can connect to a WebSocket server.")
.required(true)
.identifiesControllerService(WebSocketClientService.class)
.build();
public static final PropertyDescriptor PROP_WEBSOCKET_CLIENT_ID = new PropertyDescriptor.Builder()
.name("websocket-client-id")
.displayName("WebSocket Client Id")
.description("The client ID to identify WebSocket session." +
" It should be unique within the WebSocket Client Controller Service." +
" Otherwise, it throws WebSocketConfigurationException when it gets started.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static{
final List<PropertyDescriptor> innerDescriptorsList = new ArrayList<>();
innerDescriptorsList.add(PROP_WEBSOCKET_CLIENT_SERVICE);
innerDescriptorsList.add(PROP_WEBSOCKET_CLIENT_ID);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = getAbstractRelationships();
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected WebSocketService getWebSocketService(final ProcessContext context) {
return context.getProperty(PROP_WEBSOCKET_CLIENT_SERVICE)
.asControllerService(WebSocketService.class);
}
@Override
protected String getEndpointId(final ProcessContext context) {
return context.getProperty(PROP_WEBSOCKET_CLIENT_ID).getValue();
}
@Override
protected String getTransitUri(final WebSocketSessionInfo sessionInfo) {
return ((WebSocketClientService)webSocketService).getTargetUri();
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.websocket.WebSocketServerService;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
@Tags({"subscribe", "WebSocket", "consume", "listen"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@CapabilityDescription("Acts as a WebSocket server endpoint to accept client connections." +
" FlowFiles are transferred to downstream relationships according to received message types" +
" as the WebSocket server configured with this processor receives client requests")
@WritesAttributes({
@WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."),
@WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."),
@WritesAttribute(attribute = ATTR_WS_ENDPOINT_ID, description = "WebSocket endpoint id."),
@WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket server address."),
@WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket client address."),
@WritesAttribute(attribute = ATTR_WS_MESSAGE_TYPE, description = "TEXT or BINARY."),
})
public class ListenWebSocket extends AbstractWebSocketGatewayProcessor {
public static final PropertyDescriptor PROP_WEBSOCKET_SERVER_SERVICE = new PropertyDescriptor.Builder()
.name("websocket-server-controller-service")
.displayName("WebSocket Server ControllerService")
.description("A WebSocket SERVER Controller Service which can accept WebSocket requests.")
.required(true)
.identifiesControllerService(WebSocketServerService.class)
.build();
public static final PropertyDescriptor PROP_SERVER_URL_PATH = new PropertyDescriptor.Builder()
.name("server-url-path")
.displayName("Server URL Path")
.description("The WetSocket URL Path on which this processor listens to. Must starts with '/', e.g. '/example'.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.addValidator((subject, input, context) -> {
final ValidationResult.Builder result = new ValidationResult.Builder()
.valid(input.startsWith("/"))
.subject(subject)
.explanation("Must starts with '/', e.g. '/example'.");
return result.build();
})
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static{
final List<PropertyDescriptor> innerDescriptorsList = new ArrayList<>();
innerDescriptorsList.add(PROP_WEBSOCKET_SERVER_SERVICE);
innerDescriptorsList.add(PROP_SERVER_URL_PATH);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = getAbstractRelationships();
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected WebSocketService getWebSocketService(final ProcessContext context) {
return context.getProperty(PROP_WEBSOCKET_SERVER_SERVICE)
.asControllerService(WebSocketService.class);
}
@Override
protected String getEndpointId(final ProcessContext context) {
return context.getProperty(PROP_SERVER_URL_PATH).getValue();
}
@Override
protected String getTransitUri(final WebSocketSessionInfo sessionInfo) {
final InetSocketAddress localAddress = sessionInfo.getLocalAddress();
return (sessionInfo.isSecure() ? "wss:" : "ws:")
+ localAddress.getHostName() + ":" + localAddress.getPort() + endpointId;
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
import static org.apache.nifi.websocket.WebSocketMessage.CHARSET_NAME;
@Tags({"WebSocket", "publish", "send"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerSerially
@CapabilityDescription("Sends messages to a WebSocket remote endpoint" +
" using a WebSocket session that is established by either ListenWebSocket or ConnectWebSocket.")
@WritesAttributes({
@WritesAttribute(attribute = ATTR_WS_CS_ID, description = "WebSocket Controller Service id."),
@WritesAttribute(attribute = ATTR_WS_SESSION_ID, description = "Established WebSocket session id."),
@WritesAttribute(attribute = ATTR_WS_ENDPOINT_ID, description = "WebSocket endpoint id."),
@WritesAttribute(attribute = ATTR_WS_MESSAGE_TYPE, description = "TEXT or BINARY."),
@WritesAttribute(attribute = ATTR_WS_LOCAL_ADDRESS, description = "WebSocket server address."),
@WritesAttribute(attribute = ATTR_WS_REMOTE_ADDRESS, description = "WebSocket client address."),
@WritesAttribute(attribute = ATTR_WS_FAILURE_DETAIL, description = "Detail of the failure."),
})
public class PutWebSocket extends AbstractProcessor {
public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder()
.name("websocket-session-id")
.displayName("WebSocket Session Id")
.description("A NiFi Expression to retrieve the session id.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${" + ATTR_WS_SESSION_ID + "}")
.build();
public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder()
.name("websocket-controller-service-id")
.displayName("WebSocket ControllerService Id")
.description("A NiFi Expression to retrieve the id of a WebSocket ControllerService.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${" + ATTR_WS_CS_ID + "}")
.build();
public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder()
.name("websocket-endpoint-id")
.displayName("WebSocket Endpoint Id")
.description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("${" + ATTR_WS_ENDPOINT_ID + "}")
.build();
public static final PropertyDescriptor PROP_WS_MESSAGE_TYPE = new PropertyDescriptor.Builder()
.name("websocket-message-type")
.displayName("WebSocket Message Type")
.description("The type of message content: TEXT or BINARY")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(WebSocketMessage.Type.TEXT.toString())
.expressionLanguageSupported(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are transferred to this relationship.")
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static{
final List<PropertyDescriptor> innerDescriptorsList = new ArrayList<>();
innerDescriptorsList.add(PROP_WS_SESSION_ID);
innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ID);
innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ENDPOINT);
innerDescriptorsList.add(PROP_WS_MESSAGE_TYPE);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<>();
innerRelationshipsSet.add(REL_SUCCESS);
innerRelationshipsSet.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final FlowFile flowfile = processSession.get();
if (flowfile == null) {
return;
}
final String sessionId = context.getProperty(PROP_WS_SESSION_ID)
.evaluateAttributeExpressions(flowfile).getValue();
final String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID)
.evaluateAttributeExpressions(flowfile).getValue();
final String webSocketServiceEndpoint = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ENDPOINT)
.evaluateAttributeExpressions(flowfile).getValue();
final String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE)
.evaluateAttributeExpressions(flowfile).getValue();
final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr);
if (StringUtils.isEmpty(sessionId)
|| StringUtils.isEmpty(webSocketServiceId)
|| StringUtils.isEmpty(webSocketServiceEndpoint)) {
transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.");
return;
}
final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(webSocketServiceId);
if (controllerService == null) {
transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found.");
return;
} else if (!(controllerService instanceof WebSocketService)) {
transferToFailure(processSession, flowfile, "The ControllerService found was not a WebSocket ControllerService but a "
+ controllerService.getClass().getName());
return;
}
final WebSocketService webSocketService = (WebSocketService)controllerService;
final byte[] messageContent = new byte[(int) flowfile.getSize()];
final long startSending = System.currentTimeMillis();
final Map<String, String> attrs = new HashMap<>();
attrs.put(ATTR_WS_CS_ID, webSocketService.getIdentifier());
attrs.put(ATTR_WS_SESSION_ID, sessionId);
attrs.put(ATTR_WS_ENDPOINT_ID, webSocketServiceEndpoint);
attrs.put(ATTR_WS_MESSAGE_TYPE, messageTypeStr);
processSession.read(flowfile, in -> {
StreamUtils.fillBuffer(in, messageContent, true);
});
try {
webSocketService.sendMessage(webSocketServiceEndpoint, sessionId, sender -> {
switch (messageType) {
case TEXT:
sender.sendString(new String(messageContent, CHARSET_NAME));
break;
case BINARY:
sender.sendBinary(ByteBuffer.wrap(messageContent));
break;
}
attrs.put(ATTR_WS_LOCAL_ADDRESS, sender.getLocalAddress().toString());
attrs.put(ATTR_WS_REMOTE_ADDRESS, sender.getRemoteAddress().toString());
final FlowFile updatedFlowFile = processSession.putAllAttributes(flowfile, attrs);
final long transmissionMillis = System.currentTimeMillis() - startSending;
processSession.getProvenanceReporter().send(updatedFlowFile, sender.getTransitUri(), transmissionMillis);
processSession.transfer(updatedFlowFile, REL_SUCCESS);
});
} catch (WebSocketConfigurationException|IllegalStateException|IOException e) {
// WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped.
// IllegalStateException: Session is already closed or not found.
// IOException: other IO error.
getLogger().error("Failed to send message via WebSocket due to " + e, e);
transferToFailure(processSession, flowfile, e.toString());
}
}
private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String value) {
flowfile = processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, value);
processSession.transfer(flowfile, REL_FAILURE);
return flowfile;
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
public final class WebSocketProcessorAttributes{
public static final String ATTR_WS_CS_ID = "websocket.controller.service.id";
public static final String ATTR_WS_SESSION_ID = "websocket.session.id";
public static final String ATTR_WS_ENDPOINT_ID = "websocket.endpoint.id";
public static final String ATTR_WS_FAILURE_DETAIL = "websocket.failure.detail";
public static final String ATTR_WS_MESSAGE_TYPE = "websocket.message.type";
public static final String ATTR_WS_LOCAL_ADDRESS = "websocket.local.address";
public static final String ATTR_WS_REMOTE_ADDRESS = "websocket.remote.address";
private WebSocketProcessorAttributes() {
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.websocket.ListenWebSocket
org.apache.nifi.processors.websocket.ConnectWebSocket
org.apache.nifi.processors.websocket.PutWebSocket

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.websocket.AbstractWebSocketSession;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketSession;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestConnectWebSocket extends TestListenWebSocket {
@Test
public void testSuccess() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class);
final ConnectWebSocket processor = (ConnectWebSocket)runner.getProcessor();
final SharedSessionState sharedSessionState = new SharedSessionState(processor, new AtomicLong(0));
// Use this custom session factory implementation so that createdSessions can be read from test case,
// because MockSessionFactory doesn't expose it.
final Set<MockProcessSession> createdSessions = new HashSet<>();
final ProcessSessionFactory sessionFactory = () -> {
final MockProcessSession session = new MockProcessSession(sharedSessionState, processor);
createdSessions.add(session);
return session;
};
final WebSocketClientService service = mock(WebSocketClientService.class);
final WebSocketSession webSocketSession = spy(AbstractWebSocketSession.class);
when(webSocketSession.getSessionId()).thenReturn("ws-session-id");
when(webSocketSession.getLocalAddress()).thenReturn(new InetSocketAddress("localhost", 12345));
when(webSocketSession.getRemoteAddress()).thenReturn(new InetSocketAddress("example.com", 80));
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
when(service.getTargetUri()).thenReturn("ws://example.com/web-socket");
doAnswer(invocation -> {
processor.connected(webSocketSession);
// Two times.
processor.consume(webSocketSession, textMessageFromServer);
processor.consume(webSocketSession, textMessageFromServer);
// Three times.
final byte[] binaryMessage = textMessageFromServer.getBytes();
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
return null;
}).when(service).connect(endpointId);
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, serviceId);
runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId);
processor.onTrigger(runner.getProcessContext(), sessionFactory);
final Map<Relationship, List<MockFlowFile>> transferredFlowFiles = getAllTransferredFlowFiles(createdSessions, processor);
List<MockFlowFile> connectedFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_CONNECTED);
assertEquals(1, connectedFlowFiles.size());
connectedFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, null);
});
List<MockFlowFile> textFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_MESSAGE_TEXT);
assertEquals(2, textFlowFiles.size());
textFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.TEXT);
});
List<MockFlowFile> binaryFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_MESSAGE_BINARY);
assertEquals(3, binaryFlowFiles.size());
binaryFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.BINARY);
});
final List<ProvenanceEventRecord> provenanceEvents = sharedSessionState.getProvenanceEvents();
assertEquals(6, provenanceEvents.size());
assertTrue(provenanceEvents.stream().allMatch(event -> ProvenanceEventType.RECEIVE.equals(event.getEventType())));
}
}

View File

@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.websocket.AbstractWebSocketSession;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketServerService;
import org.apache.nifi.websocket.WebSocketSession;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestListenWebSocket {
protected void assertFlowFile(WebSocketSession webSocketSession, String serviceId, String endpointId, MockFlowFile ff, WebSocketMessage.Type messageType) {
assertEquals(serviceId, ff.getAttribute(ATTR_WS_CS_ID));
assertEquals(webSocketSession.getSessionId(), ff.getAttribute(ATTR_WS_SESSION_ID));
assertEquals(endpointId, ff.getAttribute(ATTR_WS_ENDPOINT_ID));
assertEquals(webSocketSession.getLocalAddress().toString(), ff.getAttribute(ATTR_WS_LOCAL_ADDRESS));
assertEquals(webSocketSession.getRemoteAddress().toString(), ff.getAttribute(ATTR_WS_REMOTE_ADDRESS));
assertEquals(messageType != null ? messageType.name() : null, ff.getAttribute(ATTR_WS_MESSAGE_TYPE));
}
protected Map<Relationship, List<MockFlowFile>> getAllTransferredFlowFiles(final Collection<MockProcessSession> processSessions, final Processor processor) {
final Map<Relationship, List<MockFlowFile>> flowFiles = new HashMap<>();
processSessions.forEach(session -> {
processor.getRelationships().forEach(rel -> {
List<MockFlowFile> relFlowFiles = flowFiles.get(rel);
if (relFlowFiles == null) {
relFlowFiles = new ArrayList<>();
flowFiles.put(rel, relFlowFiles);
}
relFlowFiles.addAll(session.getFlowFilesForRelationship(rel));
});
});
return flowFiles;
}
@Test
public void testValidationError() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(ListenWebSocket.class);
final WebSocketServerService service = mock(WebSocketServerService.class);
final String serviceId = "ws-service";
final String endpointId = "test";
when(service.getIdentifier()).thenReturn(serviceId);
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
runner.setProperty(ListenWebSocket.PROP_WEBSOCKET_SERVER_SERVICE, serviceId);
runner.setProperty(ListenWebSocket.PROP_SERVER_URL_PATH, endpointId);
try {
runner.run();
fail("Should fail with validation error.");
} catch (AssertionError e) {
assertTrue(e.toString().contains("'server-url-path' is invalid because Must starts with"));
}
}
@Test
public void testSuccess() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(ListenWebSocket.class);
final ListenWebSocket processor = (ListenWebSocket)runner.getProcessor();
final SharedSessionState sharedSessionState = new SharedSessionState(processor, new AtomicLong(0));
// Use this custom session factory implementation so that createdSessions can be read from test case,
// because MockSessionFactory doesn't expose it.
final Set<MockProcessSession> createdSessions = new HashSet<>();
final ProcessSessionFactory sessionFactory = () -> {
final MockProcessSession session = new MockProcessSession(sharedSessionState, processor);
createdSessions.add(session);
return session;
};
final WebSocketServerService service = mock(WebSocketServerService.class);
final WebSocketSession webSocketSession = spy(AbstractWebSocketSession.class);
when(webSocketSession.getSessionId()).thenReturn("ws-session-id");
when(webSocketSession.getLocalAddress()).thenReturn(new InetSocketAddress("localhost", 12345));
when(webSocketSession.getRemoteAddress()).thenReturn(new InetSocketAddress("example.com", 80));
final String serviceId = "ws-service";
final String endpointId = "/test";
final String textMessageReceived = "message from server.";
final AtomicReference<Boolean> registered = new AtomicReference<>(false);
when(service.getIdentifier()).thenReturn(serviceId);
doAnswer(invocation -> {
registered.set(true);
processor.connected(webSocketSession);
// Two times.
processor.consume(webSocketSession, textMessageReceived);
processor.consume(webSocketSession, textMessageReceived);
// Three times.
final byte[] binaryMessage = textMessageReceived.getBytes();
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length);
return null;
}).when(service).registerProcessor(endpointId, processor);
doAnswer(invocation -> registered.get())
.when(service).isProcessorRegistered(eq(endpointId), eq(processor));
doAnswer(invocation -> {
registered.set(false);
return null;
}).when(service).deregisterProcessor(eq(endpointId), eq(processor));
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
runner.setProperty(ListenWebSocket.PROP_WEBSOCKET_SERVER_SERVICE, serviceId);
runner.setProperty(ListenWebSocket.PROP_SERVER_URL_PATH, endpointId);
processor.onTrigger(runner.getProcessContext(), sessionFactory);
Map<Relationship, List<MockFlowFile>> transferredFlowFiles = getAllTransferredFlowFiles(createdSessions, processor);
List<MockFlowFile> connectedFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_CONNECTED);
assertEquals(1, connectedFlowFiles.size());
connectedFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, null);
});
List<MockFlowFile> textFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_MESSAGE_TEXT);
assertEquals(2, textFlowFiles.size());
textFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.TEXT);
});
List<MockFlowFile> binaryFlowFiles = transferredFlowFiles.get(AbstractWebSocketGatewayProcessor.REL_MESSAGE_BINARY);
assertEquals(3, binaryFlowFiles.size());
binaryFlowFiles.forEach(ff -> {
assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.BINARY);
});
final List<ProvenanceEventRecord> provenanceEvents = sharedSessionState.getProvenanceEvents();
assertEquals(6, provenanceEvents.size());
assertTrue(provenanceEvents.stream().allMatch(event -> ProvenanceEventType.RECEIVE.equals(event.getEventType())));
runner.clearTransferState();
runner.clearProvenanceEvents();
createdSessions.clear();
assertEquals(0, createdSessions.size());
// Simulate that the processor has started, and it get's triggered again
processor.onTrigger(runner.getProcessContext(), sessionFactory);
assertEquals("No session should be created", 0, createdSessions.size());
// Simulate that the processor is stopped.
processor.onStopped(runner.getProcessContext());
assertEquals("No session should be created", 0, createdSessions.size());
// Simulate that the processor is restarted.
// And the mock service will emit consume msg events.
processor.onTrigger(runner.getProcessContext(), sessionFactory);
assertEquals("Processor should register it with the service again", 6, createdSessions.size());
}
}

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.websocket;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.websocket.AbstractWebSocketSession;
import org.apache.nifi.websocket.SendMessage;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSession;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_CS_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_FAILURE_DETAIL;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS;
import static org.apache.nifi.processors.websocket.WebSocketProcessorAttributes.ATTR_WS_SESSION_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestPutWebSocket {
private void assertFlowFile(WebSocketSession webSocketSession, String serviceId, String endpointId, MockFlowFile ff, WebSocketMessage.Type messageType) {
assertEquals(serviceId, ff.getAttribute(ATTR_WS_CS_ID));
assertEquals(webSocketSession.getSessionId(), ff.getAttribute(ATTR_WS_SESSION_ID));
assertEquals(endpointId, ff.getAttribute(ATTR_WS_ENDPOINT_ID));
assertEquals(webSocketSession.getLocalAddress().toString(), ff.getAttribute(ATTR_WS_LOCAL_ADDRESS));
assertEquals(webSocketSession.getRemoteAddress().toString(), ff.getAttribute(ATTR_WS_REMOTE_ADDRESS));
assertEquals(messageType != null ? messageType.name() : null, ff.getAttribute(ATTR_WS_MESSAGE_TYPE));
}
private WebSocketSession getWebSocketSession() {
final WebSocketSession webSocketSession = spy(AbstractWebSocketSession.class);
when(webSocketSession.getSessionId()).thenReturn("ws-session-id");
when(webSocketSession.getLocalAddress()).thenReturn(new InetSocketAddress("localhost", 12345));
when(webSocketSession.getRemoteAddress()).thenReturn(new InetSocketAddress("example.com", 80));
when(webSocketSession.getTransitUri()).thenReturn("ws://example.com/web-socket");
return webSocketSession;
}
@Test
public void testSessionIsNotSpecified() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class);
final WebSocketService service = spy(WebSocketService.class);
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_WS_CS_ID, serviceId);
attributes.put(ATTR_WS_ENDPOINT_ID, endpointId);
runner.enqueue(textMessageFromServer, attributes);
runner.run();
final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
assertEquals(0, succeededFlowFiles.size());
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
assertEquals(1, failedFlowFiles.size());
final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(0, provenanceEvents.size());
}
@Test
public void testServiceIsNotFound() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class);
final ControllerService service = spy(ControllerService.class);
final WebSocketSession webSocketSession = getWebSocketSession();
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_WS_CS_ID, "different-service-id");
attributes.put(ATTR_WS_ENDPOINT_ID, endpointId);
attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId());
runner.enqueue(textMessageFromServer, attributes);
runner.run();
final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
assertEquals(0, succeededFlowFiles.size());
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
assertEquals(1, failedFlowFiles.size());
final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(0, provenanceEvents.size());
}
@Test
public void testServiceIsNotWebSocketService() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class);
final ControllerService service = spy(ControllerService.class);
final WebSocketSession webSocketSession = getWebSocketSession();
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_WS_CS_ID, serviceId);
attributes.put(ATTR_WS_ENDPOINT_ID, endpointId);
attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId());
runner.enqueue(textMessageFromServer, attributes);
runner.run();
final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
assertEquals(0, succeededFlowFiles.size());
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
assertEquals(1, failedFlowFiles.size());
final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(0, provenanceEvents.size());
}
@Test
public void testSendFailure() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class);
final WebSocketService service = spy(WebSocketService.class);
final WebSocketSession webSocketSession = getWebSocketSession();
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
doAnswer(invocation -> {
final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class);
sendMessage.send(webSocketSession);
return null;
}).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class));
doThrow(new IOException("Sending message failed.")).when(webSocketSession).sendString(anyString());
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_WS_CS_ID, serviceId);
attributes.put(ATTR_WS_ENDPOINT_ID, endpointId);
attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId());
runner.enqueue(textMessageFromServer, attributes);
runner.run();
final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
assertEquals(0, succeededFlowFiles.size());
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
assertEquals(1, failedFlowFiles.size());
final MockFlowFile failedFlowFile = failedFlowFiles.iterator().next();
assertNotNull(failedFlowFile.getAttribute(ATTR_WS_FAILURE_DETAIL));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(0, provenanceEvents.size());
}
@Test
public void testSuccess() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(PutWebSocket.class);
final WebSocketService service = spy(WebSocketService.class);
final WebSocketSession webSocketSession = getWebSocketSession();
final String serviceId = "ws-service";
final String endpointId = "client-1";
final String textMessageFromServer = "message from server.";
when(service.getIdentifier()).thenReturn(serviceId);
doAnswer(invocation -> {
final SendMessage sendMessage = invocation.getArgumentAt(2, SendMessage.class);
sendMessage.send(webSocketSession);
return null;
}).when(service).sendMessage(anyString(), anyString(), any(SendMessage.class));
runner.addControllerService(serviceId, service);
runner.enableControllerService(service);
runner.setProperty(PutWebSocket.PROP_WS_MESSAGE_TYPE, "${" + ATTR_WS_MESSAGE_TYPE + "}");
// Enqueue 1st file as Text.
final Map<String, String> attributes = new HashMap<>();
attributes.put(ATTR_WS_CS_ID, serviceId);
attributes.put(ATTR_WS_ENDPOINT_ID, endpointId);
attributes.put(ATTR_WS_SESSION_ID, webSocketSession.getSessionId());
attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.TEXT.name());
runner.enqueue(textMessageFromServer, attributes);
// Enqueue 2nd file as Binary.
attributes.put(ATTR_WS_MESSAGE_TYPE, WebSocketMessage.Type.BINARY.name());
runner.enqueue(textMessageFromServer.getBytes(), attributes);
runner.run(2);
final List<MockFlowFile> succeededFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_SUCCESS);
assertEquals(2, succeededFlowFiles.size());
assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(0), WebSocketMessage.Type.TEXT);
assertFlowFile(webSocketSession, serviceId, endpointId, succeededFlowFiles.get(1), WebSocketMessage.Type.BINARY);
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutWebSocket.REL_FAILURE);
assertEquals(0, failedFlowFiles.size());
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
}
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-websocket-services-api-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,41 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-websocket-services-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.Processor;
import java.io.IOException;
public abstract class AbstractWebSocketService extends AbstractControllerService implements WebSocketService {
final protected WebSocketMessageRouters routers = new WebSocketMessageRouters();
@Override
public void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
routers.registerProcessor(endpointId, processor);
}
@Override
public boolean isProcessorRegistered(final String endpointId, final Processor processor) {
return routers.isProcessorRegistered(endpointId, processor);
}
@Override
public void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
routers.deregisterProcessor(endpointId, processor);
}
@Override
public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException {
routers.sendMessage(endpointId, sessionId, sendMessage);
}
@Override
public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
routers.disconnect(endpointId, sessionId, reason);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
public abstract class AbstractWebSocketSession implements WebSocketSession {
private String transitUri;
@Override
public void setTransitUri(final String transitUri) {
this.transitUri = transitUri;
}
@Override
public String getTransitUri() {
return transitUri;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
public interface BinaryMessageConsumer {
void consume(final WebSocketSessionInfo sessionInfo, final byte[] payload, final int offset, final int length);
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
/**
* To be performed when a WebSocket connection is established.
*/
public interface ConnectedListener {
void connected(final WebSocketSessionInfo sessionInfo);
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.io.IOException;
import java.nio.ByteBuffer;
public interface MessageSender extends WebSocketSessionInfo {
void sendString(final String message) throws IOException;
void sendBinary(final ByteBuffer data) throws IOException;
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.io.IOException;
public interface SendMessage {
void send(final MessageSender sender) throws IOException;
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
public interface TextMessageConsumer {
void consume(final WebSocketSessionInfo sessionInfo, final String message);
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.controller.ConfigurationContext;
import java.io.IOException;
/**
* Control a WebSocket client instance.
*/
@CapabilityDescription("Control a WebSocket client instance," +
" so that NiFi can connect with external systems via WebSocket protocol.")
public interface WebSocketClientService extends WebSocketService {
void startClient(final ConfigurationContext context) throws Exception;
void stopClient() throws Exception;
void connect(final String clientId) throws IOException;
String getTargetUri();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
public class WebSocketConfigurationException extends Exception {
public WebSocketConfigurationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
public class WebSocketConnectedMessage extends WebSocketMessage {
public WebSocketConnectedMessage(final WebSocketSessionInfo sessionInfo) {
super(sessionInfo);
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.io.UnsupportedEncodingException;
public class WebSocketMessage {
public static final String CHARSET_NAME = "UTF-8";
public enum Type {
TEXT,
BINARY
}
private final WebSocketSessionInfo sessionInfo;
private byte[] payload;
private int offset;
private int length;
private Type type;
public WebSocketMessage(final WebSocketSessionInfo sessionInfo) {
this.sessionInfo = sessionInfo;
}
public WebSocketSessionInfo getSessionInfo() {
return sessionInfo;
}
public byte[] getPayload() {
return payload;
}
public void setPayload(final String text) {
if (text == null) {
return;
}
try {
final byte[] bytes = text.getBytes(CHARSET_NAME);
setPayload(bytes, 0, bytes.length);
type = Type.TEXT;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Failed to serialize messageStr, due to " + e, e);
}
}
public void setPayload(final byte[] payload, final int offset, final int length) {
this.payload = payload;
this.offset = offset;
this.length = length;
type = Type.BINARY;
}
public int getOffset() {
return offset;
}
public int getLength() {
return length;
}
public Type getType() {
return type;
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.processor.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketMessageRouter {
private static final Logger logger = LoggerFactory.getLogger(WebSocketMessageRouter.class);
private final String endpointId;
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private volatile Processor processor;
public WebSocketMessageRouter(final String endpointId) {
this.endpointId = endpointId;
}
public synchronized void registerProcessor(final Processor processor) throws WebSocketConfigurationException {
if (this.processor != null) {
throw new WebSocketConfigurationException("Processor " + this.processor + " is already assigned to this router.");
}
this.processor = processor;
}
public boolean isProcessorRegistered(final Processor processor) {
return this.processor != null && this.processor.getIdentifier().equals(processor.getIdentifier());
}
public synchronized void deregisterProcessor(final Processor processor) {
if (!isProcessorRegistered(processor)) {
if (this.processor == null) {
logger.info("Deregister processor {}, do nothing because this router doesn't have registered processor",
new Object[]{processor});
} else {
logger.info("Deregister processor {}, do nothing because this router is assigned to different processor {}",
new Object[]{processor, this.processor});
}
return;
}
this.processor = null;
sessions.keySet().forEach(sessionId -> {
try {
disconnect(sessionId, "Processing has stopped.");
} catch (IOException e) {
logger.warn("Failed to disconnect session {} due to {}", sessionId, e, e);
}
});
}
public void captureSession(final WebSocketSession session) {
final String sessionId = session.getSessionId();
sessions.put(sessionId, session);
if (processor != null && processor instanceof ConnectedListener) {
((ConnectedListener)processor).connected(session);
}
}
public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) {
sessions.remove(sessionId);
}
public void onWebSocketText(final String sessionId, final String message) {
if (processor != null && processor instanceof TextMessageConsumer) {
((TextMessageConsumer)processor).consume(getSessionOrFail(sessionId), message);
}
}
public void onWebSocketBinary(final String sessionId, final byte[] payload, final int offset, final int length) {
if (processor != null && processor instanceof BinaryMessageConsumer) {
((BinaryMessageConsumer)processor).consume(getSessionOrFail(sessionId), payload, offset, length);
}
}
private WebSocketSession getSessionOrFail(final String sessionId) {
final WebSocketSession session = sessions.get(sessionId);
if (session == null) {
throw new IllegalStateException("Session was not found for the sessionId: " + sessionId);
}
return session;
}
public void sendMessage(final String sessionId, final SendMessage sendMessage) throws IOException {
final WebSocketSession session = getSessionOrFail(sessionId);
sendMessage.send(session);
}
public void disconnect(final String sessionId, final String reason) throws IOException {
final WebSocketSession session = getSessionOrFail(sessionId);
session.close(reason);
sessions.remove(sessionId);
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.processor.Processor;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketMessageRouters {
private Map<String, WebSocketMessageRouter> routers = new ConcurrentHashMap<>();
private synchronized WebSocketMessageRouter getRouterOrCreate(final String endpointId) {
WebSocketMessageRouter router = routers.get(endpointId);
if (router == null) {
router = new WebSocketMessageRouter(endpointId);
routers.put(endpointId, router);
}
return router;
}
public WebSocketMessageRouter getRouterOrFail(final String endpointId) throws WebSocketConfigurationException {
final WebSocketMessageRouter router = routers.get(endpointId);
if (router == null) {
throw new WebSocketConfigurationException("No WebSocket router is bound with endpointId: " + endpointId);
}
return router;
}
public synchronized void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
final WebSocketMessageRouter router = getRouterOrCreate(endpointId);
router.registerProcessor(processor);
}
public boolean isProcessorRegistered(final String endpointId, final Processor processor) {
try {
final WebSocketMessageRouter router = getRouterOrFail(endpointId);
return router.isProcessorRegistered(processor);
} catch (WebSocketConfigurationException e) {
return false;
}
}
public synchronized void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException {
final WebSocketMessageRouter router = getRouterOrFail(endpointId);
router.deregisterProcessor(processor);
}
public void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException {
final WebSocketMessageRouter router = getRouterOrFail(endpointId);
router.sendMessage(sessionId, sendMessage);
}
public void disconnect(final String endpointId, final String sessionId, final String reason) throws IOException, WebSocketConfigurationException {
final WebSocketMessageRouter router = getRouterOrFail(endpointId);
router.disconnect(sessionId, reason);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.controller.ConfigurationContext;
/**
* Control an embedded WebSocket server instance.
*/
@CapabilityDescription("Control an embedded WebSocket server instance," +
" so that external system can connect this NiFi via WebSocket protocol.")
public interface WebSocketServerService extends WebSocketService {
@OnEnabled
void startServer(final ConfigurationContext context) throws Exception;
@OnDisabled
@OnShutdown
void stopServer() throws Exception;
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
/**
* Control an embedded WebSocket service instance.
*/
public interface WebSocketService extends ControllerService {
PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("The SSL Context Service to use in order to secure the server. If specified, the server will accept only WSS requests; "
+ "otherwise, the server will accept only WS requests")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
void registerProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException;
boolean isProcessorRegistered(final String endpointId, final Processor processor);
void deregisterProcessor(final String endpointId, final Processor processor) throws WebSocketConfigurationException;
void sendMessage(final String endpointId, final String sessionId, final SendMessage sendMessage) throws IOException, WebSocketConfigurationException;
void disconnect(final String endpointId, final String sessionId, final String reason) throws Exception;
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.io.IOException;
/**
* This is the concrete WebSocket session interface, which provides session information and operations.
*/
public interface WebSocketSession extends MessageSender {
void close(final String reason) throws IOException;
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import java.net.InetSocketAddress;
/**
* This interface only expose session information.
*/
public interface WebSocketSessionInfo {
String getSessionId();
InetSocketAddress getLocalAddress();
InetSocketAddress getRemoteAddress();
boolean isSecure();
void setTransitUri(final String transitUri);
String getTransitUri();
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestWebSocketMessageRouter {
@Test
public void testRegisterProcessor() throws Exception {
final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id");
final Processor processor1 = mock(Processor.class);
when(processor1.getIdentifier()).thenReturn("processor-1");
final Processor processor2 = mock(Processor.class);
when(processor1.getIdentifier()).thenReturn("processor-2");
router.registerProcessor(processor1);
try {
router.registerProcessor(processor2);
fail("Should fail since a processor is already registered.");
} catch (WebSocketConfigurationException e) {
}
assertTrue(router.isProcessorRegistered(processor1));
assertFalse(router.isProcessorRegistered(processor2));
// It's safe to call deregister even if it's not registered.
router.deregisterProcessor(processor2);
router.deregisterProcessor(processor1);
// It's safe to call deregister even if it's not registered.
router.deregisterProcessor(processor2);
}
@Test
public void testSendMessage() throws Exception {
final WebSocketMessageRouter router = new WebSocketMessageRouter("endpoint-id");
final Processor processor1 = mock(Processor.class);
when(processor1.getIdentifier()).thenReturn("processor-1");
final AbstractWebSocketSession session = mock(AbstractWebSocketSession.class);
when(session.getSessionId()).thenReturn("session-1");
doAnswer(invocation -> {
assertEquals("message", invocation.getArgumentAt(0, String.class));
return null;
}).when(session).sendString(anyString());
router.registerProcessor(processor1);
router.captureSession(session);
router.sendMessage("session-1", sender -> sender.sendString("message"));
try {
router.sendMessage("session-2", sender -> sender.sendString("message"));
fail("Should fail because there's no session with id session-2.");
} catch (IllegalStateException e) {
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.processor.Processor;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestWebSocketMessageRouters {
@Test
public void testRegisterProcessor() throws Exception {
final String endpointId = "endpoint-id";
final WebSocketMessageRouters routers = new WebSocketMessageRouters();
try {
routers.getRouterOrFail(endpointId);
fail("Should fail because no route exists with the endpointId.");
} catch (WebSocketConfigurationException e) {
}
final Processor processor1 = mock(Processor.class);
when(processor1.getIdentifier()).thenReturn("processor-1");
assertFalse(routers.isProcessorRegistered(endpointId, processor1));
routers.registerProcessor(endpointId, processor1);
assertNotNull(routers.getRouterOrFail(endpointId));
assertTrue(routers.isProcessorRegistered(endpointId, processor1));
routers.deregisterProcessor(endpointId, processor1);
assertFalse(routers.isProcessorRegistered(endpointId, processor1));
}
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-websocket-services-jetty-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-jetty</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,55 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF licenses
this file to You under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may obtain
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-websocket-services-jetty</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>9.3.13.v20161014</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.AbstractWebSocketService;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import java.util.ArrayList;
import java.util.List;
public abstract class AbstractJettyWebSocketService extends AbstractWebSocketService {
public static final PropertyDescriptor INPUT_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("input-buffer-size")
.displayName("Input Buffer Size")
.description("The size of the input (read from network layer) buffer size.")
.required(true)
.defaultValue("4 kb")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_TEXT_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("max-text-message-size")
.displayName("Max Text Message Size")
.description("The maximum size of a text message during parsing/generating.")
.required(true)
.defaultValue("64 kb")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BINARY_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("max-binary-message-size")
.displayName("Max Binary Message Size")
.description("The maximum size of a binary message during parsing/generating.")
.required(true)
.defaultValue("64 kb")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(INPUT_BUFFER_SIZE);
descriptors.add(MAX_TEXT_MESSAGE_SIZE);
descriptors.add(MAX_BINARY_MESSAGE_SIZE);
return descriptors;
}
protected SslContextFactory createSslFactory(final SSLContextService sslService, final boolean needClientAuth, final boolean wantClientAuth) {
final SslContextFactory sslFactory = new SslContextFactory();
sslFactory.setNeedClientAuth(needClientAuth);
sslFactory.setWantClientAuth(wantClientAuth);
if (sslService.isKeyStoreConfigured()) {
sslFactory.setKeyStorePath(sslService.getKeyStoreFile());
sslFactory.setKeyStorePassword(sslService.getKeyStorePassword());
sslFactory.setKeyStoreType(sslService.getKeyStoreType());
}
if (sslService.isTrustStoreConfigured()) {
sslFactory.setTrustStorePath(sslService.getTrustStoreFile());
sslFactory.setTrustStorePassword(sslService.getTrustStorePassword());
sslFactory.setTrustStoreType(sslService.getTrustStoreType());
}
return sslFactory;
}
protected void configurePolicy(final ConfigurationContext context, final WebSocketPolicy policy) {
final int inputBufferSize = context.getProperty(INPUT_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final int maxTextMessageSize = context.getProperty(MAX_TEXT_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final int maxBinaryMessageSize = context.getProperty(MAX_BINARY_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
policy.setInputBufferSize(inputBufferSize);
policy.setMaxTextMessageSize(maxTextMessageSize);
policy.setMaxBinaryMessageSize(maxBinaryMessageSize);
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@Tags({"WebSocket", "Jetty", "client"})
@CapabilityDescription("Implementation of WebSocketClientService." +
" This service uses Jetty WebSocket client module to provide" +
" WebSocket session management throughout the application.")
public class JettyWebSocketClient extends AbstractJettyWebSocketService implements WebSocketClientService {
public static final PropertyDescriptor WS_URI = new PropertyDescriptor.Builder()
.name("websocket-uri")
.displayName("WebSocket URI")
.description("The WebSocket URI this client connects to.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.URI_VALIDATOR)
.addValidator((subject, input, context) -> {
final ValidationResult.Builder result = new ValidationResult.Builder()
.valid(input.startsWith("/"))
.subject(subject);
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
result.explanation("Expression Language Present").valid(true);
} else {
result.explanation("Protocol should be either 'ws' or 'wss'.")
.valid(input.startsWith("ws://") || input.startsWith("wss://"));
}
return result.build();
})
.build();
public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("connection-timeout")
.displayName("Connection Timeout")
.description("The timeout to connect the WebSocket URI.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("3 sec")
.build();
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.addAll(getAbstractPropertyDescriptors());
props.add(WS_URI);
props.add(SSL_CONTEXT);
props.add(CONNECTION_TIMEOUT);
properties = Collections.unmodifiableList(props);
}
private WebSocketClient client;
private URI webSocketUri;
private long connectionTimeoutMillis;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
@Override
public void startClient(final ConfigurationContext context) throws Exception{
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
SslContextFactory sslContextFactory = null;
if (sslService != null) {
sslContextFactory = createSslFactory(sslService, false, false);
}
client = new WebSocketClient(sslContextFactory);
configurePolicy(context, client.getPolicy());
client.start();
webSocketUri = new URI(context.getProperty(WS_URI).getValue());
connectionTimeoutMillis = context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
}
@OnDisabled
@OnShutdown
@Override
public void stopClient() throws Exception {
if (client == null) {
return;
}
client.stop();
client = null;
}
@Override
public void connect(final String clientId) throws IOException {
final WebSocketMessageRouter router;
try {
router = routers.getRouterOrFail(clientId);
} catch (WebSocketConfigurationException e) {
throw new IllegalStateException("Failed to get router due to: " + e, e);
}
final RoutingWebSocketListener listener = new RoutingWebSocketListener(router);
final ClientUpgradeRequest request = new ClientUpgradeRequest();
final Future<Session> connect = client.connect(listener, webSocketUri, request);
getLogger().info("Connecting to : {}", new Object[]{webSocketUri});
final Session session;
try {
session = connect.get(connectionTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new IOException("Failed to connect " + webSocketUri + " due to: " + e, e);
}
getLogger().info("Connected, session={}", new Object[]{session});
}
@Override
public String getTargetUri() {
return webSocketUri.toString();
}
}

View File

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.apache.nifi.websocket.WebSocketServerService;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Tags({"WebSocket", "Jetty", "server"})
@CapabilityDescription("Implementation of WebSocketServerService." +
" This service uses Jetty WebSocket server module to provide" +
" WebSocket session management throughout the application.")
public class JettyWebSocketServer extends AbstractJettyWebSocketService implements WebSocketServerService {
/**
* A global map to refer a controller service instance by requested port number.
*/
private static final Map<Integer, JettyWebSocketServer> portToControllerService = new ConcurrentHashMap<>();
// Allowable values for client auth
public static final AllowableValue CLIENT_NONE = new AllowableValue("no", "No Authentication",
"Processor will not authenticate clients. Anyone can communicate with this Processor anonymously");
public static final AllowableValue CLIENT_WANT = new AllowableValue("want", "Want Authentication",
"Processor will try to verify the client but if unable to verify will allow the client to communicate anonymously");
public static final AllowableValue CLIENT_NEED = new AllowableValue("need", "Need Authentication",
"Processor will reject communications from any client unless the client provides a certificate that is trusted by the TrustStore"
+ "specified in the SSL Context Service");
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("client-authentication")
.displayName("Client Authentication")
.description("Specifies whether or not the Processor should authenticate clients. This value is ignored if the <SSL Context Service> "
+ "Property is not specified or the SSL Context provided uses only a KeyStore and not a TrustStore.")
.required(true)
.allowableValues(CLIENT_NONE, CLIENT_WANT, CLIENT_NEED)
.defaultValue(CLIENT_NONE.getValue())
.build();
public static final PropertyDescriptor LISTEN_PORT = new PropertyDescriptor.Builder()
.name("listen-port")
.displayName("Listen Port")
.description("The port number on which this WebSocketServer listens to.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.addAll(getAbstractPropertyDescriptors());
props.add(LISTEN_PORT);
props.add(SSL_CONTEXT);
props.add(CLIENT_AUTH);
properties = Collections.unmodifiableList(props);
}
private WebSocketPolicy configuredPolicy;
private Server server;
private Integer listenPort;
private ServletHandler servletHandler;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
public static class JettyWebSocketServlet extends WebSocketServlet implements WebSocketCreator {
@Override
public void configure(WebSocketServletFactory webSocketServletFactory) {
webSocketServletFactory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
final URI requestURI = servletUpgradeRequest.getRequestURI();
final int port = requestURI.getPort();
final JettyWebSocketServer service = portToControllerService.get(port);
if (service == null) {
throw new RuntimeException("No controller service is bound with port: " + port);
}
final String path = requestURI.getPath();
final WebSocketMessageRouter router;
try {
router = service.routers.getRouterOrFail(path);
} catch (WebSocketConfigurationException e) {
throw new IllegalStateException("Failed to get router due to: " + e, e);
}
final RoutingWebSocketListener listener = new RoutingWebSocketListener(router) {
@Override
public void onWebSocketConnect(Session session) {
final WebSocketPolicy currentPolicy = session.getPolicy();
currentPolicy.setInputBufferSize(service.configuredPolicy.getInputBufferSize());
currentPolicy.setMaxTextMessageSize(service.configuredPolicy.getMaxTextMessageSize());
currentPolicy.setMaxBinaryMessageSize(service.configuredPolicy.getMaxBinaryMessageSize());
super.onWebSocketConnect(session);
}
};
return listener;
}
}
@OnEnabled
@Override
public void startServer(final ConfigurationContext context) throws Exception {
if (server != null && server.isRunning()) {
getLogger().info("A WebSocket server is already running. {}", new Object[]{server});
return;
}
configuredPolicy = WebSocketPolicy.newServerPolicy();
configurePolicy(context, configuredPolicy);
server = new Server();
final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
final ServletContextHandler contextHandler = new ServletContextHandler();
servletHandler = new ServletHandler();
contextHandler.insertHandler(servletHandler);
handlerCollection.setHandlers(new Handler[]{contextHandler});
server.setHandler(handlerCollection);
listenPort = context.getProperty(LISTEN_PORT).asInteger();
final SslContextFactory sslContextFactory = createSslFactory(context);
final ServerConnector serverConnector = createConnector(sslContextFactory, listenPort);
server.setConnectors(new Connector[] {serverConnector});
servletHandler.addServletWithMapping(JettyWebSocketServlet.class, "/*");
getLogger().info("Starting JettyWebSocketServer on port {}.", new Object[]{listenPort});
server.start();
portToControllerService.put(listenPort, this);
}
private ServerConnector createConnector(final SslContextFactory sslContextFactory, final Integer listenPort) {
final ServerConnector serverConnector;
if (sslContextFactory == null) {
serverConnector = new ServerConnector(server);
} else {
final HttpConfiguration httpsConfiguration = new HttpConfiguration();
httpsConfiguration.setSecureScheme("https");
httpsConfiguration.addCustomizer(new SecureRequestCustomizer());
serverConnector = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(httpsConfiguration));
}
serverConnector.setPort(listenPort);
return serverConnector;
}
private SslContextFactory createSslFactory(final ConfigurationContext context) {
final SSLContextService sslService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
final boolean need;
final boolean want;
if (CLIENT_NEED.equals(clientAuthValue)) {
need = true;
want = false;
} else if (CLIENT_WANT.equals(clientAuthValue)) {
need = false;
want = true;
} else {
need = false;
want = false;
}
final SslContextFactory sslFactory = (sslService == null) ? null : createSslFactory(sslService, need, want);
return sslFactory;
}
@OnDisabled
@OnShutdown
@Override
public void stopServer() throws Exception {
if (server == null) {
return;
}
getLogger().info("Stopping JettyWebSocketServer.");
server.stop();
if (portToControllerService.containsKey(listenPort)
&& this.getIdentifier().equals(portToControllerService.get(listenPort).getIdentifier())) {
portToControllerService.remove(listenPort);
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.websocket.AbstractWebSocketSession;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
public class JettyWebSocketSession extends AbstractWebSocketSession {
private final String sessionId;
private final Session session;
public JettyWebSocketSession(final String sessionId, final Session session) {
this.sessionId = sessionId;
this.session = session;
}
@Override
public String getSessionId() {
return sessionId;
}
@Override
public void sendString(final String message) throws IOException {
session.getRemote().sendString(message);
}
@Override
public void sendBinary(final ByteBuffer data) throws IOException {
session.getRemote().sendBytes(data);
}
@Override
public void close(final String reason) throws IOException {
session.close(StatusCode.NORMAL, reason);
}
@Override
public InetSocketAddress getRemoteAddress() {
return session.getRemoteAddress();
}
@Override
public InetSocketAddress getLocalAddress() {
return session.getLocalAddress();
}
@Override
public boolean isSecure() {
return session.isSecure();
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.jetty;
import org.apache.nifi.websocket.WebSocketMessageRouter;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import java.util.UUID;
public class RoutingWebSocketListener extends WebSocketAdapter {
private final WebSocketMessageRouter router;
private String sessionId;
public RoutingWebSocketListener(final WebSocketMessageRouter router) {
this.router = router;
}
@Override
public void onWebSocketConnect(final Session session) {
super.onWebSocketConnect(session);
sessionId = UUID.randomUUID().toString();
final JettyWebSocketSession webSocketSession = new JettyWebSocketSession(sessionId, session);
router.captureSession(webSocketSession);
}
@Override
public void onWebSocketClose(final int statusCode, final String reason) {
super.onWebSocketClose(statusCode, reason);
router.onWebSocketClose(sessionId, statusCode, reason);
}
@Override
public void onWebSocketText(final String message) {
router.onWebSocketText(sessionId, message);
}
@Override
public void onWebSocketBinary(final byte[] payload, final int offset, final int len) {
router.onWebSocketBinary(sessionId, payload, offset, len);
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.websocket.jetty.JettyWebSocketServer
org.apache.nifi.websocket.jetty.JettyWebSocketClient

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.MockPropertyValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ControllerServiceTestContext {
private final ConfigurationContext configurationContext = mock(ConfigurationContext.class);
private final ValidationContext validationContext = mock(ValidationContext.class);
private MockControllerServiceInitializationContext initializationContext;
public ControllerServiceTestContext(ControllerService controllerService, String id) {
initializationContext = new MockControllerServiceInitializationContext(controllerService, id);
doAnswer(invocation -> configurationContext.getProperty(invocation.getArgumentAt(0, PropertyDescriptor.class)))
.when(validationContext).getProperty(any(PropertyDescriptor.class));
controllerService.getPropertyDescriptors().forEach(prop -> setDefaultValue(prop));
}
public MockControllerServiceInitializationContext getInitializationContext() {
return initializationContext;
}
public ConfigurationContext getConfigurationContext() {
return configurationContext;
}
public MockPropertyValue setDefaultValue(PropertyDescriptor propertyDescriptor) {
return setCustomValue(propertyDescriptor, propertyDescriptor.getDefaultValue());
}
public MockPropertyValue setCustomValue(PropertyDescriptor propertyDescriptor, String value) {
final MockPropertyValue propertyValue = new MockPropertyValue(value, initializationContext);
when(configurationContext.getProperty(eq(propertyDescriptor)))
.thenReturn(propertyValue);
return propertyValue;
}
public ValidationContext getValidationContext() {
return validationContext;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
public class TestJettyWebSocketClient {
@Test
public void testValidationRequiredProperties() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
}
@Test
public void testValidationSuccess() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketClient.WS_URI, "ws://localhost:9001/test");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(0, results.size());
}
@Test
public void testValidationProtocol() throws Exception {
final JettyWebSocketClient service = new JettyWebSocketClient();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketClient.WS_URI, "http://localhost:9001/test");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertEquals(JettyWebSocketClient.WS_URI.getName(), result.getSubject());
}
}

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.websocket.jetty.JettyWebSocketClient;
import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class TestJettyWebSocketCommunication {
protected int serverPort;
protected String serverPath = "/test";
protected WebSocketServerService serverService;
protected ControllerServiceTestContext serverServiceContext;
protected WebSocketClientService clientService;
protected ControllerServiceTestContext clientServiceContext;
protected boolean isSecure() {
return false;
}
@Before
public void setup() throws Exception {
setupServer();
setupClient();
}
private void setupServer() throws Exception {
// Find an open port.
try (final ServerSocket serverSocket = new ServerSocket(0)) {
serverPort = serverSocket.getLocalPort();
}
serverService = new JettyWebSocketServer();
serverServiceContext = new ControllerServiceTestContext(serverService, "JettyWebSocketServer1");
serverServiceContext.setCustomValue(JettyWebSocketServer.LISTEN_PORT, String.valueOf(serverPort));
customizeServer();
serverService.initialize(serverServiceContext.getInitializationContext());
serverService.startServer(serverServiceContext.getConfigurationContext());
}
protected void customizeServer() {
}
private void setupClient() throws Exception {
clientService = new JettyWebSocketClient();
clientServiceContext = new ControllerServiceTestContext(clientService, "JettyWebSocketClient1");
clientServiceContext.setCustomValue(JettyWebSocketClient.WS_URI, (isSecure() ? "wss" : "ws") + "://localhost:" + serverPort + serverPath);
customizeClient();
clientService.initialize(clientServiceContext.getInitializationContext());
clientService.startClient(clientServiceContext.getConfigurationContext());
}
protected void customizeClient() {
}
@After
public void teardown() throws Exception {
clientService.stopClient();
serverService.stopServer();
}
protected interface MockWebSocketProcessor extends Processor, ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
}
@Test
public void testClientServerCommunication() throws Exception {
// Expectations.
final CountDownLatch serverIsConnectedByClient = new CountDownLatch(1);
final CountDownLatch clientConnectedServer = new CountDownLatch(1);
final CountDownLatch serverReceivedTextMessageFromClient = new CountDownLatch(1);
final CountDownLatch serverReceivedBinaryMessageFromClient = new CountDownLatch(1);
final CountDownLatch clientReceivedTextMessageFromServer = new CountDownLatch(1);
final CountDownLatch clientReceivedBinaryMessageFromServer = new CountDownLatch(1);
final String textMessageFromClient = "Message from client.";
final String textMessageFromServer = "Message from server.";
final MockWebSocketProcessor serverProcessor = mock(MockWebSocketProcessor.class);
doReturn("serverProcessor1").when(serverProcessor).getIdentifier();
final AtomicReference<String> serverSessionIdRef = new AtomicReference<>();
doAnswer(invocation -> assertConnectedEvent(serverIsConnectedByClient, serverSessionIdRef, invocation))
.when(serverProcessor).connected(any(WebSocketSessionInfo.class));
doAnswer(invocation -> assertConsumeTextMessage(serverReceivedTextMessageFromClient, textMessageFromClient, invocation))
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), anyString());
doAnswer(invocation -> assertConsumeBinaryMessage(serverReceivedBinaryMessageFromClient, textMessageFromClient, invocation))
.when(serverProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
serverService.registerProcessor(serverPath, serverProcessor);
final String clientId = "client1";
final MockWebSocketProcessor clientProcessor = mock(MockWebSocketProcessor.class);
doReturn("clientProcessor1").when(clientProcessor).getIdentifier();
final AtomicReference<String> clientSessionIdRef = new AtomicReference<>();
doAnswer(invocation -> assertConnectedEvent(clientConnectedServer, clientSessionIdRef, invocation))
.when(clientProcessor).connected(any(WebSocketSessionInfo.class));
doAnswer(invocation -> assertConsumeTextMessage(clientReceivedTextMessageFromServer, textMessageFromServer, invocation))
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), anyString());
doAnswer(invocation -> assertConsumeBinaryMessage(clientReceivedBinaryMessageFromServer, textMessageFromServer, invocation))
.when(clientProcessor).consume(any(WebSocketSessionInfo.class), any(byte[].class), anyInt(), anyInt());
clientService.registerProcessor(clientId, clientProcessor);
clientService.connect(clientId);
assertTrue("WebSocket client should be able to fire connected event.", clientConnectedServer.await(5, TimeUnit.SECONDS));
assertTrue("WebSocket server should be able to fire connected event.", serverIsConnectedByClient.await(5, TimeUnit.SECONDS));
clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendString(textMessageFromClient));
clientService.sendMessage(clientId, clientSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromClient.getBytes())));
assertTrue("WebSocket server should be able to consume text message.", serverReceivedTextMessageFromClient.await(5, TimeUnit.SECONDS));
assertTrue("WebSocket server should be able to consume binary message.", serverReceivedBinaryMessageFromClient.await(5, TimeUnit.SECONDS));
serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendString(textMessageFromServer));
serverService.sendMessage(serverPath, serverSessionIdRef.get(), sender -> sender.sendBinary(ByteBuffer.wrap(textMessageFromServer.getBytes())));
assertTrue("WebSocket client should be able to consume text message.", clientReceivedTextMessageFromServer.await(5, TimeUnit.SECONDS));
assertTrue("WebSocket client should be able to consume binary message.", clientReceivedBinaryMessageFromServer.await(5, TimeUnit.SECONDS));
clientService.deregisterProcessor(clientId, clientProcessor);
serverService.deregisterProcessor(serverPath, serverProcessor);
}
protected Object assertConnectedEvent(CountDownLatch latch, AtomicReference<String> sessionIdRef, InvocationOnMock invocation) {
final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
assertNotNull(sessionInfo.getLocalAddress());
assertNotNull(sessionInfo.getRemoteAddress());
assertNotNull(sessionInfo.getSessionId());
assertEquals(isSecure(), sessionInfo.isSecure());
sessionIdRef.set(sessionInfo.getSessionId());
latch.countDown();
return null;
}
protected Object assertConsumeTextMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
assertNotNull(sessionInfo.getLocalAddress());
assertNotNull(sessionInfo.getRemoteAddress());
assertNotNull(sessionInfo.getSessionId());
assertEquals(isSecure(), sessionInfo.isSecure());
final String receivedMessage = invocation.getArgumentAt(1, String.class);
assertNotNull(receivedMessage);
assertEquals(expectedMessage, receivedMessage);
latch.countDown();
return null;
}
protected Object assertConsumeBinaryMessage(CountDownLatch latch, String expectedMessage, InvocationOnMock invocation) {
final WebSocketSessionInfo sessionInfo = invocation.getArgumentAt(0, WebSocketSessionInfo.class);
assertNotNull(sessionInfo.getLocalAddress());
assertNotNull(sessionInfo.getRemoteAddress());
assertNotNull(sessionInfo.getSessionId());
assertEquals(isSecure(), sessionInfo.isSecure());
final byte[] receivedMessage = invocation.getArgumentAt(1, byte[].class);
final byte[] expectedBinary = expectedMessage.getBytes();
final int offset = invocation.getArgumentAt(2, Integer.class);
final int length = invocation.getArgumentAt(3, Integer.class);
assertNotNull(receivedMessage);
assertEquals(expectedBinary.length, receivedMessage.length);
assertEquals(expectedMessage, new String(receivedMessage));
assertEquals(0, offset);
assertEquals(expectedBinary.length, length);
latch.countDown();
return null;
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.junit.Test;
public class TestJettyWebSocketSecureCommunication extends TestJettyWebSocketCommunication{
private final StandardSSLContextService sslContextService = new StandardSSLContextService();
private final ControllerServiceTestContext sslTestContext = new ControllerServiceTestContext(sslContextService, "SSLContextService");
public TestJettyWebSocketSecureCommunication() {
try {
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE, "src/test/resources/certs/localhost-ks.jks");
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest");
sslTestContext.setCustomValue(StandardSSLContextService.KEYSTORE_TYPE, "JKS");
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE, "src/test/resources/certs/localhost-ks.jks");
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest");
sslTestContext.setCustomValue(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
sslContextService.initialize(sslTestContext.getInitializationContext());
sslContextService.onConfigured(sslTestContext.getConfigurationContext());
} catch (InitializationException e) {
throw new RuntimeException(e);
}
}
@Override
protected boolean isSecure() {
return true;
}
@Override
protected void customizeServer() {
serverServiceContext.getInitializationContext().addControllerService(sslContextService);
serverServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
}
@Override
protected void customizeClient() {
clientServiceContext.getInitializationContext().addControllerService(sslContextService);
clientServiceContext.setCustomValue(WebSocketService.SSL_CONTEXT, sslContextService.getIdentifier());
}
@Test
public void testClientServerCommunication() throws Exception {
super.testClientServerCommunication();
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.websocket.jetty.JettyWebSocketServer;
import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
public class TestJettyWebSocketServer {
@Test
public void testValidationRequiredProperties() throws Exception {
final JettyWebSocketServer service = new JettyWebSocketServer();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(1, results.size());
final ValidationResult result = results.iterator().next();
assertEquals(JettyWebSocketServer.LISTEN_PORT.getName(), result.getSubject());
}
@Test
public void testValidationSuccess() throws Exception {
final JettyWebSocketServer service = new JettyWebSocketServer();
final ControllerServiceTestContext context = new ControllerServiceTestContext(service, "service-id");
context.setCustomValue(JettyWebSocketServer.LISTEN_PORT, "9001");
service.initialize(context.getInitializationContext());
final Collection<ValidationResult> results = service.validate(context.getValidationContext());
assertEquals(0, results.size());
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.example;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* This is a WebSocket client example testcase.
*/
@Ignore
public class WebSocketClientExample {
private static Logger logger = LoggerFactory.getLogger(WebSocketClientExample.class);
@Test
public void test() {
String destUri = "wss://localhost:50010/test";
final CountDownLatch replyLatch = new CountDownLatch(1);
final SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
sslContextFactory.setKeyStorePassword("localtest");
sslContextFactory.setKeyStoreType("JKS");
sslContextFactory.setTrustStorePath("src/test/resources/certs/localhost-ks.jks");
sslContextFactory.setTrustStorePassword("localtest");
sslContextFactory.setTrustStoreType("JKS");
WebSocketClient client = new WebSocketClient(sslContextFactory);
WebSocketAdapter socket = new WebSocketAdapter() {
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
try {
session.getRemote().sendString("Hello, this is Jetty ws client.");
} catch (IOException e) {
logger.error("Failed to send a message due to " + e, e);
}
}
@Override
public void onWebSocketText(String message) {
logger.info("Received a reply: {}", message);
replyLatch.countDown();
}
};
try {
client.start();
URI echoUri = new URI(destUri);
ClientUpgradeRequest request = new ClientUpgradeRequest();
final Future<Session> connect = client.connect(socket, echoUri, request);
logger.info("Connecting to : {}", echoUri);
final Session session = connect.get(3, TimeUnit.SECONDS);
logger.info("Connected, session={}", session);
session.close(StatusCode.NORMAL, "Bye");
} catch (Throwable t) {
t.printStackTrace();
} finally {
try {
client.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.websocket.example;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* This is a WebSocket server example testcase.
*/
@Ignore
public class WebSocketServerExample {
private static Logger logger = LoggerFactory.getLogger(WebSocketServerExample.class);
private static Server server;
private static ServletHandler servletHandler;
private static ServletHolder servletHolder;
private static ServerConnector httpConnector;
private static ServerConnector sslConnector;
private static final Map<Integer, WebSocketServerExample> portToController = new HashMap<>();
private Map<String, WebSocketListener> listeners = new HashMap<>();
public class SocketListener extends WebSocketAdapter {
public SocketListener() {
logger.info("New instance is created: {}", this);
}
@Override
public void onWebSocketConnect(Session session) {
logger.info("Connected, {}, {}", session.getLocalAddress(), session.getRemoteAddress());
super.onWebSocketConnect(session);
session.getUpgradeRequest().getRequestURI();
}
@Override
public void onWebSocketText(String message) {
logger.info("Received: {}", message);
final String resultMessage;
if (message.startsWith("add-servlet")) {
// Is it possible to add servlet mapping??
final String path = message.split(":")[1].trim();
servletHandler.addServletWithMapping(servletHolder, path);
resultMessage = "Deployed new servlet under: " + path;
} else {
resultMessage = "Got message: " + message;
}
try {
getSession().getRemote().sendString(resultMessage);
} catch (IOException e) {
logger.error("Failed to send a message back to remote.", e);
}
}
}
public WebSocketServerExample() {
this.listeners.put("/test", new SocketListener());
portToController.put(httpConnector.getPort(), this);
portToController.put(sslConnector.getPort(), this);
}
public static class WSServlet extends WebSocketServlet implements WebSocketCreator {
@Override
public void configure(WebSocketServletFactory webSocketServletFactory) {
webSocketServletFactory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, ServletUpgradeResponse servletUpgradeResponse) {
final WebSocketServerExample testWebSocket = portToController.get(servletUpgradeRequest.getLocalPort());
return testWebSocket.listeners.get(servletUpgradeRequest.getRequestURI().getPath());
}
}
public static class ConnectionCheckServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setContentType("text/plain");
resp.setStatus(HttpServletResponse.SC_OK);
resp.getWriter().println("Ok :)");
}
}
@BeforeClass
public static void setup() throws Exception {
server = new Server(0);
final ContextHandlerCollection handlerCollection = new ContextHandlerCollection();
final ServletContextHandler contextHandler = new ServletContextHandler();
servletHandler = new ServletHandler();
contextHandler.insertHandler(servletHandler);
handlerCollection.setHandlers(new Handler[]{contextHandler});
server.setHandler(handlerCollection);
httpConnector = new ServerConnector(server);
httpConnector.setPort(50010);
final SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/certs/localhost-ks.jks");
sslContextFactory.setKeyStorePassword("localtest");
sslContextFactory.setKeyStoreType("JKS");
final HttpConfiguration https = new HttpConfiguration();
https.addCustomizer(new SecureRequestCustomizer());
sslConnector = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(https));
sslConnector.setPort(50011);
server.setConnectors(new Connector[]{httpConnector, sslConnector});
servletHolder = servletHandler.addServletWithMapping(WSServlet.class, "/test");
servletHolder = servletHandler.addServletWithMapping(ConnectionCheckServlet.class, "/check");
server.start();
logger.info("Starting server on port {} for HTTP, and {} for HTTPS", httpConnector.getLocalPort(), sslConnector.getLocalPort());
}
@AfterClass
public static void teardown() throws Exception {
logger.info("Stopping server.");
try {
server.stop();
} catch (Exception e) {
logger.error("Failed to stop Jetty server due to " + e, e);
}
}
@Test
public void test() throws Exception {
logger.info("Waiting for a while...");
Thread.sleep(1000_000);
}
}

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.1.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-bundle</artifactId>
<version>1.1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-websocket-processors</module>
<module>nifi-websocket-processors-nar</module>
<module>nifi-websocket-services-api</module>
<module>nifi-websocket-services-api-nar</module>
<module>nifi-websocket-services-jetty</module>
<module>nifi-websocket-services-jetty-nar</module>
</modules>
</project>

View File

@ -71,6 +71,7 @@
<module>nifi-ignite-bundle</module>
<module>nifi-email-bundle</module>
<module>nifi-ranger-bundle</module>
<module>nifi-websocket-bundle</module>
</modules>
<dependencyManagement>

28
pom.xml
View File

@ -1251,6 +1251,24 @@ language governing permissions and limitations under the License. -->
<version>1.1.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-jetty-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-processors-nar</artifactId>
<version>1.1.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-datadog-nar</artifactId>
@ -1318,6 +1336,16 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-hbase-client-service-api</artifactId>
<version>1.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-api</artifactId>
<version>1.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-websocket-services-jetty</artifactId>
<version>1.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-assembly</artifactId>