NIFI-567: Use batch duration of 500 ms instead of 5 seconds when sending via site-to-site

This commit is contained in:
Mark Payne 2015-04-30 13:12:40 -04:00
parent e30cd23fc2
commit e7954cf04a
1 changed files with 20 additions and 20 deletions

View File

@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
public class StandardRemoteGroupPort extends RemoteGroupPort { public class StandardRemoteGroupPort extends RemoteGroupPort {
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds private static final long BATCH_SEND_NANOS = TimeUnit.MILLISECONDS.toNanos(500L); // send batches of up to 500 millis
public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream"; public static final String CONTENT_TYPE = "application/octet-stream";
@ -98,7 +98,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return targetRunning.get(); return targetRunning.get();
} }
public void setTargetRunning(boolean targetRunning) { public void setTargetRunning(final boolean targetRunning) {
this.targetRunning.set(targetRunning); this.targetRunning.set(targetRunning);
} }
@ -126,12 +126,12 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
super.onSchedulingStart(); super.onSchedulingStart();
final SiteToSiteClient client = new SiteToSiteClient.Builder() final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url(remoteGroup.getTargetUri().toString()) .url(remoteGroup.getTargetUri().toString())
.portIdentifier(getIdentifier()) .portIdentifier(getIdentifier())
.sslContext(sslContext) .sslContext(sslContext)
.eventReporter(remoteGroup.getEventReporter()) .eventReporter(remoteGroup.getEventReporter())
.peerPersistenceFile(getPeerPersistenceFile(getIdentifier())) .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
.build(); .build();
clientRef.set(client); clientRef.set(client);
} }
@ -147,7 +147,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return; return;
} }
String url = getRemoteProcessGroup().getTargetUri().toString(); final String url = getRemoteProcessGroup().getTargetUri().toString();
// If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise, // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
// we don't want to create a transaction at all. // we don't want to create a transaction at all.
@ -230,7 +230,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return remoteGroup.getYieldDuration(); return remoteGroup.getYieldDuration();
} }
private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException { private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, final FlowFile firstFlowFile) throws IOException, ProtocolException {
FlowFile flowFile = firstFlowFile; FlowFile flowFile = firstFlowFile;
try { try {
@ -288,7 +288,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
return flowFilesSent.size(); return flowFilesSent.size();
} catch (final Exception e) { } catch (final Exception e) {
@ -345,7 +345,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
final String dataSize = FormatUtils.formatDataSize(bytesReceived); final String dataSize = FormatUtils.formatDataSize(bytesReceived);
logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate}); this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
} }
return flowFilesReceived.size(); return flowFilesReceived.size();
@ -367,16 +367,16 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
ValidationResult error = null; ValidationResult error = null;
if (!targetExists.get()) { if (!targetExists.get()) {
error = new ValidationResult.Builder() error = new ValidationResult.Builder()
.explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName())) .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
.subject(String.format("Remote port '%s'", getName())) .subject(String.format("Remote port '%s'", getName()))
.valid(false) .valid(false)
.build(); .build();
} else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) { } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
error = new ValidationResult.Builder() error = new ValidationResult.Builder()
.explanation(String.format("Port '%s' has no outbound connections", getName())) .explanation(String.format("Port '%s' has no outbound connections", getName()))
.subject(String.format("Remote port '%s'", getName())) .subject(String.format("Remote port '%s'", getName()))
.valid(false) .valid(false)
.build(); .build();
} }
if (error != null) { if (error != null) {