From e7954cf04a8523e393327f5391dbc5209d887e98 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 30 Apr 2015 13:12:40 -0400 Subject: [PATCH] NIFI-567: Use batch duration of 500 ms instead of 5 seconds when sending via site-to-site --- .../nifi/remote/StandardRemoteGroupPort.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 982d9ffcab..773f9cf380 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; public class StandardRemoteGroupPort extends RemoteGroupPort { - private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String CONTENT_TYPE = "application/octet-stream"; @@ -98,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return targetRunning.get(); } - public void setTargetRunning(boolean targetRunning) { + public void setTargetRunning(final boolean targetRunning) { this.targetRunning.set(targetRunning); } @@ -126,12 +126,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { super.onSchedulingStart(); final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url(remoteGroup.getTargetUri().toString()) - .portIdentifier(getIdentifier()) - .sslContext(sslContext) - .eventReporter(remoteGroup.getEventReporter()) - .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) - .build(); + .url(remoteGroup.getTargetUri().toString()) + .portIdentifier(getIdentifier()) + .sslContext(sslContext) + .eventReporter(remoteGroup.getEventReporter()) + .peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) + .build(); clientRef.set(client); } @@ -147,7 +147,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return; } - String url = getRemoteProcessGroup().getTargetUri().toString(); + final String url = getRemoteProcessGroup().getTargetUri().toString(); // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, // we don't want to create a transaction at all. @@ -230,7 +230,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return remoteGroup.getYieldDuration(); } - private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException { + private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, final FlowFile firstFlowFile) throws IOException, ProtocolException { FlowFile flowFile = firstFlowFile; try { @@ -288,7 +288,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); + this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); return flowFilesSent.size(); } catch (final Exception e) { @@ -345,7 +345,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final String dataSize = FormatUtils.formatDataSize(bytesReceived); logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); + this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); } return flowFilesReceived.size(); @@ -367,16 +367,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { ValidationResult error = null; if (!targetExists.get()) { error = new ValidationResult.Builder() - .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { error = new ValidationResult.Builder() - .explanation(String.format("Port '%s' has no outbound connections", getName())) - .subject(String.format("Remote port '%s'", getName())) - .valid(false) - .build(); + .explanation(String.format("Port '%s' has no outbound connections", getName())) + .subject(String.format("Remote port '%s'", getName())) + .valid(false) + .build(); } if (error != null) {