NIFI-74, NIFI-345, NIFI-495: Fixed several site-to-site related bugs

This commit is contained in:
Mark Payne 2015-04-09 17:59:02 -04:00
parent e9cb3b300c
commit b682b6fab5
5 changed files with 61 additions and 19 deletions

View File

@ -88,6 +88,7 @@ import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;
public class EndpointConnectionPool {
public static final long PEER_REFRESH_PERIOD = 60000L;
@ -202,6 +203,28 @@ public class EndpointConnectionPool {
}, 5, 5, TimeUnit.SECONDS);
}
void warn(final String msg, final Object... args) {
logger.warn(msg, args);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
}
}
void warn(final String msg, final Throwable t) {
logger.warn(msg, t);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.WARNING, "Site-to-Site", msg + ": " + t.toString());
}
}
void error(final String msg, final Object... args) {
logger.error(msg, args);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", MessageFormatter.arrayFormat(msg, args).getMessage());
}
}
private String getPortIdentifier(final TransferDirection transferDirection) throws IOException {
if ( remoteDestination.getIdentifier() != null ) {
return remoteDestination.getIdentifier();
@ -271,6 +294,7 @@ public class EndpointConnectionPool {
logger.debug("{} No Connection available for Port {}; creating new Connection", this, portId);
protocol = new SocketClientProtocol();
protocol.setDestination(new IdEnrichedRemoteDestination(remoteDestination, portId));
protocol.setEventReporter(eventReporter);
final long penalizationMillis = remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
try {
@ -312,7 +336,9 @@ public class EndpointConnectionPool {
// handle error cases
if ( protocol.isDestinationFull() ) {
logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
logger.warn("{} {} indicates that port {}'s destination is full; penalizing peer",
this, peer, config.getPortName() == null ? config.getPortIdentifier() : config.getPortName());
penalize(peer, penalizationMillis);
try {
peer.close();
@ -341,7 +367,7 @@ public class EndpointConnectionPool {
cleanup(protocol, peer);
final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString());
logger.error(message);
error(message);
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
@ -463,7 +489,7 @@ public class EndpointConnectionPool {
peerList = createPeerStatusList(direction);
} catch (final Exception e) {
final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
logger.warn(message);
warn(message);
if ( logger.isDebugEnabled() ) {
logger.warn("", e);
}
@ -503,7 +529,7 @@ public class EndpointConnectionPool {
}
private boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() );
}
@ -587,7 +613,7 @@ public class EndpointConnectionPool {
clientProtocol.shutdown(peer);
} catch (final IOException e) {
final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
logger.warn(message);
warn(message);
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
@ -597,7 +623,7 @@ public class EndpointConnectionPool {
peer.close();
} catch (final IOException e) {
final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
logger.warn(message);
warn(message);
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
@ -622,7 +648,8 @@ public class EndpointConnectionPool {
}
} catch (final IOException e) {
logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString());
logger.error("", e);
}
}
@ -818,7 +845,7 @@ public class EndpointConnectionPool {
peerStatusCache = new PeerStatusCache(statuses);
logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
} catch (Exception e) {
logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
if (logger.isDebugEnabled()) {
logger.warn("", e);
}

View File

@ -84,6 +84,7 @@ public class SocketClient implements SiteToSiteClient {
logger.debug("Unable to resolve port [{}] to an identifier", portName);
} else {
logger.debug("Resolved port [{}] to identifier [{}]", portName, portId);
this.portIdentifier = portId;
}
return portId;
@ -136,7 +137,7 @@ public class SocketClient implements SiteToSiteClient {
connectionState.getPeer(), connectionState.getCodec(), direction);
} catch (final Throwable t) {
pool.terminate(connectionState);
throw t;
throw new IOException("Unable to create Transaction to communicate with " + connectionState.getPeer(), t);
}
// Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever

View File

@ -27,6 +27,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
@ -75,6 +76,7 @@ public class SocketClientProtocol implements ClientProtocol {
private int batchCount;
private long batchSize;
private long batchMillis;
private EventReporter eventReporter;
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
@ -93,6 +95,10 @@ public class SocketClientProtocol implements ClientProtocol {
this.batchMillis = millis;
}
public void setEventReporter(final EventReporter eventReporter) {
this.eventReporter = eventReporter;
}
public void setDestination(final RemoteDestination destination) {
this.destination = destination;
this.useCompression = destination.isUseCompression();
@ -272,7 +278,7 @@ public class SocketClientProtocol implements ClientProtocol {
}
return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec,
direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS), eventReporter);
}

View File

@ -27,6 +27,7 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction;
@ -39,6 +40,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,6 +58,7 @@ public class SocketClientTransaction implements Transaction {
private final Peer peer;
private final int penaltyMillis;
private final String destinationId;
private final EventReporter eventReporter;
private boolean dataAvailable = false;
private int transfers = 0;
@ -63,7 +66,7 @@ public class SocketClientTransaction implements Transaction {
private TransactionState state;
SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException {
this.protocolVersion = protocolVersion;
this.destinationId = destinationId;
this.peer = peer;
@ -74,6 +77,7 @@ public class SocketClientTransaction implements Transaction {
this.compress = useCompression;
this.state = TransactionState.TRANSACTION_STARTED;
this.penaltyMillis = penaltyMillis;
this.eventReporter = eventReporter;
initialize();
}
@ -116,11 +120,11 @@ public class SocketClientTransaction implements Transaction {
try {
try {
if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state);
}
if ( direction == TransferDirection.SEND ) {
throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction");
}
// if we already know there's no data, just return null
@ -142,7 +146,7 @@ public class SocketClientTransaction implements Transaction {
this.dataAvailable = false;
break;
default:
throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode);
}
}
@ -184,11 +188,11 @@ public class SocketClientTransaction implements Transaction {
try {
try {
if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
throw new IllegalStateException("Cannot send data because Transaction State is " + state);
throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state);
}
if ( direction == TransferDirection.RECEIVE ) {
throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction");
}
if ( transfers > 0 ) {
@ -242,7 +246,7 @@ public class SocketClientTransaction implements Transaction {
try {
try {
if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
throw new IllegalStateException("Cannot complete transaction because state is " + state +
throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state +
"; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
}
@ -272,7 +276,7 @@ public class SocketClientTransaction implements Transaction {
peer.penalize(destinationId, penaltyMillis);
backoff = true;
} else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse);
}
state = TransactionState.TRANSACTION_COMPLETED;
@ -324,7 +328,10 @@ public class SocketClientTransaction implements Transaction {
try {
confirmTransactionResponse = Response.read(dis);
} catch (final IOException ioe) {
logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer);
if ( eventReporter != null ) {
eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction");
}
throw ioe;
}

View File

@ -183,6 +183,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
} catch (final IOException e) {
context.yield();
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) {