NIFI-282: Refactoring to allow for separate client

This commit is contained in:
Mark Payne 2015-02-09 20:41:39 -05:00
parent 05b64593b6
commit 081471c420
29 changed files with 437 additions and 381 deletions

View File

@ -18,10 +18,10 @@ package org.apache.nifi.remote;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
public class Peer {
@ -29,7 +29,8 @@ public class Peer {
private final String url;
private final String clusterUrl;
private final String host;
private long penalizationExpiration = 0L;
private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) {
@ -61,20 +62,32 @@ public class Peer {
// Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
try {
StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
commsSession.getInput().consume();
} finally {
commsSession.close();
}
}
public void penalize(final long millis) {
penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
/**
* Penalizes this peer for the given destination only for the provided number of milliseconds
* @param destinationId
* @param millis
*/
public void penalize(final String destinationId, final long millis) {
final Long currentPenalty = penaltyExpirationMap.get(destinationId);
final long proposedPenalty = System.currentTimeMillis() + millis;
if ( currentPenalty == null || proposedPenalty > currentPenalty ) {
penaltyExpirationMap.put(destinationId, proposedPenalty);
}
}
public boolean isPenalized() {
return penalizationExpiration > System.currentTimeMillis();
public boolean isPenalized(final String destinationId) {
final Long currentPenalty = penaltyExpirationMap.get(destinationId);
return (currentPenalty != null && currentPenalty > System.currentTimeMillis());
}
public boolean isClosed() {
return closed;
}
@ -110,8 +123,6 @@ public class Peer {
sb.append("Peer[url=").append(url);
if (closed) {
sb.append(",CLOSED");
} else if (isPenalized()) {
sb.append(",PENALIZED");
}
sb.append("]");
return sb.toString();

View File

@ -138,6 +138,16 @@ public interface Transaction {
/**
* <p>
* Completes the transaction and indicates to both the sender and receiver that the data transfer was
* successful.
* </p>
*
* @throws IOException
*/
void complete() throws IOException;
/**
* <p>
* Cancels this transaction, indicating to the sender that the data has not been successfully received so that
* the sender can retry or handle however is appropriate.
* </p>

View File

@ -27,6 +27,10 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.socket.SocketClient;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
/**
@ -65,18 +69,24 @@ import org.apache.nifi.remote.protocol.DataPacket;
public interface SiteToSiteClient extends Closeable {
/**
* <p>
* Creates a new Transaction that can be used to either send data to a remote NiFi instance
* or receive data from a remote NiFi instance, depending on the value passed for the {@code direction} argument.
* </p>
*
* <p>
* <b>Note:</b> If all of the nodes are penalized (See {@link Builder#nodePenalizationPeriod(long, TimeUnit)}), then
* this method will return <code>null</code>.
* </p>
*
* @param direction specifies which direction the data should be transferred. A value of {@link TransferDirection#SEND}
* indicates that this Transaction will send data to the remote instance; a value of {@link TransferDirection#RECEIVE} indicates
* that this Transaction will be used to receive data from the remote instance.
*
* @return
* @return a Transaction to use for sending or receiving data, or <code>null</code> if all nodes are penalized.
* @throws IOException
*/
Transaction createTransaction(TransferDirection direction) throws IOException;
Transaction createTransaction(TransferDirection direction) throws HandshakeException, PortNotRunningException, ProtocolException, UnknownPortException, IOException;
/**
* <p>

View File

@ -20,13 +20,13 @@ import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
public class EndpointConnectionState {
public class EndpointConnection {
private final Peer peer;
private final SocketClientProtocol socketClientProtocol;
private final FlowFileCodec codec;
private volatile long lastUsed;
public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
public EndpointConnection(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
this.peer = peer;
this.socketClientProtocol = socketClientProtocol;
this.codec = codec;

View File

@ -33,6 +33,7 @@ import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -86,16 +87,16 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EndpointConnectionStatePool {
public class EndpointConnectionPool {
public static final long PEER_REFRESH_PERIOD = 60000L;
public static final String CATEGORY = "Site-to-Site";
public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);
private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<EndpointConnection> connectionQueue = new LinkedBlockingQueue<>();
private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
private final URI clusterUrl;
private final String apiUri;
@ -124,11 +125,11 @@ public class EndpointConnectionStatePool {
private volatile int commsTimeout;
public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
}
public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
try {
this.clusterUrl = new URI(clusterUrl);
} catch (final URISyntaxException e) {
@ -188,52 +189,59 @@ public class EndpointConnectionStatePool {
}
public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnectionState(remoteDestination, direction, null);
public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
return getEndpointConnection(remoteDestination, direction, null);
}
public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
//
// Attempt to get a connection state that already exists for this URL.
//
FlowFileCodec codec = null;
CommunicationsSession commsSession = null;
SocketClientProtocol protocol = null;
EndpointConnectionState connectionState;
EndpointConnection connection;
Peer peer = null;
final List<EndpointConnectionState> addBack = new ArrayList<>();
final List<EndpointConnection> addBack = new ArrayList<>();
try {
do {
connection = connectionQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connection);
if ( connection == null && !addBack.isEmpty() ) {
// all available connections have been penalized.
logger.debug("{} all Connections for {} are penalized; returning no Connection", this, remoteDestination.getIdentifier());
return null;
}
if ( connection != null && connection.getPeer().isPenalized(remoteDestination.getIdentifier()) ) {
// we have a connection, but it's penalized. We want to add it back to the queue
// when we've found one to use.
addBack.add(connection);
continue;
}
// if we can't get an existing Connection, create one
if ( connection == null ) {
logger.debug("No Connection available for Port {}; creating new Connection", remoteDestination.getIdentifier());
protocol = new SocketClientProtocol();
protocol.setDestination(remoteDestination);
final PeerStatus peerStatus = getNextPeerStatus(direction);
if ( peerStatus == null ) {
return null;
}
connectionState = connectionStateQueue.poll();
logger.debug("{} Connection State for {} = {}", this, clusterUrl, connectionState);
if ( connectionState == null && !addBack.isEmpty() ) {
// all available connections have been penalized.
return null;
}
if ( connectionState != null && connectionState.getPeer().isPenalized() ) {
// we have a connection, but it's penalized. We want to add it back to the queue
// when we've found one to use.
addBack.add(connectionState);
continue;
}
// if we can't get an existing ConnectionState, create one
if ( connectionState == null ) {
protocol = new SocketClientProtocol();
protocol.setDestination(remoteDestination);
try {
commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) {
// TODO: penalize peer status
penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
throw ioe;
}
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
try {
@ -245,9 +253,6 @@ public class EndpointConnectionStatePool {
throw e;
}
}
} catch (final IOException e) {
}
final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
@ -268,7 +273,7 @@ public class EndpointConnectionStatePool {
if ( protocol.isDestinationFull() ) {
logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
connectionStateQueue.offer(connectionState);
connectionQueue.offer(connection);
continue;
} else if ( protocol.isPortInvalid() ) {
penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
@ -296,34 +301,34 @@ public class EndpointConnectionStatePool {
throw e;
}
connectionState = new EndpointConnectionState(peer, protocol, codec);
connection = new EndpointConnection(peer, protocol, codec);
} else {
final long lastTimeUsed = connectionState.getLastTimeUsed();
final long lastTimeUsed = connection.getLastTimeUsed();
final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed;
if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) {
cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer());
connectionState = null;
cleanup(connection.getSocketClientProtocol(), connection.getPeer());
connection = null;
} else {
codec = connectionState.getCodec();
peer = connectionState.getPeer();
codec = connection.getCodec();
peer = connection.getPeer();
commsSession = peer.getCommunicationsSession();
protocol = connectionState.getSocketClientProtocol();
protocol = connection.getSocketClientProtocol();
}
}
} while ( connectionState == null || codec == null || commsSession == null || protocol == null );
} while ( connection == null || codec == null || commsSession == null || protocol == null );
} finally {
if ( !addBack.isEmpty() ) {
connectionStateQueue.addAll(addBack);
connectionQueue.addAll(addBack);
}
}
return connectionState;
return connection;
}
public boolean offer(final EndpointConnectionState endpointConnectionState) {
final Peer peer = endpointConnectionState.getPeer();
public boolean offer(final EndpointConnection endpointConnection) {
final Peer peer = endpointConnection.getPeer();
if ( peer == null ) {
return false;
}
@ -333,7 +338,17 @@ public class EndpointConnectionStatePool {
return false;
}
return connectionStateQueue.offer(endpointConnectionState);
return connectionQueue.offer(endpointConnection);
}
private void penalize(final PeerStatus status, final long penalizationMillis) {
Long expiration = peerTimeoutExpirations.get(status);
if ( expiration == null ) {
expiration = Long.valueOf(0L);
}
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
}
/**
@ -353,13 +368,7 @@ public class EndpointConnectionStatePool {
}
final PeerStatus status = new PeerStatus(host, port, true, 1);
Long expiration = peerTimeoutExpirations.get(status);
if ( expiration == null ) {
expiration = Long.valueOf(0L);
}
final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis);
peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
penalize(status, penalizationMillis);
}
private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
@ -386,7 +395,8 @@ public class EndpointConnectionStatePool {
private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) {
if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) ) {
peerRefreshLock.lock();
try {
try {
peerList = createPeerStatusList(direction);
@ -436,9 +446,14 @@ public class EndpointConnectionStatePool {
}
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
final Set<PeerStatus> statuses = getPeerStatuses();
Set<PeerStatus> statuses = getPeerStatuses();
if ( statuses == null ) {
return new ArrayList<>();
refreshPeers();
statuses = getPeerStatuses();
if ( statuses == null ) {
logger.debug("{} found no peers to connect to", this);
return Collections.emptyList();
}
}
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
@ -562,10 +577,7 @@ public class EndpointConnectionStatePool {
}
private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
if ( siteToSiteSecure == null ) {
throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
}
final boolean siteToSiteSecure = isSecure();
final String destinationUri = "nifi://" + hostname + ":" + port;
CommunicationsSession commsSession = null;
@ -665,10 +677,10 @@ public class EndpointConnectionStatePool {
private void cleanupExpiredSockets() {
final List<EndpointConnectionState> states = new ArrayList<>();
final List<EndpointConnection> states = new ArrayList<>();
EndpointConnectionState state;
while ((state = connectionStateQueue.poll()) != null) {
EndpointConnection state;
while ((state = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
final long lastUsed = state.getLastTimeUsed();
if ( lastUsed < System.currentTimeMillis() - 10000L ) {
@ -685,7 +697,7 @@ public class EndpointConnectionStatePool {
}
}
connectionStateQueue.addAll(states);
connectionQueue.addAll(states);
}
public void shutdown() {
@ -696,13 +708,13 @@ public class EndpointConnectionStatePool {
commsSession.interrupt();
}
EndpointConnectionState state;
while ( (state = connectionStateQueue.poll()) != null) {
EndpointConnection state;
while ( (state = connectionQueue.poll()) != null) {
cleanup(state.getSocketClientProtocol(), state.getPeer());
}
}
public void terminate(final EndpointConnectionState state) {
public void terminate(final EndpointConnection state) {
cleanup(state.getSocketClientProtocol(), state.getPeer());
}
@ -811,6 +823,9 @@ public class EndpointConnectionStatePool {
return listeningPort;
}
/**
* Returns {@code true} if the remote instance is configured for secure site-to-site communications,
* {@code false} otherwise.
@ -830,6 +845,11 @@ public class EndpointConnectionStatePool {
}
final ControllerDTO controller = refreshRemoteInfo();
return controller.isSiteToSiteSecure();
final Boolean isSecure = controller.isSiteToSiteSecure();
if ( isSecure == null ) {
throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
}
return isSecure;
}
}

View File

@ -24,23 +24,23 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.util.ObjectHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketClient implements SiteToSiteClient {
private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
private final SiteToSiteClientConfig config;
private final EndpointConnectionStatePool pool;
private final EndpointConnectionPool pool;
private final boolean compress;
private final String portName;
private final long penalizationNanos;
private volatile String portIdentifier;
public SocketClient(final SiteToSiteClientConfig config) {
pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
this.config = config;
@ -66,23 +66,25 @@ public class SocketClient implements SiteToSiteClient {
return id;
}
final String portId;
if ( direction == TransferDirection.SEND ) {
return pool.getInputPortIdentifier(this.portName);
portId = pool.getInputPortIdentifier(this.portName);
} else {
return pool.getOutputPortIdentifier(this.portName);
portId = pool.getOutputPortIdentifier(this.portName);
}
}
@Override
public Transaction createTransaction(final TransferDirection direction) throws IOException {
final String portId = getPortIdentifier(TransferDirection.SEND);
if (portId == null) {
throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance");
logger.debug("Unable to resolve port [{}] to an identifier", portName);
} else {
logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
}
final RemoteDestination remoteDestination = new RemoteDestination() {
return portId;
}
private RemoteDestination createRemoteDestination(final String portId) {
return new RemoteDestination() {
@Override
public String getIdentifier() {
return portId;
@ -98,12 +100,21 @@ public class SocketClient implements SiteToSiteClient {
return compress;
}
};
}
final EndpointConnectionState connectionState;
try {
connectionState = pool.getEndpointConnectionState(remoteDestination, direction);
} catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
throw new IOException(e);
@Override
public Transaction createTransaction(final TransferDirection direction) throws IOException {
final String portId = getPortIdentifier(direction);
if ( portId == null ) {
throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance");
}
final RemoteDestination remoteDestination = createRemoteDestination(portId);
final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig());
if ( connectionState == null ) {
return null;
}
final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
@ -111,19 +122,24 @@ public class SocketClient implements SiteToSiteClient {
// Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
// the transaction is either completed or canceled.
final ObjectHolder<EndpointConnectionState> connectionStateRef = new ObjectHolder<>(connectionState);
final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState);
return new Transaction() {
@Override
public void confirm() throws IOException {
transaction.confirm();
}
@Override
public void complete() throws IOException {
complete(false);
}
@Override
public void complete(final boolean requestBackoff) throws IOException {
try {
transaction.complete(requestBackoff);
} finally {
final EndpointConnectionState state = connectionStateRef.get();
final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.offer(connectionState);
connectionStateRef.set(null);
@ -136,7 +152,7 @@ public class SocketClient implements SiteToSiteClient {
try {
transaction.cancel(explanation);
} finally {
final EndpointConnectionState state = connectionStateRef.get();
final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.terminate(connectionState);
connectionStateRef.set(null);
@ -149,7 +165,7 @@ public class SocketClient implements SiteToSiteClient {
try {
transaction.error();
} finally {
final EndpointConnectionState state = connectionStateRef.get();
final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
pool.terminate(connectionState);
connectionStateRef.set(null);

View File

@ -16,8 +16,15 @@
*/
package org.apache.nifi.remote.exception;
public class HandshakeException extends Exception {
import java.io.IOException;
/**
* A HandshakeException occurs when the client and the remote NiFi instance do not agree
* on some condition during the handshake. For example, if the NiFi instance does not recognize
* one of the parameters that the client passes during the Handshaking phase.
*/
public class HandshakeException extends IOException {
private static final long serialVersionUID = 178192341908726L;
public HandshakeException(final String message) {

View File

@ -16,8 +16,12 @@
*/
package org.apache.nifi.remote.exception;
public class PortNotRunningException extends Exception {
/**
* PortNotRunningException occurs when the remote NiFi instance reports
* that the Port that the client is attempting to communicate with is not
* currently running and therefore communications with that Port are not allowed.
*/
public class PortNotRunningException extends ProtocolException {
private static final long serialVersionUID = -2790940982005516375L;
public PortNotRunningException(final String message) {

View File

@ -18,6 +18,10 @@ package org.apache.nifi.remote.exception;
import java.io.IOException;
/**
* A ProtocolException occurs when unexpected data is received, for example
* an invalid Response Code.
*/
public class ProtocolException extends IOException {
private static final long serialVersionUID = 5763900324505818495L;

View File

@ -16,8 +16,11 @@
*/
package org.apache.nifi.remote.exception;
public class UnknownPortException extends Exception {
/**
* An UnknownPortException indicates that the remote NiFi instance has reported that
* the endpoint that the client attempted to communicate with does not exist.
*/
public class UnknownPortException extends ProtocolException {
private static final long serialVersionUID = -2790940982005516375L;
public UnknownPortException(final String message) {

View File

@ -63,4 +63,9 @@ public class SocketChannelInput implements CommunicationsInput {
public void interrupt() {
interruptableIn.interrupt();
}
@Override
public void consume() throws IOException {
socketIn.consume();
}
}

View File

@ -47,4 +47,9 @@ public class SSLSocketChannelInput implements CommunicationsInput {
public long getBytesRead() {
return countingIn.getBytesRead();
}
@Override
public void consume() throws IOException {
in.consume();
}
}

View File

@ -40,9 +40,9 @@ public interface ClientProtocol extends VersionedRemoteResource {
FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
void shutdown(Peer peer) throws IOException, ProtocolException;

View File

@ -21,6 +21,12 @@ import java.io.InputStream;
public interface CommunicationsInput {
/**
* Reads all data currently on the socket and throws it away
* @throws IOException
*/
void consume() throws IOException;
InputStream getInputStream() throws IOException;
long getBytesRead();

View File

@ -150,6 +150,7 @@ public class SocketClientProtocol implements ClientProtocol {
}
}
logger.debug("Handshaking with properties {}", properties);
dos.writeInt(properties.size());
for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
dos.writeUTF(entry.getKey().name());
@ -269,13 +270,13 @@ public class SocketClientProtocol implements ClientProtocol {
throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
}
return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec,
return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
}
@Override
public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
final String userDn = peer.getCommunicationsSession().getUserDn();
final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
@ -288,7 +289,7 @@ public class SocketClientProtocol implements ClientProtocol {
final DataPacket dataPacket = transaction.receive();
if ( dataPacket == null ) {
if ( flowFilesReceived.isEmpty() ) {
peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS));
}
break;
}
@ -322,10 +323,7 @@ public class SocketClientProtocol implements ClientProtocol {
transaction.complete(applyBackpressure);
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
if ( flowFilesReceived.isEmpty() ) {
return;
}
if ( !flowFilesReceived.isEmpty() ) {
stopWatch.stop();
final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
@ -335,12 +333,15 @@ public class SocketClientProtocol implements ClientProtocol {
this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
}
return flowFilesReceived.size();
}
@Override
public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
return 0;
}
try {
@ -401,6 +402,8 @@ public class SocketClientProtocol implements ClientProtocol {
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
return flowFilesSent.size();
} catch (final Exception e) {
session.rollback();
throw e;

View File

@ -19,6 +19,7 @@ package org.apache.nifi.remote.protocol.socket;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
@ -29,6 +30,8 @@ import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.slf4j.Logger;
@ -47,14 +50,16 @@ public class SocketClientTransaction implements Transaction {
private final boolean compress;
private final Peer peer;
private final int penaltyMillis;
private final String destinationId;
private boolean dataAvailable = false;
private int transfers = 0;
private TransactionState state;
SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec,
SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
this.protocolVersion = protocolVersion;
this.destinationId = destinationId;
this.peer = peer;
this.codec = codec;
this.direction = direction;
@ -140,7 +145,8 @@ public class SocketClientTransaction implements Transaction {
}
logger.debug("{} Receiving data from {}", this, peer);
final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
if ( packet == null ) {
this.dataAvailable = false;
@ -174,7 +180,8 @@ public class SocketClientTransaction implements Transaction {
logger.debug("{} Sending data to {}", this, peer);
final OutputStream out = new CheckedOutputStream(dos, crc);
final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
final OutputStream out = new CheckedOutputStream(dataOut, crc);
codec.encode(dataPacket, out);
// need to close the CompressionOutputStream in order to force it write out any remaining bytes.
@ -208,6 +215,10 @@ public class SocketClientTransaction implements Transaction {
}
}
@Override
public void complete() throws IOException {
complete(false);
}
@Override
public void complete(boolean requestBackoff) throws IOException {
@ -246,7 +257,7 @@ public class SocketClientTransaction implements Transaction {
logger.debug("{} Received {} from {}", this, transactionResponse, peer);
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
peer.penalize(penaltyMillis);
peer.penalize(destinationId, penaltyMillis);
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}

View File

@ -39,7 +39,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@ -53,7 +53,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000));
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@ -73,7 +73,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096));
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}
@ -87,7 +87,7 @@ public class TestEndpointConnectionStatePool {
collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000));
clusterNodeInfo.setNodeInformation(collection);
final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND);
for ( final PeerStatus peerStatus : destinations ) {
System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort());
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client.socket;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class TestSiteToSiteClient {
@Test
@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("out")
.requestBatchCount(1)
.build();
try {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
Assert.assertNotNull(transaction);
final DataPacket packet = transaction.receive();
Assert.assertNotNull(packet);
final InputStream in = packet.getData();
final long size = packet.getSize();
final byte[] buff = new byte[(int) size];
StreamUtils.fillBuffer(in, buff);
System.out.println(buff.length);
Assert.assertNull(transaction.receive());
transaction.confirm();
transaction.complete(false);
} finally {
client.close();
}
}
@Test
@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("in")
.build();
try {
final Transaction transaction = client.createTransaction(TransferDirection.SEND);
Assert.assertNotNull(transaction);
final Map<String, String> attrs = new HashMap<>();
attrs.put("site-to-site", "yes, please!");
final byte[] bytes = "Hello".getBytes();
final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
transaction.send(packet);
transaction.confirm();
transaction.complete(false);
} finally {
client.close();
}
}
}

View File

@ -44,6 +44,16 @@ public class SocketChannelInputStream extends InputStream {
this.timeoutMillis = timeoutMillis;
}
public void consume() throws IOException {
final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b);
int bytesRead;
do {
bytesRead = channel.read(buffer);
buffer.flip();
} while ( bytesRead > 0 );
}
@Override
public int read() throws IOException {
if (bufferedByte != null) {

View File

@ -258,6 +258,16 @@ public class SSLSocketChannel implements Closeable {
}
}
public void consume() throws IOException {
final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b);
int readCount;
do {
readCount = channel.read(buffer);
buffer.flip();
} while (readCount > 0);
}
private int readData(final ByteBuffer dest) throws IOException {
final long startTime = System.currentTimeMillis();

View File

@ -27,6 +27,10 @@ public class SSLSocketChannelInputStream extends InputStream {
this.channel = channel;
}
public void consume() throws IOException {
channel.consume();
}
@Override
public int read() throws IOException {
return channel.read();

View File

@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
public interface RemoteProcessGroup {
@ -81,8 +79,6 @@ public interface RemoteProcessGroup {
String getYieldDuration();
EndpointConnectionStatePool getConnectionPool();
/**
* Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
*

View File

@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@ -128,14 +129,12 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.Processor;
@ -165,6 +164,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@ -184,7 +184,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
@ -130,7 +129,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private volatile String authorizationIssue;
private final EndpointConnectionStatePool endpointConnectionPool;
private final ScheduledExecutorService backgroundThreadExecutor;
public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup,
@ -172,13 +170,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
};
endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS),
sslContext, eventReporter, getPeerPersistenceFile());
final Runnable checkAuthorizations = new InitializationTask();
backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS);
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS);
}
@Override
@ -200,7 +194,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public void shutdown() {
backgroundThreadExecutor.shutdown();
endpointConnectionPool.shutdown();
}
@Override
@ -1221,11 +1214,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
return yieldDuration;
}
@Override
public EndpointConnectionStatePool getConnectionPool() {
return endpointConnectionPool;
}
@Override
public void verifyCanDelete() {
verifyCanDelete(false);

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -23,6 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -37,10 +39,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.client.socket.EndpointConnectionState;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
import org.apache.nifi.remote.client.socket.EndpointConnection;
import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
@ -50,6 +51,7 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,9 +68,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final AtomicBoolean useCompression = new AtomicBoolean(false);
private final AtomicBoolean targetExists = new AtomicBoolean(true);
private final AtomicBoolean targetRunning = new AtomicBoolean(true);
private final SSLContext sslContext;
private final TransferDirection transferDirection;
private final EndpointConnectionStatePool connectionStatePool;
private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
private final Lock interruptLock = new ReentrantLock();
@ -83,9 +86,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
this.remoteGroup = remoteGroup;
this.transferDirection = direction;
this.sslContext = sslContext;
setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
}
connectionStatePool = remoteGroup.getConnectionPool();
private static File getPeerPersistenceFile(final String portId) {
final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
return new File(stateDir, portId + ".peers");
}
@Override
@ -111,6 +118,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
final EndpointConnectionPool pool = connectionPoolRef.get();
if ( pool != null ) {
pool.shutdown();
}
}
@Override
@ -123,6 +135,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
} finally {
interruptLock.unlock();
}
final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(),
remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
connectionPoolRef.set(connectionPool);
}
@ -140,9 +157,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
final EndpointConnectionState connectionState;
final EndpointConnectionPool connectionPool = connectionPoolRef.get();
final EndpointConnection connection;
try {
connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
connection = connectionPool.getEndpointConnection(this, transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@ -157,7 +175,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
logger.error(message);
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final HandshakeException | IOException e) {
} catch (final IOException e) {
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) {
@ -168,15 +186,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
if ( connectionState == null ) {
if ( connection == null ) {
logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
context.yield();
return;
}
FlowFileCodec codec = connectionState.getCodec();
SocketClientProtocol protocol = connectionState.getSocketClientProtocol();
final Peer peer = connectionState.getPeer();
FlowFileCodec codec = connection.getCodec();
SocketClientProtocol protocol = connection.getSocketClientProtocol();
final Peer peer = connection.getPeer();
url = peer.getUrl();
try {
@ -194,7 +212,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
transferFlowFiles(peer, protocol, context, session, codec);
} else {
receiveFlowFiles(peer, protocol, context, session, codec);
final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
if ( numReceived == 0 ) {
context.yield();
}
}
interruptLock.lock();
@ -210,13 +231,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.commit();
connectionState.setLastTimeUsed();
connectionStatePool.offer(connectionState);
connection.setLastTimeUsed();
connectionPool.offer(connection);
} catch (final TransmissionDisabledException e) {
cleanup(protocol, peer);
session.rollback();
} catch (final Exception e) {
connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
logger.error(message);
@ -261,12 +282,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
protocol.transferFlowFiles(peer, context, session, codec);
private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
return protocol.transferFlowFiles(peer, context, session, codec);
}
private void receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
protocol.receiveFlowFiles(peer, context, session, codec);
private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
return protocol.receiveFlowFiles(peer, context, session, codec);
}
@Override

View File

@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket.ssl;
import java.io.IOException;
import org.apache.nifi.remote.AbstractCommunicationsSession;
public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
private final SSLSocketChannel channel;
private final SSLSocketChannelInput request;
private final SSLSocketChannelOutput response;
public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
super(uri);
request = new SSLSocketChannelInput(channel);
response = new SSLSocketChannelOutput(channel);
this.channel = channel;
}
@Override
public SSLSocketChannelInput getInput() {
return request;
}
@Override
public SSLSocketChannelOutput getOutput() {
return response;
}
@Override
public void setTimeout(final int millis) throws IOException {
channel.setTimeout(millis);
}
@Override
public int getTimeout() throws IOException {
return channel.getTimeout();
}
@Override
public void close() throws IOException {
channel.close();
}
@Override
public boolean isClosed() {
return channel.isClosed();
}
@Override
public boolean isDataAvailable() {
try {
return request.isDataAvailable();
} catch (final Exception e) {
return false;
}
}
@Override
public long getBytesWritten() {
return response.getBytesWritten();
}
@Override
public long getBytesRead() {
return request.getBytesRead();
}
@Override
public void interrupt() {
channel.interrupt();
}
@Override
public String toString() {
return super.toString() + "[SSLSocketChannel=" + channel + "]";
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket.ssl;
import java.io.IOException;
import java.io.InputStream;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.remote.protocol.CommunicationsInput;
public class SSLSocketChannelInput implements CommunicationsInput {
private final SSLSocketChannelInputStream in;
private final ByteCountingInputStream countingIn;
private final InputStream bufferedIn;
public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
in = new SSLSocketChannelInputStream(socketChannel);
countingIn = new ByteCountingInputStream(in);
this.bufferedIn = new BufferedInputStream(countingIn);
}
@Override
public InputStream getInputStream() throws IOException {
return bufferedIn;
}
public boolean isDataAvailable() throws IOException {
return bufferedIn.available() > 0;
}
@Override
public long getBytesRead() {
return countingIn.getBytesRead();
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket.ssl;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsOutput;
public class SSLSocketChannelOutput implements CommunicationsOutput {
private final OutputStream out;
private final ByteCountingOutputStream countingOut;
public SSLSocketChannelOutput(final SSLSocketChannel channel) {
countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
out = new BufferedOutputStream(countingOut);
}
@Override
public OutputStream getOutputStream() throws IOException {
return out;
}
@Override
public long getBytesWritten() {
return countingOut.getBytesWritten();
}
}

View File

@ -430,7 +430,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
}