From b682b6fab543cabeb3a321d6e5cf22f7ce9968c1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 9 Apr 2015 17:59:02 -0400 Subject: [PATCH] NIFI-74, NIFI-345, NIFI-495: Fixed several site-to-site related bugs --- .../client/socket/EndpointConnectionPool.java | 43 +++++++++++++++---- .../remote/client/socket/SocketClient.java | 3 +- .../protocol/socket/SocketClientProtocol.java | 8 +++- .../socket/SocketClientTransaction.java | 25 +++++++---- .../nifi/remote/StandardRemoteGroupPort.java | 1 + 5 files changed, 61 insertions(+), 19 deletions(-) diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 1a6dfd510a..1b5412cd01 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -88,6 +88,7 @@ import org.apache.nifi.web.api.dto.ControllerDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.helpers.MessageFormatter; public class EndpointConnectionPool { public static final long PEER_REFRESH_PERIOD = 60000L; @@ -202,6 +203,28 @@ public class EndpointConnectionPool { }, 5, 5, TimeUnit.SECONDS); } + void warn(final String msg, final Object... args) { + logger.warn(msg, args); + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); + } + } + + void warn(final String msg, final Throwable t) { + logger.warn(msg, t); + + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString()); + } + } + + void error(final String msg, final Object... args) { + logger.error(msg, args); + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage()); + } + } + private String getPortIdentifier(final TransferDirection transferDirection) throws IOException { if ( remoteDestination.getIdentifier() != null ) { return remoteDestination.getIdentifier(); @@ -271,6 +294,7 @@ public class EndpointConnectionPool { logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId); protocol = new SocketClientProtocol(); protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId)); + protocol.setEventReporter(eventReporter); final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS); try { @@ -312,7 +336,9 @@ public class EndpointConnectionPool { // handle error cases if ( protocol.isDestinationFull() ) { - logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); + logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer", + this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName()); + penalize(peer, penalizationMillis); try { peer.close(); @@ -341,7 +367,7 @@ public class EndpointConnectionPool { cleanup(protocol, peer); final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); - logger.error(message); + error(message); if ( logger.isDebugEnabled() ) { logger.error("", e); } @@ -463,7 +489,7 @@ public class EndpointConnectionPool { peerList = createPeerStatusList(direction); } catch (final Exception e) { final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); - logger.warn(message); + warn(message); if ( logger.isDebugEnabled() ) { logger.warn("", e); } @@ -503,7 +529,7 @@ public class EndpointConnectionPool { } private boolean isPenalized(final PeerStatus peerStatus) { - final Long expirationEnd = peerTimeoutExpirations.get(peerStatus); + final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() ); } @@ -587,7 +613,7 @@ public class EndpointConnectionPool { clientProtocol.shutdown(peer); } catch (final IOException e) { final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()); - logger.warn(message); + warn(message); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -597,7 +623,7 @@ public class EndpointConnectionPool { peer.close(); } catch (final IOException e) { final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString()); - logger.warn(message); + warn(message); if (logger.isDebugEnabled()) { logger.warn("", e); } @@ -622,7 +648,8 @@ public class EndpointConnectionPool { } } catch (final IOException e) { - logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e); + error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString()); + logger.error("", e); } } @@ -818,7 +845,7 @@ public class EndpointConnectionPool { peerStatusCache = new PeerStatusCache(statuses); logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); } catch (Exception e) { - logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e); + warn("{} Unable to refresh Remote Group's peers due to {}", this, e); if (logger.isDebugEnabled()) { logger.warn("", e); } diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index ed54ccb99b..4aab3f7f8e 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -84,6 +84,7 @@ public class SocketClient implements SiteToSiteClient { logger.debug("Unable to resolve port [{}] to an identifier", portName); } else { logger.debug("Resolved port [{}] to identifier [{}]", portName, portId); + this.portIdentifier = portId; } return portId; @@ -136,7 +137,7 @@ public class SocketClient implements SiteToSiteClient { connectionState.getPeer(), connectionState.getCodec(), direction); } catch (final Throwable t) { pool.terminate(connectionState); - throw t; + throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t); } // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index c3275eab00..83c5305054 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -75,6 +76,7 @@ public class SocketClientProtocol implements ClientProtocol { private int batchCount; private long batchSize; private long batchMillis; + private EventReporter eventReporter; private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds @@ -93,6 +95,10 @@ public class SocketClientProtocol implements ClientProtocol { this.batchMillis = millis; } + public void setEventReporter(final EventReporter eventReporter) { + this.eventReporter = eventReporter; + } + public void setDestination(final RemoteDestination destination) { this.destination = destination; this.useCompression = destination.isUseCompression(); @@ -272,7 +278,7 @@ public class SocketClientProtocol implements ClientProtocol { } return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, - direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter); } diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index a1ce07e450..e69104f201 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -27,6 +27,7 @@ import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Communicant; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Transaction; @@ -39,6 +40,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.reporting.Severity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +58,7 @@ public class SocketClientTransaction implements Transaction { private final Peer peer; private final int penaltyMillis; private final String destinationId; + private final EventReporter eventReporter; private boolean dataAvailable = false; private int transfers = 0; @@ -63,7 +66,7 @@ public class SocketClientTransaction implements Transaction { private TransactionState state; SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, - final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException { + final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException { this.protocolVersion = protocolVersion; this.destinationId = destinationId; this.peer = peer; @@ -74,6 +77,7 @@ public class SocketClientTransaction implements Transaction { this.compress = useCompression; this.state = TransactionState.TRANSACTION_STARTED; this.penaltyMillis = penaltyMillis; + this.eventReporter = eventReporter; initialize(); } @@ -116,11 +120,11 @@ public class SocketClientTransaction implements Transaction { try { try { if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot receive data because Transaction State is " + state); + throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state); } if ( direction == TransferDirection.SEND ) { - throw new IllegalStateException("Attempting to receive data but started a SEND Transaction"); + throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction"); } // if we already know there's no data, just return null @@ -142,7 +146,7 @@ public class SocketClientTransaction implements Transaction { this.dataAvailable = false; break; default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode); } } @@ -184,11 +188,11 @@ public class SocketClientTransaction implements Transaction { try { try { if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot send data because Transaction State is " + state); + throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state); } if ( direction == TransferDirection.RECEIVE ) { - throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction"); + throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction"); } if ( transfers > 0 ) { @@ -242,7 +246,7 @@ public class SocketClientTransaction implements Transaction { try { try { if ( state != TransactionState.TRANSACTION_CONFIRMED ) { - throw new IllegalStateException("Cannot complete transaction because state is " + state + + throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); } @@ -272,7 +276,7 @@ public class SocketClientTransaction implements Transaction { peer.penalize(destinationId, penaltyMillis); backoff = true; } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse); } state = TransactionState.TRANSACTION_COMPLETED; @@ -324,7 +328,10 @@ public class SocketClientTransaction implements Transaction { try { confirmTransactionResponse = Response.read(dis); } catch (final IOException ioe) { - logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer); + logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer); + if ( eventReporter != null ) { + eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction"); + } throw ioe; } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 69ba0fd4f8..eec6ed58b5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -183,6 +183,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; } catch (final IOException e) { + context.yield(); final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString()); logger.error(message); if ( logger.isDebugEnabled() ) {