NIFI-282: Refactored to remove Jersey client from dependencies; made site-to-site config serializable; allowed SiteToSiteClient.Builder to build a SiteToSiteClientConfig without building the client itself.

This commit is contained in:
Mark Payne 2015-02-16 14:18:24 -05:00
parent 8f0402fbbc
commit e16fc7972c
10 changed files with 318 additions and 140 deletions

View File

@ -30,6 +30,12 @@ public interface RemoteDestination {
*/ */
String getIdentifier(); String getIdentifier();
/**
* Returns the human-readable name of the remote destination
* @return
*/
String getName();
/** /**
* Returns the amount of time that system should pause sending to a particular node if unable to * Returns the amount of time that system should pause sending to a particular node if unable to
* send data to or receive data from this endpoint * send data to or receive data from this endpoint

View File

@ -1,6 +1,7 @@
<?xml version="1.0"?> <?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" <project
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
@ -20,19 +21,20 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
</dependency> </dependency>
<!-- <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId>
</dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-web-utils</artifactId>
</dependency> -->
<dependency> <dependency>
<groupId>com.sun.jersey</groupId> <groupId>org.codehaus.jackson</groupId>
<artifactId>jersey-client</artifactId> <artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId> <artifactId>nifi-client-dto</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version> <version>0.0.2-incubating-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-utils</artifactId>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@ -19,6 +19,7 @@ package org.apache.nifi.remote.client;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -122,7 +123,9 @@ public interface SiteToSiteClient extends Closeable {
* and a new client created. * and a new client created.
* </p> * </p>
*/ */
public static class Builder { public static class Builder implements Serializable {
private static final long serialVersionUID = -4954962284343090219L;
private String url; private String url;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
@ -309,21 +312,13 @@ public interface SiteToSiteClient extends Closeable {
return this; return this;
} }
/** /**
* Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi * Returns a {@link SiteToSiteClientConfig} for the configured values but does not create a SiteToSiteClient
* @return * @return
*/ */
public SiteToSiteClient build() { public SiteToSiteClientConfig buildConfig() {
if ( url == null ) {
throw new IllegalStateException("Must specify URL to build Site-to-Site client");
}
if ( portName == null && portIdentifier == null ) {
throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
}
final SiteToSiteClientConfig config = new SiteToSiteClientConfig() { final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
private static final long serialVersionUID = 1323119754841633818L;
@Override @Override
public boolean isUseCompression() { public boolean isUseCompression() {
@ -391,7 +386,26 @@ public interface SiteToSiteClient extends Closeable {
} }
}; };
return new SocketClient(config); return config;
}
/**
* Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi
* @return
*
* @throws IllegalStateException if either the url is not set or neither the port name nor port identifier
* is set.
*/
public SiteToSiteClient build() {
if ( url == null ) {
throw new IllegalStateException("Must specify URL to build Site-to-Site client");
}
if ( portName == null && portIdentifier == null ) {
throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
}
return new SocketClient(buildConfig());
} }
/** /**

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote.client; package org.apache.nifi.remote.client;
import java.io.File; import java.io.File;
import java.io.Serializable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -24,7 +25,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
public interface SiteToSiteClientConfig { public interface SiteToSiteClientConfig extends Serializable {
/** /**
* Returns the configured URL for the remote NiFi instance * Returns the configured URL for the remote NiFi instance

View File

@ -79,8 +79,8 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.NiFiRestApiUtil;
import org.apache.nifi.remote.util.PeerStatusCache; import org.apache.nifi.remote.util.PeerStatusCache;
import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.ControllerDTO;
@ -201,6 +201,17 @@ public class EndpointConnectionPool {
}, 5, 5, TimeUnit.SECONDS); }, 5, 5, TimeUnit.SECONDS);
} }
private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
if ( remoteDestination.getIdentifier() != null ) {
return remoteDestination.getIdentifier();
}
if ( transferDirection == TransferDirection.RECEIVE ) {
return getOutputPortIdentifier(remoteDestination.getName());
} else {
return getInputPortIdentifier(remoteDestination.getName());
}
}
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(direction, null); return getEndpointConnection(direction, null);
@ -222,14 +233,15 @@ public class EndpointConnectionPool {
do { do {
connection = connectionQueue.poll(); connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection); logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
final String portId = getPortIdentifier(direction);
if ( connection == null && !addBack.isEmpty() ) { if ( connection == null && !addBack.isEmpty() ) {
// all available connections have been penalized. // all available connections have been penalized.
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, remoteDestination.getIdentifier()); logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portId);
return null; return null;
} }
if ( connection != null && connection.getPeer().isPenalized(remoteDestination.getIdentifier()) ) { if ( connection != null && connection.getPeer().isPenalized(portId) ) {
// we have a connection, but it's penalized. We want to add it back to the queue // we have a connection, but it's penalized. We want to add it back to the queue
// when we've found one to use. // when we've found one to use.
addBack.add(connection); addBack.add(connection);
@ -238,9 +250,9 @@ public class EndpointConnectionPool {
// if we can't get an existing Connection, create one // if we can't get an existing Connection, create one
if ( connection == null ) { if ( connection == null ) {
logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier()); logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
protocol = new SocketClientProtocol(); protocol = new SocketClientProtocol();
protocol.setDestination(remoteDestination); protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
logger.debug("{} getting next peer status", this); logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction); final PeerStatus peerStatus = getNextPeerStatus(direction);
@ -249,11 +261,12 @@ public class EndpointConnectionPool {
return null; return null;
} }
final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
try { try {
logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus); logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus); commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) { } catch (final IOException ioe) {
penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peerStatus, penalizationMillis);
throw ioe; throw ioe;
} }
@ -289,17 +302,17 @@ public class EndpointConnectionPool {
// handle error cases // handle error cases
if ( protocol.isDestinationFull() ) { if ( protocol.isDestinationFull() ) {
logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peer, penalizationMillis);
connectionQueue.offer(connection); connectionQueue.offer(connection);
continue; continue;
} else if ( protocol.isPortInvalid() ) { } else if ( protocol.isPortInvalid() ) {
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peer, penalizationMillis);
cleanup(protocol, peer); cleanup(protocol, peer);
throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running"); throw new PortNotRunningException(peer.toString() + " indicates that port " + portId + " is not running");
} else if ( protocol.isPortUnknown() ) { } else if ( protocol.isPortUnknown() ) {
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peer, penalizationMillis);
cleanup(protocol, peer); cleanup(protocol, peer);
throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known"); throw new UnknownPortException(peer.toString() + " indicates that port " + portId + " is not known");
} }
// negotiate the FlowFileCodec to use // negotiate the FlowFileCodec to use
@ -309,7 +322,7 @@ public class EndpointConnectionPool {
} catch (final PortNotRunningException | UnknownPortException e) { } catch (final PortNotRunningException | UnknownPortException e) {
throw e; throw e;
} catch (final Exception e) { } catch (final Exception e) {
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peer, penalizationMillis);
cleanup(protocol, peer); cleanup(protocol, peer);
final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
@ -539,7 +552,16 @@ public class EndpointConnectionPool {
clientProtocol.setTimeout(commsTimeout); clientProtocol.setTimeout(commsTimeout);
if (clientProtocol.getVersionNegotiator().getVersion() < 5) { if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
clientProtocol.handshake(peer, remoteDestination.getIdentifier()); String portId = getPortIdentifier(TransferDirection.RECEIVE);
if ( portId == null ) {
portId = getPortIdentifier(TransferDirection.SEND);
}
if ( portId == null ) {
peer.close();
throw new IOException("Failed to determine the identifier of port " + remoteDestination.getName());
}
clientProtocol.handshake(peer, portId);
} else { } else {
clientProtocol.handshake(peer, null); clientProtocol.handshake(peer, null);
} }
@ -818,8 +840,8 @@ public class EndpointConnectionPool {
private ControllerDTO refreshRemoteInfo() throws IOException { private ControllerDTO refreshRemoteInfo() throws IOException {
final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null); final NiFiRestApiUtil utils = new NiFiRestApiUtil(webInterfaceSecure ? sslContext : null);
final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout); final ControllerDTO controller = utils.getController(apiUri + "/controller", commsTimeout);
remoteInfoWriteLock.lock(); remoteInfoWriteLock.lock();
try { try {
@ -898,4 +920,35 @@ public class EndpointConnectionPool {
return isSecure; return isSecure;
} }
private class IdEnrichedRemoteDestination implements RemoteDestination {
private final RemoteDestination original;
private final String identifier;
public IdEnrichedRemoteDestination(final RemoteDestination original, final String identifier) {
this.original = original;
this.identifier = identifier;
}
@Override
public String getIdentifier() {
return identifier;
}
@Override
public String getName() {
return original.getName();
}
@Override
public long getYieldPeriod(final TimeUnit timeUnit) {
return original.getYieldPeriod(timeUnit);
}
@Override
public boolean isUseCompression() {
return original.isUseCompression();
}
}
} }

View File

@ -43,7 +43,8 @@ public class SocketClient implements SiteToSiteClient {
private volatile boolean closed = false; private volatile boolean closed = false;
public SocketClient(final SiteToSiteClientConfig config) { public SocketClient(final SiteToSiteClientConfig config) {
pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()), pool = new EndpointConnectionPool(config.getUrl(),
createRemoteDestination(config.getPortIdentifier(), config.getPortName()),
(int) config.getTimeout(TimeUnit.MILLISECONDS), (int) config.getTimeout(TimeUnit.MILLISECONDS),
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
@ -88,13 +89,18 @@ public class SocketClient implements SiteToSiteClient {
} }
private RemoteDestination createRemoteDestination(final String portId) { private RemoteDestination createRemoteDestination(final String portId, final String portName) {
return new RemoteDestination() { return new RemoteDestination() {
@Override @Override
public String getIdentifier() { public String getIdentifier() {
return portId; return portId;
} }
@Override
public String getName() {
return portName;
}
@Override @Override
public long getYieldPeriod(final TimeUnit timeUnit) { public long getYieldPeriod(final TimeUnit timeUnit) {
return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS);

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
public class NiFiRestApiUtil {
public static final int RESPONSE_CODE_OK = 200;
private final SSLContext sslContext;
public NiFiRestApiUtil(final SSLContext sslContext) {
this.sslContext = sslContext;
}
private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException {
final URL url = new URL(connUrl);
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(timeoutMillis);
connection.setReadTimeout(timeoutMillis);
// special handling for https
if (sslContext != null && connection instanceof HttpsURLConnection) {
HttpsURLConnection secureConnection = (HttpsURLConnection) connection;
secureConnection.setSSLSocketFactory(sslContext.getSocketFactory());
// check the trusted hostname property and override the HostnameVerifier
secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(),
secureConnection.getHostnameVerifier()));
}
return connection;
}
public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException {
final HttpURLConnection connection = getConnection(url, timeoutMillis);
connection.setRequestMethod("GET");
final int responseCode = connection.getResponseCode();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
StreamUtils.copy(connection.getInputStream(), baos);
final String responseMessage = baos.toString();
if ( responseCode == RESPONSE_CODE_OK ) {
final ObjectMapper mapper = new ObjectMapper();
final JsonNode jsonNode = mapper.readTree(responseMessage);
final JsonNode controllerNode = jsonNode.get("controller");
return mapper.readValue(controllerNode, ControllerDTO.class);
} else {
throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage);
}
}
private static class OverrideHostnameVerifier implements HostnameVerifier {
private final String trustedHostname;
private final HostnameVerifier delegate;
private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) {
this.trustedHostname = trustedHostname;
this.delegate = delegate;
}
@Override
public boolean verify(String hostname, SSLSession session) {
if (trustedHostname.equalsIgnoreCase(hostname)) {
return true;
}
return delegate.verify(hostname, session);
}
}
}

View File

@ -36,7 +36,7 @@ import org.junit.Test;
public class TestSiteToSiteClient { public class TestSiteToSiteClient {
@Test @Test
@Ignore("For local testing only; not really a unit test but a manual test") //@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException { public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
@ -58,7 +58,6 @@ public class TestSiteToSiteClient {
final byte[] buff = new byte[(int) size]; final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff); StreamUtils.fillBuffer(in, buff);
System.out.println(buff.length);
Assert.assertNull(transaction.receive()); Assert.assertNull(transaction.receive());
@ -71,7 +70,7 @@ public class TestSiteToSiteClient {
@Test @Test
@Ignore("For local testing only; not really a unit test but a manual test") //@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException { public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.remote.util; package org.apache.nifi.remote;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;

View File

@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;