Refactored client and add javadocs

This commit is contained in:
Mark Payne 2015-02-12 08:16:55 -05:00
parent 4ab5c308fd
commit d1e058cde7
15 changed files with 721 additions and 383 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
/**
* Represents the remote entity that the client is communicating with
*/
public interface Communicant {
/**
* Returns the NiFi site-to-site URL for the remote NiFi instance
* @return
*/
String getUrl();
/**
* The Host of the remote NiFi instance
* @return
*/
String getHost();
/**
* The Port that the remote NiFi instance is listening on for site-to-site communications
* @return
*/
int getPort();
/**
* The distinguished name that the remote NiFi instance has provided in its certificate if
* using secure communications, or <code>null</code> if the Distinguished Name is unknown
* @return
*/
String getDistinguishedName();
}

View File

@ -23,12 +23,13 @@ import java.util.Map;
import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession;
public class Peer { public class Peer implements Communicant {
private final CommunicationsSession commsSession; private final CommunicationsSession commsSession;
private final String url; private final String url;
private final String clusterUrl; private final String clusterUrl;
private final String host; private final String host;
private final int port;
private final Map<String, Long> penaltyExpirationMap = new HashMap<>(); private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false; private boolean closed = false;
@ -39,12 +40,15 @@ public class Peer {
this.clusterUrl = clusterUrl; this.clusterUrl = clusterUrl;
try { try {
this.host = new URI(peerUrl).getHost(); final URI uri = new URI(peerUrl);
this.port = uri.getPort();
this.host = uri.getHost();
} catch (final Exception e) { } catch (final Exception e) {
throw new IllegalArgumentException("Invalid URL: " + peerUrl); throw new IllegalArgumentException("Invalid URL: " + peerUrl);
} }
} }
@Override
public String getUrl() { public String getUrl() {
return url; return url;
} }
@ -92,6 +96,7 @@ public class Peer {
return closed; return closed;
} }
@Override
public String getHost() { public String getHost() {
return host; return host;
} }
@ -127,4 +132,14 @@ public class Peer {
sb.append("]"); sb.append("]");
return sb.toString(); return sb.toString();
} }
@Override
public int getPort() {
return port;
}
@Override
public String getDistinguishedName() {
return commsSession.getUserDn();
}
} }

View File

@ -21,26 +21,33 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.HandshakeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RemoteResourceInitiator { public class RemoteResourceInitiator {
public static final int RESOURCE_OK = 20; public static final int RESOURCE_OK = 20;
public static final int DIFFERENT_RESOURCE_VERSION = 21; public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255; public static final int ABORT = 255;
private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
// Write the classname of the RemoteStreamCodec, followed by its version // Write the classname of the RemoteStreamCodec, followed by its version
logger.debug("Negotiating resource; proposal is {}", resource);
dos.writeUTF(resource.getResourceName()); dos.writeUTF(resource.getResourceName());
final VersionNegotiator negotiator = resource.getVersionNegotiator(); final VersionNegotiator negotiator = resource.getVersionNegotiator();
dos.writeInt(negotiator.getVersion()); dos.writeInt(negotiator.getVersion());
dos.flush(); dos.flush();
// wait for response from server. // wait for response from server.
logger.debug("Receiving response from remote instance");
final int statusCode = dis.read(); final int statusCode = dis.read();
switch (statusCode) { switch (statusCode) {
case RESOURCE_OK: // server accepted our proposal of codec name/version case RESOURCE_OK: // server accepted our proposal of codec name/version
logger.debug("Response was RESOURCE_OK");
return resource; return resource;
case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
logger.debug("Response was DIFFERENT_RESOURCE_VERSION");
// Get server's preferred version // Get server's preferred version
final int newVersion = dis.readInt(); final int newVersion = dis.readInt();
@ -56,8 +63,10 @@ public class RemoteResourceInitiator {
// Attempt negotiation of resource based on our new preferred version. // Attempt negotiation of resource based on our new preferred version.
return initiateResourceNegotiation(resource, dis, dos); return initiateResourceNegotiation(resource, dis, dos);
case ABORT: case ABORT:
logger.debug("Response was ABORT");
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default: default:
logger.debug("Response was {}; unable to negotiate codec", statusCode);
return null; // Unable to negotiate codec return null; // Unable to negotiate codec
} }
} }

View File

@ -123,28 +123,14 @@ public interface Transaction {
/** /**
* <p> * <p>
* Completes the transaction and indicates to both the sender and receiver that the data transfer was * Completes the transaction and indicates to both the sender and receiver that the data transfer was
* successful. If receiving data, this method can also optionally request that the sender back off sending
* data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
* that the receiver is not ready to receive data and made not service another request in the short term.
* </p>
*
* @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
* should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
* this Transaction is SEND, then this argument is ignored.
*
* @throws IOException
*/
void complete(boolean requestBackoff) throws IOException;
/**
* <p>
* Completes the transaction and indicates to both the sender and receiver that the data transfer was
* successful. * successful.
* </p> * </p>
* *
* @throws IOException * @throws IOException
*
* @return a TransactionCompletion that contains details about the Transaction
*/ */
void complete() throws IOException; TransactionCompletion complete() throws IOException;
/** /**
* <p> * <p>
@ -174,6 +160,13 @@ public interface Transaction {
*/ */
TransactionState getState() throws IOException; TransactionState getState() throws IOException;
/**
* Returns a Communicant that represents the other side of this Transaction (i.e.,
* the remote NiFi instance)
* @return
*/
Communicant getCommunicant();
public enum TransactionState { public enum TransactionState {
/** /**

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.protocol.DataPacket;
/**
* A TransactionCompletion provides information about a {@link Transaction} that has completed successfully.
*/
public interface TransactionCompletion {
/**
* When a sending to a NiFi instance, the server may accept the content sent to it
* but indicate that its queues are full and that the client should backoff sending
* data for a bit. This method returns <code>true</code> if the server did in fact
* request that, <code>false</code> otherwise.
* @return
*/
boolean isBackoff();
/**
* Returns the number of Data Packets that were sent to or received from the remote
* NiFi instance in the Transaction
* @return
*/
int getDataPacketsTransferred();
/**
* Returns the number of bytes of DataPacket content that were sent to or received from
* the remote NiFI instance in the Transaction. Note that this is different than the number
* of bytes actually transferred between the client and server, as it does not take into
* account the attributes or protocol-specific information that is exchanged but rather
* takes into account only the data in the {@link InputStream} of the {@link DataPacket}
* @return
*/
long getBytesTransferred();
/**
* Returns the amount of time that the Transaction took, from the time that the Transaction
* was created to the time that the Transaction was completed.
* @param timeUnit
* @return
*/
long getDuration(TimeUnit timeUnit);
}

View File

@ -126,6 +126,7 @@ public interface SiteToSiteClient extends Closeable {
private String url; private String url;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30); private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3); private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
private SSLContext sslContext; private SSLContext sslContext;
private EventReporter eventReporter; private EventReporter eventReporter;
private File peerPersistenceFile; private File peerPersistenceFile;
@ -162,6 +163,19 @@ public interface SiteToSiteClient extends Closeable {
return this; return this;
} }
/**
* Specifies the amount of time that a connection can remain idle in the connection pool before it
* is "expired" and shutdown. The default value is 30 seconds.
*
* @param timeout
* @param unit
* @return
*/
public Builder idleExpiration(final long timeout, final TimeUnit unit) {
this.idleExpirationNanos = unit.toNanos(timeout);
return this;
}
/** /**
* If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster * If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
* or the remote instance of NiFi if it is standalone), specifies how long the client should * or the remote instance of NiFi if it is standalone), specifies how long the client should
@ -326,6 +340,11 @@ public interface SiteToSiteClient extends Closeable {
return Builder.this.getTimeout(timeUnit); return Builder.this.getTimeout(timeUnit);
} }
@Override
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
return Builder.this.getIdleConnectionExpiration(timeUnit);
}
@Override @Override
public SSLContext getSslContext() { public SSLContext getSslContext() {
return Builder.this.getSslContext(); return Builder.this.getSslContext();
@ -384,13 +403,23 @@ public interface SiteToSiteClient extends Closeable {
} }
/** /**
* Returns the communications timeout in nanoseconds * Returns the communications timeout
* @return * @return
*/ */
public long getTimeout(final TimeUnit timeUnit) { public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
} }
/**
* Returns the amount of of time that a connection can remain idle in the connection
* pool before being shutdown
* @param timeUnit
* @return
*/
public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
}
/** /**
* Returns the amount of time that a particular node will be ignored after a * Returns the amount of time that a particular node will be ignored after a
* communications error with that node occurs * communications error with that node occurs

View File

@ -38,6 +38,14 @@ public interface SiteToSiteClientConfig {
*/ */
long getTimeout(final TimeUnit timeUnit); long getTimeout(final TimeUnit timeUnit);
/**
* Returns the amount of time that a connection can remain idle before it is
* "expired" and shut down
* @param timeUnit
* @return
*/
long getIdleConnectionExpiration(TimeUnit timeUnit);
/** /**
* Returns the amount of time that a particular node will be ignored after a * Returns the amount of time that a particular node will be ignored after a
* communications error with that node occurs * communications error with that node occurs
@ -52,12 +60,6 @@ public interface SiteToSiteClientConfig {
*/ */
SSLContext getSslContext(); SSLContext getSslContext();
/**
* Returns the EventReporter that is to be used by clients to report events
* @return
*/
EventReporter getEventReporter();
/** /**
* Returns the file that is to be used for persisting the nodes of a remote cluster, if any. * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
* @return * @return
@ -111,4 +113,11 @@ public interface SiteToSiteClientConfig {
* @return * @return
*/ */
int getPreferredBatchCount(); int getPreferredBatchCount();
/**
* Returns the EventReporter that is to be used by clients to report events
* @return
*/
EventReporter getEventReporter();
} }

View File

@ -107,12 +107,13 @@ public class EndpointConnectionPool {
private volatile List<PeerStatus> peerStatuses; private volatile List<PeerStatus> peerStatuses;
private volatile long peerRefreshTime = 0L; private volatile long peerRefreshTime = 0L;
private volatile PeerStatusCache peerStatusCache; private volatile PeerStatusCache peerStatusCache;
private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>(); private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<EndpointConnection>());
private final File peersFile; private final File peersFile;
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final SSLContext sslContext; private final SSLContext sslContext;
private final ScheduledExecutorService taskExecutor; private final ScheduledExecutorService taskExecutor;
private final int idleExpirationMillis;
private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@ -124,12 +125,18 @@ public class EndpointConnectionPool {
private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
private volatile int commsTimeout; private volatile int commsTimeout;
private volatile boolean shutdown = false;
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile); public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
final EventReporter eventReporter, final File persistenceFile)
{
this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
} }
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile)
{
try { try {
this.clusterUrl = new URI(clusterUrl); this.clusterUrl = new URI(clusterUrl);
} catch (final URISyntaxException e) { } catch (final URISyntaxException e) {
@ -147,6 +154,7 @@ public class EndpointConnectionPool {
this.peersFile = persistenceFile; this.peersFile = persistenceFile;
this.eventReporter = eventReporter; this.eventReporter = eventReporter;
this.commsTimeout = commsTimeoutMillis; this.commsTimeout = commsTimeoutMillis;
this.idleExpirationMillis = idleExpirationMillis;
Set<PeerStatus> recoveredStatuses; Set<PeerStatus> recoveredStatuses;
if ( persistenceFile != null && persistenceFile.exists() ) { if ( persistenceFile != null && persistenceFile.exists() ) {
@ -225,19 +233,21 @@ public class EndpointConnectionPool {
// if we can't get an existing Connection, create one // if we can't get an existing Connection, create one
if ( connection == null ) { if ( connection == null ) {
logger.debug("No Connection available for Port {}; creating new Connection", remoteDestination.getIdentifier()); logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier());
protocol = new SocketClientProtocol(); protocol = new SocketClientProtocol();
protocol.setDestination(remoteDestination); protocol.setDestination(remoteDestination);
logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction); final PeerStatus peerStatus = getNextPeerStatus(direction);
logger.debug("{} next peer status = {}", this, peerStatus);
if ( peerStatus == null ) { if ( peerStatus == null ) {
return null; return null;
} }
try { try {
logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus); commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) { } catch (final IOException ioe) {
// TODO: penalize peer status
penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
throw ioe; throw ioe;
} }
@ -245,6 +255,7 @@ public class EndpointConnectionPool {
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
try { try {
logger.debug("{} Negotiating protocol", this);
RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos); RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
} catch (final HandshakeException e) { } catch (final HandshakeException e) {
try { try {
@ -267,6 +278,7 @@ public class EndpointConnectionPool {
// perform handshake // perform handshake
try { try {
logger.debug("{} performing handshake", this);
protocol.handshake(peer); protocol.handshake(peer);
// handle error cases // handle error cases
@ -286,7 +298,9 @@ public class EndpointConnectionPool {
} }
// negotiate the FlowFileCodec to use // negotiate the FlowFileCodec to use
logger.debug("{} negotiating codec", this);
codec = protocol.negotiateCodec(peer); codec = protocol.negotiateCodec(peer);
logger.debug("{} negotiated codec is {}", this, codec);
} catch (final PortNotRunningException | UnknownPortException e) { } catch (final PortNotRunningException | UnknownPortException e) {
throw e; throw e;
} catch (final Exception e) { } catch (final Exception e) {
@ -323,6 +337,7 @@ public class EndpointConnectionPool {
} }
} }
activeConnections.add(connection);
return connection; return connection;
} }
@ -338,8 +353,15 @@ public class EndpointConnectionPool {
return false; return false;
} }
activeConnections.remove(endpointConnection);
if ( shutdown ) {
terminate(endpointConnection);
return false;
} else {
endpointConnection.setLastTimeUsed();
return connectionQueue.offer(endpointConnection); return connectionQueue.offer(endpointConnection);
} }
}
private void penalize(final PeerStatus status, final long penalizationMillis) { private void penalize(final PeerStatus status, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(status); Long expiration = peerTimeoutExpirations.get(status);
@ -393,11 +415,19 @@ public class EndpointConnectionPool {
} }
} }
private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
}
private PeerStatus getNextPeerStatus(final TransferDirection direction) { private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses; List<PeerStatus> peerList = peerStatuses;
if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) ) { if ( isPeerRefreshNeeded(peerList) ) {
peerRefreshLock.lock(); peerRefreshLock.lock();
try { try {
// now that we have the lock, check again that we need to refresh (because another thread
// could have been refreshing while we were waiting for the lock).
peerList = peerStatuses;
if (isPeerRefreshNeeded(peerList)) {
try { try {
peerList = createPeerStatusList(direction); peerList = createPeerStatusList(direction);
} catch (final Exception e) { } catch (final Exception e) {
@ -414,6 +444,7 @@ public class EndpointConnectionPool {
this.peerStatuses = peerList; this.peerStatuses = peerList;
peerRefreshTime = System.currentTimeMillis(); peerRefreshTime = System.currentTimeMillis();
}
} finally { } finally {
peerRefreshLock.unlock(); peerRefreshLock.unlock();
} }
@ -488,7 +519,10 @@ public class EndpointConnectionPool {
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
final String hostname = clusterUrl.getHost(); final String hostname = clusterUrl.getHost();
final int port = getSiteToSitePort(); final Integer port = getSiteToSitePort();
if ( port == null ) {
throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
}
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
@ -667,7 +701,7 @@ public class EndpointConnectionPool {
distributionDescription.append("New Weighted Distribution of Nodes:"); distributionDescription.append("New Weighted Distribution of Nodes:");
for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
final double percentage = entry.getValue() * 100D / (double) destinations.size(); final double percentage = entry.getValue() * 100D / (double) destinations.size();
distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles"); distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
} }
logger.info(distributionDescription.toString()); logger.info(distributionDescription.toString());
@ -677,35 +711,36 @@ public class EndpointConnectionPool {
private void cleanupExpiredSockets() { private void cleanupExpiredSockets() {
final List<EndpointConnection> states = new ArrayList<>(); final List<EndpointConnection> connections = new ArrayList<>();
EndpointConnection state; EndpointConnection connection;
while ((state = connectionQueue.poll()) != null) { while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down. // If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = state.getLastTimeUsed(); final long lastUsed = connection.getLastTimeUsed();
if ( lastUsed < System.currentTimeMillis() - 10000L ) { if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
try { try {
state.getSocketClientProtocol().shutdown(state.getPeer()); connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) { } catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}", logger.debug("Failed to shut down {} using {} due to {}",
new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
} }
cleanup(state.getSocketClientProtocol(), state.getPeer()); terminate(connection);
} else { } else {
states.add(state); connections.add(connection);
} }
} }
connectionQueue.addAll(states); connectionQueue.addAll(connections);
} }
public void shutdown() { public void shutdown() {
shutdown = true;
taskExecutor.shutdown(); taskExecutor.shutdown();
peerTimeoutExpirations.clear(); peerTimeoutExpirations.clear();
for ( final CommunicationsSession commsSession : activeCommsChannels ) { for ( final EndpointConnection conn : activeConnections ) {
commsSession.interrupt(); conn.getPeer().getCommunicationsSession().interrupt();
} }
EndpointConnection state; EndpointConnection state;
@ -714,8 +749,8 @@ public class EndpointConnectionPool {
} }
} }
public void terminate(final EndpointConnection state) { public void terminate(final EndpointConnection connection) {
cleanup(state.getSocketClientProtocol(), state.getPeer()); cleanup(connection.getSocketClientProtocol(), connection.getPeer());
} }
private void refreshPeers() { private void refreshPeers() {

View File

@ -19,8 +19,10 @@ package org.apache.nifi.remote.client.socket;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.RemoteDestination; import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.client.SiteToSiteClientConfig;
@ -41,6 +43,7 @@ public class SocketClient implements SiteToSiteClient {
public SocketClient(final SiteToSiteClientConfig config) { public SocketClient(final SiteToSiteClientConfig config) {
pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
(int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
this.config = config; this.config = config;
@ -130,14 +133,9 @@ public class SocketClient implements SiteToSiteClient {
} }
@Override @Override
public void complete() throws IOException { public TransactionCompletion complete() throws IOException {
complete(false);
}
@Override
public void complete(final boolean requestBackoff) throws IOException {
try { try {
transaction.complete(requestBackoff); return transaction.complete();
} finally { } finally {
final EndpointConnection state = connectionStateRef.get(); final EndpointConnection state = connectionStateRef.get();
if ( state != null ) { if ( state != null ) {
@ -188,6 +186,10 @@ public class SocketClient implements SiteToSiteClient {
return transaction.getState(); return transaction.getState();
} }
@Override
public Communicant getCommunicant() {
return transaction.getCommunicant();
}
}; };
} }

View File

@ -317,10 +317,7 @@ public class SocketClientProtocol implements ClientProtocol {
// Commit the session so that we have persisted the data // Commit the session so that we have persisted the data
session.commit(); session.commit();
// We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships. transaction.complete();
final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
transaction.complete(applyBackpressure);
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
if ( !flowFilesReceived.isEmpty() ) { if ( !flowFilesReceived.isEmpty() ) {
@ -397,7 +394,7 @@ public class SocketClientProtocol implements ClientProtocol {
final String dataSize = FormatUtils.formatDataSize(bytesSent); final String dataSize = FormatUtils.formatDataSize(bytesSent);
session.commit(); session.commit();
transaction.complete(false); transaction.complete();
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {

View File

@ -25,8 +25,10 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream; import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream; import java.util.zip.CheckedOutputStream;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class SocketClientTransaction implements Transaction { public class SocketClientTransaction implements Transaction {
private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
private final long creationNanoTime = System.nanoTime();
private final CRC32 crc = new CRC32(); private final CRC32 crc = new CRC32();
private final int protocolVersion; private final int protocolVersion;
private final FlowFileCodec codec; private final FlowFileCodec codec;
@ -54,6 +56,7 @@ public class SocketClientTransaction implements Transaction {
private boolean dataAvailable = false; private boolean dataAvailable = false;
private int transfers = 0; private int transfers = 0;
private long contentBytes = 0;
private TransactionState state; private TransactionState state;
SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
@ -107,6 +110,7 @@ public class SocketClientTransaction implements Transaction {
@Override @Override
public DataPacket receive() throws IOException { public DataPacket receive() throws IOException {
try {
try { try {
if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
throw new IllegalStateException("Cannot receive data because Transaction State is " + state); throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
@ -152,10 +156,14 @@ public class SocketClientTransaction implements Transaction {
this.dataAvailable = false; this.dataAvailable = false;
} else { } else {
transfers++; transfers++;
contentBytes += packet.getSize();
} }
this.state = TransactionState.DATA_EXCHANGED; this.state = TransactionState.DATA_EXCHANGED;
return packet; return packet;
} catch (final IOException ioe) {
throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
}
} catch (final Exception e) { } catch (final Exception e) {
error(); error();
throw e; throw e;
@ -164,7 +172,8 @@ public class SocketClientTransaction implements Transaction {
@Override @Override
public void send(DataPacket dataPacket) throws IOException { public void send(final DataPacket dataPacket) throws IOException {
try {
try { try {
if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
throw new IllegalStateException("Cannot send data because Transaction State is " + state); throw new IllegalStateException("Cannot send data because Transaction State is " + state);
@ -192,7 +201,11 @@ public class SocketClientTransaction implements Transaction {
} }
transfers++; transfers++;
contentBytes += dataPacket.getSize();
this.state = TransactionState.DATA_EXCHANGED; this.state = TransactionState.DATA_EXCHANGED;
} catch (final IOException ioe) {
throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
}
} catch (final Exception e) { } catch (final Exception e) {
error(); error();
throw e; throw e;
@ -211,39 +224,30 @@ public class SocketClientTransaction implements Transaction {
state = TransactionState.TRANSACTION_CANCELED; state = TransactionState.TRANSACTION_CANCELED;
} catch (final IOException ioe) { } catch (final IOException ioe) {
error(); error();
throw ioe; throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
} }
} }
@Override
public void complete() throws IOException {
complete(false);
}
@Override @Override
public void complete(boolean requestBackoff) throws IOException { public TransactionCompletion complete() throws IOException {
try {
try { try {
if ( state != TransactionState.TRANSACTION_CONFIRMED ) { if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
throw new IllegalStateException("Cannot complete transaction because state is " + state + throw new IllegalStateException("Cannot complete transaction because state is " + state +
"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
} }
boolean backoff = false;
if ( direction == TransferDirection.RECEIVE ) { if ( direction == TransferDirection.RECEIVE ) {
if ( transfers == 0 ) { if ( transfers == 0 ) {
state = TransactionState.TRANSACTION_COMPLETED; state = TransactionState.TRANSACTION_COMPLETED;
return; return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
} }
if ( requestBackoff ) {
// Confirm that we received the data and the peer can now discard it but that the peer should not
// send any more data for a bit
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
} else {
// Confirm that we received the data and the peer can now discard it // Confirm that we received the data and the peer can now discard it
logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
}
state = TransactionState.TRANSACTION_COMPLETED; state = TransactionState.TRANSACTION_COMPLETED;
} else { } else {
@ -258,11 +262,17 @@ public class SocketClientTransaction implements Transaction {
logger.debug("{} Received {} from {}", this, transactionResponse, peer); logger.debug("{} Received {} from {}", this, transactionResponse, peer);
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
peer.penalize(destinationId, penaltyMillis); peer.penalize(destinationId, penaltyMillis);
backoff = true;
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
} }
state = TransactionState.TRANSACTION_COMPLETED; state = TransactionState.TRANSACTION_COMPLETED;
}
return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
} catch (final IOException ioe) {
throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
} }
} catch (final Exception e) { } catch (final Exception e) {
error(); error();
@ -273,6 +283,7 @@ public class SocketClientTransaction implements Transaction {
@Override @Override
public void confirm() throws IOException { public void confirm() throws IOException {
try {
try { try {
if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) { if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
// client requested to receive data but no data available. no need to confirm. // client requested to receive data but no data available. no need to confirm.
@ -349,6 +360,9 @@ public class SocketClientTransaction implements Transaction {
state = TransactionState.TRANSACTION_CONFIRMED; state = TransactionState.TRANSACTION_CONFIRMED;
} }
} catch (final IOException ioe) {
throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
}
} catch (final Exception e) { } catch (final Exception e) {
error(); error();
throw e; throw e;
@ -365,4 +379,13 @@ public class SocketClientTransaction implements Transaction {
return state; return state;
} }
@Override
public Communicant getCommunicant() {
return peer;
}
@Override
public String toString() {
return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]";
}
} }

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.protocol.socket;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.TransactionCompletion;
public class SocketClientTransactionCompletion implements TransactionCompletion {
private final boolean backoff;
private final int dataPacketsTransferred;
private final long bytesTransferred;
private final long durationNanos;
public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
this.backoff = backoff;
this.dataPacketsTransferred = dataPacketsTransferred;
this.bytesTransferred = bytesTransferred;
this.durationNanos = durationNanos;
}
@Override
public boolean isBackoff() {
return backoff;
}
@Override
public int getDataPacketsTransferred() {
return dataPacketsTransferred;
}
@Override
public long getBytesTransferred() {
return bytesTransferred;
}
@Override
public long getDuration(final TimeUnit timeUnit) {
return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS);
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.TransferDirection;
@ -35,13 +36,13 @@ import org.junit.Test;
public class TestSiteToSiteClient { public class TestSiteToSiteClient {
@Test @Test
@Ignore("For local testing only; not really a unit test but a manual test") //@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException { public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder() final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi") .url("http://localhost:8080/nifi")
.portName("out") .portName("cba")
.requestBatchCount(1) .requestBatchCount(1)
.build(); .build();
@ -62,7 +63,7 @@ public class TestSiteToSiteClient {
Assert.assertNull(transaction.receive()); Assert.assertNull(transaction.receive());
transaction.confirm(); transaction.confirm();
transaction.complete(false); transaction.complete();
} finally { } finally {
client.close(); client.close();
} }
@ -70,13 +71,14 @@ public class TestSiteToSiteClient {
@Test @Test
@Ignore("For local testing only; not really a unit test but a manual test") //@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException { public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder() final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi") .url("http://10.0.64.63:8080/nifi")
.portName("in") .portName("input")
.nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
.build(); .build();
try { try {
@ -91,7 +93,7 @@ public class TestSiteToSiteClient {
transaction.send(packet); transaction.send(packet);
transaction.confirm(); transaction.confirm();
transaction.complete(false); transaction.complete();
} finally { } finally {
client.close(); client.close();
} }

View File

@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
@ -130,7 +131,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final Thread thread = new Thread(new Runnable() { final Thread thread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
String hostname = socket.getInetAddress().getHostName(); LOG.debug("{} Determining URL of connection", this);
final InetAddress inetAddress = socket.getInetAddress();
String hostname = inetAddress.getHostName();
final int slashIndex = hostname.indexOf("/"); final int slashIndex = hostname.indexOf("/");
if ( slashIndex == 0 ) { if ( slashIndex == 0 ) {
hostname = hostname.substring(1); hostname = hostname.substring(1);
@ -140,6 +143,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final int port = socket.getPort(); final int port = socket.getPort();
final String peerUri = "nifi://" + hostname + ":" + port; final String peerUri = "nifi://" + hostname + ":" + port;
LOG.debug("{} Connection URL is {}", this, peerUri);
final CommunicationsSession commsSession; final CommunicationsSession commsSession;
final String dn; final String dn;
@ -154,6 +158,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
dn = sslSocketChannel.getDn(); dn = sslSocketChannel.getDn();
commsSession.setUserDn(dn); commsSession.setUserDn(dn);
} else { } else {
LOG.trace("{} Channel is not secure", this);
commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri); commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri);
dn = null; dn = null;
} }
@ -306,6 +311,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
} }
}); });
thread.setName("Site-to-Site Worker Thread-" + (threadCount++)); thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
LOG.debug("Handing connection to {}", thread);
thread.start(); thread.start();
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.nifi.remote;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
@ -25,8 +26,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -34,28 +33,30 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.client.socket.EndpointConnection; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.client.socket.EndpointConnectionPool; import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class StandardRemoteGroupPort extends RemoteGroupPort { public class StandardRemoteGroupPort extends RemoteGroupPort {
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream"; public static final String CONTENT_TYPE = "application/octet-stream";
@ -71,11 +72,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final SSLContext sslContext; private final SSLContext sslContext;
private final TransferDirection transferDirection; private final TransferDirection transferDirection;
private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>(); private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
private final Lock interruptLock = new ReentrantLock();
private boolean shutdown = false; // guarded by codecLock
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) { final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
@ -112,16 +110,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override @Override
public void shutdown() { public void shutdown() {
super.shutdown(); super.shutdown();
interruptLock.lock();
try {
this.shutdown = true;
} finally {
interruptLock.unlock();
}
final EndpointConnectionPool pool = connectionPoolRef.get(); final SiteToSiteClient client = clientRef.get();
if ( pool != null ) { if ( client != null ) {
pool.shutdown(); try {
client.close();
} catch (final IOException ioe) {
logger.warn("Failed to properly shutdown Site-to-Site Client due to {}", ioe);
}
} }
} }
@ -129,17 +125,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public void onSchedulingStart() { public void onSchedulingStart() {
super.onSchedulingStart(); super.onSchedulingStart();
interruptLock.lock(); final SiteToSiteClient client = new SiteToSiteClient.Builder()
try { .url(remoteGroup.getTargetUri().toString())
this.shutdown = false; .portIdentifier(getIdentifier())
} finally { .sslContext(sslContext)
interruptLock.unlock(); .eventReporter(remoteGroup.getEventReporter())
} .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
.build();
final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(), clientRef.set(client);
remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
connectionPoolRef.set(connectionPool);
} }
@ -157,10 +150,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString(); String url = getRemoteProcessGroup().getTargetUri().toString();
final EndpointConnectionPool connectionPool = connectionPoolRef.get(); final SiteToSiteClient client = clientRef.get();
final EndpointConnection connection; final Transaction transaction;
try { try {
connection = connectionPool.getEndpointConnection(this, transferDirection); transaction = client.createTransaction(transferDirection);
} catch (final PortNotRunningException e) { } catch (final PortNotRunningException e) {
context.yield(); context.yield();
this.targetRunning.set(false); this.targetRunning.set(false);
@ -186,95 +179,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return; return;
} }
if ( connection == null ) { if ( transaction == null ) {
logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this); logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
context.yield(); context.yield();
return; return;
} }
FlowFileCodec codec = connection.getCodec();
SocketClientProtocol protocol = connection.getSocketClientProtocol();
final Peer peer = connection.getPeer();
url = peer.getUrl();
try { try {
interruptLock.lock();
try {
if ( shutdown ) {
peer.getCommunicationsSession().interrupt();
}
activeCommsChannels.add(peer.getCommunicationsSession());
} finally {
interruptLock.unlock();
}
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) { if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
transferFlowFiles(peer, protocol, context, session, codec); transferFlowFiles(transaction, context, session);
} else { } else {
final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec); final int numReceived = receiveFlowFiles(transaction, context, session);
if ( numReceived == 0 ) { if ( numReceived == 0 ) {
context.yield(); context.yield();
} }
} }
interruptLock.lock();
try {
if ( shutdown ) {
peer.getCommunicationsSession().interrupt();
}
activeCommsChannels.remove(peer.getCommunicationsSession());
} finally {
interruptLock.unlock();
}
session.commit(); session.commit();
connection.setLastTimeUsed();
connectionPool.offer(connection);
} catch (final TransmissionDisabledException e) {
cleanup(protocol, peer);
session.rollback();
} catch (final Exception e) { } catch (final Exception e) {
connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS)); final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) { if ( logger.isDebugEnabled() ) {
logger.error("", e); logger.error("", e);
} }
cleanup(protocol, peer);
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
session.rollback(); session.rollback();
} }
} }
private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
if ( protocol != null && peer != null ) {
try {
protocol.shutdown(peer);
} catch (final TransmissionDisabledException e) {
// User disabled transmission.... do nothing.
logger.debug(this + " Transmission Disabled by User");
} catch (IOException e1) {
}
}
if ( peer != null ) {
try {
peer.close();
} catch (final TransmissionDisabledException e) {
// User disabled transmission.... do nothing.
logger.debug(this + " Transmission Disabled by User");
} catch (IOException e1) {
}
}
}
@Override @Override
public String getYieldPeriod() { public String getYieldPeriod() {
// delegate yield duration to remote process group // delegate yield duration to remote process group
@ -282,12 +216,129 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} }
private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
return protocol.transferFlowFiles(peer, context, session, codec); FlowFile flowFile = session.get();
if (flowFile == null) {
return 0;
} }
private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { try {
return protocol.receiveFlowFiles(peer, context, session, codec); final String userDn = transaction.getCommunicant().getDistinguishedName();
final long startSendingNanos = System.nanoTime();
final StopWatch stopWatch = new StopWatch(true);
long bytesSent = 0L;
final Set<FlowFile> flowFilesSent = new HashSet<>();
boolean continueTransaction = true;
while (continueTransaction) {
final long startNanos = System.nanoTime();
// call codec.encode within a session callback so that we have the InputStream to read the FlowFile
final FlowFile toWrap = flowFile;
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
transaction.send(dataPacket);
}
});
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
flowFilesSent.add(flowFile);
bytesSent += flowFile.getSize();
logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
session.remove(flowFile);
final long sendingNanos = System.nanoTime() - startSendingNanos;
if ( sendingNanos < BATCH_SEND_NANOS ) {
flowFile = session.get();
} else {
flowFile = null;
}
continueTransaction = (flowFile != null);
}
transaction.confirm();
// consume input stream entirely, ignoring its contents. If we
// don't do this, the Connection will not be returned to the pool
stopWatch.stop();
final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesSent);
session.commit();
transaction.complete();
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
return flowFilesSent.size();
} catch (final Exception e) {
session.rollback();
throw e;
}
}
private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
final String userDn = transaction.getCommunicant().getDistinguishedName();
final StopWatch stopWatch = new StopWatch(true);
final Set<FlowFile> flowFilesReceived = new HashSet<>();
long bytesReceived = 0L;
while (true) {
final long start = System.nanoTime();
final DataPacket dataPacket = transaction.receive();
if ( dataPacket == null ) {
break;
}
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
flowFile = session.importFrom(dataPacket.getData(), flowFile);
final long receiveNanos = System.nanoTime() - start;
String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
if ( sourceFlowFileIdentifier == null ) {
sourceFlowFileIdentifier = "<Unknown Identifier>";
}
final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
"Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
session.transfer(flowFile, Relationship.ANONYMOUS);
bytesReceived += dataPacket.getSize();
}
// Confirm that what we received was the correct data.
transaction.confirm();
// Commit the session so that we have persisted the data
session.commit();
transaction.complete();
if ( !flowFilesReceived.isEmpty() ) {
stopWatch.stop();
final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived);
logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
}
return flowFilesReceived.size();
} }
@Override @Override