diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index 0ec8951a43..a7bd0945f0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -202,6 +203,11 @@ public class PeerSelector { } } + // Shuffle destinations to provide better distribution. + // Without this, same host will be used continuously, especially when remote peers have the same number of queued files. + // Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations. + Collections.shuffle(destinations, new Random(0)); + final StringBuilder distributionDescription = new StringBuilder(); distributionDescription.append("New Weighted Distribution of Nodes:"); for (final Map.Entry entry : entryCountMap.entrySet()) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java index c434c7b4cd..6a69fee656 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/TestPeerSelector.java @@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -59,6 +60,40 @@ public class TestPeerSelector { })); } + @Test + public void testFormulateDestinationListForOutputEven() throws IOException { + final Set collection = new HashSet<>(); + collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true)); + collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true)); + + PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class); + PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null); + + final List destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE); + final Map selectedCounts = calculateAverageSelectedCount(collection, destinations); + + logger.info("selectedCounts={}", selectedCounts); + + int consecutiveSamePeerCount = 0; + PeerStatus previousPeer = null; + for (PeerStatus peer : destinations) { + if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) { + consecutiveSamePeerCount++; + // The same peer shouldn't be used consecutively (number of nodes - 1) times or more. + if (consecutiveSamePeerCount >= (collection.size() - 1)) { + fail("The same peer is returned consecutively too frequently."); + } + } else { + consecutiveSamePeerCount = 0; + } + previousPeer = peer; + } + + } + @Test public void testFormulateDestinationListForOutput() throws IOException { final Set collection = new HashSet<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java new file mode 100644 index 0000000000..e1d63f813d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/BatchSettingsDTO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; + +/** + * Details of batch settings of a remote process group port. + */ +@XmlType(name = "batchSettings") +public class BatchSettingsDTO { + + private Integer count; + private String size; + private String duration; + + /** + * @return preferred number of flow files to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of flow files to include in a transaction." + ) + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } + + /** + * @return preferred number of bytes to include in a transaction + */ + @ApiModelProperty( + value = "Preferred number of bytes to include in a transaction." + ) + public String getSize() { + return size; + } + + public void setSize(String size) { + this.size = size; + } + + /** + * @return preferred amount of time that a transaction should span + */ + @ApiModelProperty( + value = "Preferred amount of time that a transaction should span." + ) + public String getDuration() { + return duration; + } + + public void setDuration(String duration) { + this.duration = duration; + } + + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java index e4a8131323..2a34d9c7e9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java @@ -35,6 +35,7 @@ public class RemoteProcessGroupPortDTO { private Boolean exists; private Boolean targetRunning; private Boolean connected; + private BatchSettingsDTO batchSettings; /** * @return comments as configured in the target port @@ -176,6 +177,20 @@ public class RemoteProcessGroupPortDTO { this.connected = connected; } + /** + * @return batch settings for data transmission + */ + @ApiModelProperty( + value = "The batch settings for data transmission." + ) + public BatchSettingsDTO getBatchSettings() { + return batchSettings; + } + + public void setBatchSettings(BatchSettingsDTO batchSettings) { + this.batchSettings = batchSettings; + } + @Override public int hashCode() { return 923847 + String.valueOf(name).hashCode(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index 4d7f774432..c330c1342d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -54,6 +54,21 @@ public interface RemoteProcessGroupPortDescriptor { */ Boolean getUseCompression(); + /** + * @return Preferred number of flow files to include in a transaction + */ + Integer getBatchCount(); + + /** + * @return Preferred number of bytes to include in a transaction + */ + String getBatchSize(); + + /** + * @return Preferred amount of for a transaction to span + */ + String getBatchDuration(); + /** * @return Whether or not the target port exists */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index f8f4b20922..07faf42036 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -41,4 +41,16 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo public abstract boolean getTargetExists(); public abstract boolean isTargetRunning(); + + public abstract Integer getBatchCount(); + + public abstract void setBatchCount(Integer batchCount); + + public abstract String getBatchSize(); + + public abstract void setBatchSize(String batchSize); + + public abstract String getBatchDuration(); + + public abstract void setBatchDuration(String batchDuration); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 151640e40f..b628668d1a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -195,6 +195,7 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -2011,6 +2012,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount()); descriptor.setTransmitting(port.isTransmitting()); descriptor.setUseCompression(port.getUseCompression()); + final BatchSettingsDTO batchSettings = port.getBatchSettings(); + if (batchSettings != null) { + descriptor.setBatchCount(batchSettings.getCount()); + descriptor.setBatchSize(batchSettings.getSize()); + descriptor.setBatchDuration(batchSettings.getDuration()); + } remotePorts.add(descriptor); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 731d914bb1..3bd037d50c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -357,6 +357,9 @@ public class FlowFromDOMFactory { descriptor.setComments(getString(element, "comments")); descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks")); descriptor.setUseCompression(getBoolean(element, "useCompression")); + descriptor.setBatchCount(getOptionalInt(element, "batchCount")); + descriptor.setBatchSize(getString(element, "batchSize")); + descriptor.setBatchDuration(getString(element, "batchDuration")); descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState"))); return descriptor; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index aa7022b1f1..c5f3f48d18 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -313,6 +313,18 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "scheduledState", port.getScheduledState().name()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); + final Integer batchCount = port.getBatchCount(); + if (batchCount != null && batchCount > 0) { + addTextElement(element, "batchCount", batchCount); + } + final String batchSize = port.getBatchSize(); + if (batchSize != null && batchSize.length() > 0) { + addTextElement(element, "batchSize", batchSize); + } + final String batchDuration = port.getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + addTextElement(element, "batchDuration", batchDuration); + } parentElement.appendChild(element); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 1db80fc82f..d9e048e713 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -511,7 +511,7 @@ public class FingerprintFactory { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) { + for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 7aee480da7..0cc8433724 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.SSLContext; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; @@ -632,6 +633,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if (!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } } finally { writeLock.unlock(); } @@ -697,6 +707,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { if (descriptor.getUseCompression() != null) { port.setUseCompression(descriptor.getUseCompression()); } + if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) { + port.setBatchCount(descriptor.getBatchCount()); + } + if (!StringUtils.isBlank(descriptor.getBatchSize())) { + port.setBatchSize(descriptor.getBatchSize()); + } + if (!StringUtils.isBlank(descriptor.getBatchDuration())) { + port.setBatchDuration(descriptor.getBatchDuration()); + } inputPorts.put(descriptor.getId(), port); } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java index ed901863db..c3a8f5e195 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java @@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr private Integer concurrentlySchedulableTaskCount; private Boolean transmitting; private Boolean useCompression; + private Integer batchCount; + private String batchSize; + private String batchDuration; private Boolean exists; private Boolean targetRunning; private Boolean connected; @@ -94,6 +97,33 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr this.useCompression = useCompression; } + @Override + public Integer getBatchCount() { + return batchCount; + } + + public void setBatchCount(Integer batchCount) { + this.batchCount = batchCount; + } + + @Override + public String getBatchSize() { + return batchSize; + } + + public void setBatchSize(String batchSize) { + this.batchSize = batchSize; + } + + @Override + public String getBatchDuration() { + return batchDuration; + } + + public void setBatchDuration(String batchDuration) { + this.batchDuration = batchDuration; + } + @Override public Boolean getExists() { return exists; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 8cf2ad87a7..30fff1ff41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -281,6 +281,9 @@ + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java index 87a372d1d8..67f1ad4e85 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/fingerprint/FingerprintFactoryTest.java @@ -26,14 +26,17 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.lang.reflect.Method; +import java.util.Collections; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; @@ -273,4 +276,42 @@ public class FingerprintFactoryTest { assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement)); } + @Test + public void testRemotePortFingerprint() throws Exception { + + // Fill out every configuration. + final RemoteProcessGroup groupComponent = mock(RemoteProcessGroup.class); + when(groupComponent.getName()).thenReturn("name"); + when(groupComponent.getIdentifier()).thenReturn("id"); + when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi"); + when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW); + + final RemoteGroupPort portComponent = mock(RemoteGroupPort.class); + when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent)); + when(portComponent.getName()).thenReturn("portName"); + when(portComponent.getIdentifier()).thenReturn("portId"); + when(portComponent.getPosition()).thenReturn(new Position(10.5, 20.3)); + when(portComponent.getComments()).thenReturn("portComment"); + when(portComponent.getScheduledState()).thenReturn(ScheduledState.RUNNING); + when(portComponent.getMaxConcurrentTasks()).thenReturn(3); + when(portComponent.isUseCompression()).thenReturn(true); + when(portComponent.getBatchCount()).thenReturn(1234); + when(portComponent.getBatchSize()).thenReturn("64KB"); + when(portComponent.getBatchDuration()).thenReturn("10sec"); + // Serializer doesn't serialize if a port doesn't have any connection. + when(portComponent.hasIncomingConnection()).thenReturn(true); + + // Assert fingerprints with expected one. + final String expected = "portId" + + "3" + + "true" + + "1234" + + "64KB" + + "10sec"; + + final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup"); + final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0); + assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement)); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index 92931f2859..b1288f3d36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -46,11 +46,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.exception.PortNotRunningException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.UnknownPortException; @@ -80,6 +82,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class); private final RemoteProcessGroup remoteGroup; private final AtomicBoolean useCompression = new AtomicBoolean(false); + private final AtomicReference batchCount = new AtomicReference<>(); + private final AtomicReference batchSize = new AtomicReference<>(); + private final AtomicReference batchDuration = new AtomicReference<>(); private final AtomicBoolean targetExists = new AtomicBoolean(true); private final AtomicBoolean targetRunning = new AtomicBoolean(true); private final SSLContext sslContext; @@ -157,7 +162,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS); - final SiteToSiteClient client = new SiteToSiteClient.Builder() + final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris())) .portIdentifier(getIdentifier()) .sslContext(sslContext) @@ -168,9 +173,24 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) .transportProtocol(remoteGroup.getTransportProtocol()) .httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword())) - .localAddress(remoteGroup.getLocalAddress()) - .build(); - clientRef.set(client); + .localAddress(remoteGroup.getLocalAddress()); + + final Integer batchCount = getBatchCount(); + if (batchCount != null) { + clientBuilder.requestBatchCount(batchCount); + } + + final String batchSize = getBatchSize(); + if (batchSize != null && batchSize.length() > 0) { + clientBuilder.requestBatchSize(DataUnit.parseDataSize(batchSize.trim(), DataUnit.B).intValue()); + } + + final String batchDuration = getBatchDuration(); + if (batchDuration != null && batchDuration.length() > 0) { + clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(batchDuration.trim(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + + clientRef.set(clientBuilder.build()); } @Override @@ -278,6 +298,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { final StopWatch stopWatch = new StopWatch(true); long bytesSent = 0L; + final SiteToSiteClientConfig siteToSiteClientConfig = getSiteToSiteClient().getConfig(); + final long maxBatchBytes = siteToSiteClientConfig.getPreferredBatchSize(); + final int maxBatchCount = siteToSiteClientConfig.getPreferredBatchCount(); + final long preferredBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS); + final long maxBatchDuration = preferredBatchDuration > 0 ? preferredBatchDuration : BATCH_SEND_NANOS; + + final Set flowFilesSent = new HashSet<>(); boolean continueTransaction = true; while (continueTransaction) { @@ -304,10 +331,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { session.remove(flowFile); final long sendingNanos = System.nanoTime() - startSendingNanos; - if (sendingNanos < BATCH_SEND_NANOS) { - flowFile = session.get(); - } else { + + if (maxBatchCount > 0 && flowFilesSent.size() >= maxBatchCount) { flowFile = null; + } else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) { + flowFile = null; + } else if (sendingNanos >= maxBatchDuration) { + flowFile = null; + } else { + flowFile = session.get(); } continueTransaction = (flowFile != null); @@ -477,6 +509,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort { return useCompression.get(); } + @Override + public Integer getBatchCount() { + return batchCount.get(); + } + + @Override + public void setBatchCount(Integer batchCount) { + this.batchCount.set(batchCount); + } + + @Override + public String getBatchSize() { + return batchSize.get(); + } + + @Override + public void setBatchSize(String batchSize) { + this.batchSize.set(batchSize); + } + + @Override + public String getBatchDuration() { + return batchDuration.get(); + } + + @Override + public void setBatchDuration(String batchDuration) { + this.batchDuration.set(batchDuration); + } + @Override public String toString() { return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index 31cd1549c5..f677b88db1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -19,6 +19,7 @@ package org.apache.nifi.remote; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -28,6 +29,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.io.http.HttpCommunicationsSession; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.protocol.CommunicationsSession; @@ -43,17 +45,23 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; import org.apache.nifi.util.NiFiProperties; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -70,7 +78,7 @@ public class TestStandardRemoteGroupPort { private Transaction transaction; private EventReporter eventReporter; private ProcessGroup processGroup; - public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; + private static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi"; private StandardRemoteGroupPort port; private SharedSessionState sessionState; private MockProcessSession processSession; @@ -84,17 +92,18 @@ public class TestStandardRemoteGroupPort { private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction) throws Exception { - setupMock(protocol, direction, mock(Transaction.class)); + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder().buildConfig(); + setupMock(protocol, direction, siteToSiteClientConfig); } private void setupMock(final SiteToSiteTransportProtocol protocol, final TransferDirection direction, - final Transaction transaction) throws Exception { + final SiteToSiteClientConfig siteToSiteClientConfig) throws Exception { processGroup = null; remoteGroup = mock(RemoteProcessGroup.class); scheduler = null; siteToSiteClient = mock(SiteToSiteClient.class); - this.transaction = transaction; + this.transaction = mock(Transaction.class); eventReporter = mock(EventReporter.class); @@ -119,6 +128,7 @@ public class TestStandardRemoteGroupPort { doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri(); doReturn(siteToSiteClient).when(port).getSiteToSiteClient(); doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction)); + doReturn(siteToSiteClientConfig).when(siteToSiteClient).getConfig(); doReturn(eventReporter).when(remoteGroup).getEventReporter(); } @@ -245,6 +255,144 @@ public class TestStandardRemoteGroupPort { assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails()); } + @Test + public void testSendBatchByCount() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchCount(2) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {0, 1}, t2 = {2, 3}, t3 = {4} + final int[] expectedNumberOfPackets = {2, 2, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchBySize() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchSize(30) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {10, 11, 12}, t2 = {13, 14} + final int[] expectedNumberOfPackets = {3, 2}; + testSendBatch(expectedNumberOfPackets); + + } + + @Test + public void testSendBatchByDuration() throws Exception { + + final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder() + .requestBatchDuration(1, TimeUnit.NANOSECONDS) + .buildConfig(); + setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig); + + // t1 = {1}, t2 = {2} .. and so on. + final int[] expectedNumberOfPackets = {1, 1, 1, 1, 1}; + testSendBatch(expectedNumberOfPackets); + + } + + /** + * Generate flow files to be sent, and execute port's onTrigger method. + * Finally, this method verifies whether packets are sent as expected. + * @param expectedNumberOfPackets Specify how many packets should be sent by each transaction. + * E.g. passing {2, 2, 1}, would generate 5 flow files in total. + * Based on the siteToSiteClientConfig batch parameters, + * it's expected to be sent via 3 transactions, + * transaction 0 will send flow file 0 and 1, + * transaction 1 will send flow file 2 and 3, + * and transaction 2 will send flow file 4. + * Each flow file has different content size generated automatically. + * The content size starts with 10, and increases as more flow files are generated. + * E.g. flow file 1 will have 10 bytes, flow file 2 has 11 bytes, f3 has 12 and so on. + * + */ + private void testSendBatch(final int[] expectedNumberOfPackets) throws Exception { + + setupMockProcessSession(); + + final String peerUrl = "http://node1.example.com:8080/nifi"; + final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false); + final HttpCommunicationsSession commsSession = new HttpCommunicationsSession(); + final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL); + + final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files"; + + doReturn(peer).when(transaction).getCommunicant(); + commsSession.setDataTransferUrl(flowFileEndpointUri); + + // Capture packets being sent to the remote peer + final AtomicInteger totalPacketsSent = new AtomicInteger(0); + final List> sentPackets = new ArrayList<>(expectedNumberOfPackets.length); + final List sentPacketsPerTransaction = new ArrayList<>(); + doAnswer(invocation -> { + sentPacketsPerTransaction.add((DataPacket)invocation.getArguments()[0]); + totalPacketsSent.incrementAndGet(); + return null; + }).when(transaction).send(any(DataPacket.class)); + doAnswer(invocation -> { + sentPackets.add(new ArrayList<>(sentPacketsPerTransaction)); + sentPacketsPerTransaction.clear(); + return null; + }).when(transaction).confirm(); + + + // Execute onTrigger while offering new flow files. + final List flowFiles = new ArrayList<>(); + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + int numOfPackets = expectedNumberOfPackets[i]; + int startF = flowFiles.size(); + int endF = startF + numOfPackets; + IntStream.range(startF, endF).forEach(f -> { + final StringBuilder flowFileContents = new StringBuilder("0123456789"); + for (int c = 0; c < f; c++) { + flowFileContents.append(c); + } + final byte[] bytes = flowFileContents.toString().getBytes(); + final MockFlowFile flowFile = spy(processSession.createFlowFile(bytes)); + when(flowFile.getSize()).then(invocation -> { + Thread.sleep(1); // For testSendBatchByDuration + return bytes.length; + }); + sessionState.getFlowFileQueue().offer(flowFile); + flowFiles.add(flowFile); + }); + port.onTrigger(processContext, processSession); + } + + // Verify transactions, sent packets, and provenance events. + assertEquals(flowFiles.size(), totalPacketsSent.get()); + assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size()); + final List provenanceEvents = sessionState.getProvenanceEvents(); + assertEquals(flowFiles.size(), provenanceEvents.size()); + + int f = 0; + for (int i = 0; i < expectedNumberOfPackets.length; i++) { + final List dataPackets = sentPackets.get(i); + assertEquals(expectedNumberOfPackets[i], dataPackets.size()); + + for (int p = 0; p < dataPackets.size(); p++) { + final FlowFile flowFile = flowFiles.get(f); + + // Assert sent packet + final DataPacket dataPacket = dataPackets.get(p); + assertEquals(flowFile.getSize(), dataPacket.getSize()); + + // Assert provenance event + final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f); + assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType()); + assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri()); + + f++; + } + } + } + @Test public void testReceiveHttp() throws Exception { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java index 5a69cfe49e..690d404d8b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/RemoteProcessGroupAuditor.java @@ -93,6 +93,15 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor { .setConvertName(PORT_NAME_CONVERT), new ConfigurationRecorder("Compressed", dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder("Batch Count", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getCount() != null, RemoteGroupPort::getBatchCount) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder("Batch Size", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getSize() != null, RemoteGroupPort::getBatchSize) + .setConvertName(PORT_NAME_CONVERT), + new ConfigurationRecorder("Batch Duration", + dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getDuration() != null, RemoteGroupPort::getBatchDuration) .setConvertName(PORT_NAME_CONVERT) ); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index aaa33d0f9f..51548118be 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -1493,6 +1493,12 @@ public final class DtoFactory { dto.setUseCompression(port.isUseCompression()); dto.setExists(port.getTargetExists()); + final BatchSettingsDTO batchDTO = new BatchSettingsDTO(); + batchDTO.setCount(port.getBatchCount()); + batchDTO.setSize(port.getBatchSize()); + batchDTO.setDuration(port.getBatchDuration()); + dto.setBatchSettings(batchDTO); + // determine if this port is currently connected to another component locally if (ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) { dto.setConnected(!port.getConnections().isEmpty()); @@ -2962,6 +2968,13 @@ public final class DtoFactory { copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setUseCompression(original.getUseCompression()); copy.setExists(original.getExists()); + final BatchSettingsDTO batchOrg = original.getBatchSettings(); + if (batchOrg != null) { + final BatchSettingsDTO batchCopy = new BatchSettingsDTO(); + batchCopy.setCount(batchOrg.getCount()); + batchCopy.setSize(batchOrg.getSize()); + batchCopy.setDuration(batchOrg.getDuration()); + } return copy; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java index a93c41084f..039244227c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java @@ -29,10 +29,12 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -203,7 +205,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot // verify update when appropriate - if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression())) { + if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), + remoteProcessGroupPortDto.getUseCompression(), + remoteProcessGroupPortDto.getBatchSettings())) { port.verifyCanUpdate(); } } @@ -219,6 +223,30 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName())); } + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings(); + if (batchSettingsDTO != null) { + final Integer batchCount = batchSettingsDTO.getCount(); + if (isNotNull(batchCount) && batchCount < 0) { + validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName())); + } + + final String batchSize = batchSettingsDTO.getSize(); + if (isNotNull(batchSize) && batchSize.length() > 0 + && !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) { + validationErrors.add(String.format("Batch size for port '%s' must be of format " + + " where is a non-negative integer and is a supported Data" + + " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName())); + } + + final String batchDuration = batchSettingsDTO.getDuration(); + if (isNotNull(batchDuration) && batchDuration.length() > 0 + && !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) { + validationErrors.add(String.format("Batch duration for port '%s' must be of format " + + " where is a non-negative integer and TimeUnit is a supported Time Unit, such " + + "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName())); + } + } + return validationErrors; } @@ -284,22 +312,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update - if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { - port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); - } - if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) { - port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); - } - - final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); - if (isNotNull(isTransmitting)) { - // start or stop as necessary - if (!port.isRunning() && isTransmitting) { - remoteProcessGroup.startTransmitting(port); - } else if (port.isRunning() && !isTransmitting) { - remoteProcessGroup.stopTransmitting(port); - } - } + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); return port; } @@ -318,6 +331,19 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot verifyUpdatePort(port, remoteProcessGroupPortDto); // perform the update + updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup); + + return port; + } + + /** + * + * @param port Port instance to be updated. + * @param remoteProcessGroupPortDto DTO containing updated remote process group port settings. + * @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input, + * this method will start or stop the port in this remoteProcessGroup as necessary. + */ + private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) { if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) { port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount()); } @@ -325,6 +351,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot port.setUseCompression(remoteProcessGroupPortDto.getUseCompression()); } + final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDto.getBatchSettings(); + if (isNotNull(batchSettingsDTO)) { + port.setBatchCount(batchSettingsDTO.getCount()); + port.setBatchSize(batchSettingsDTO.getSize()); + port.setBatchDuration(batchSettingsDTO.getDuration()); + } + final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting(); if (isNotNull(isTransmitting)) { // start or stop as necessary @@ -334,8 +367,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot remoteProcessGroup.stopTransmitting(port); } } - - return port; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java index 725e4d4fc4..ea7fa7d41e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestRemoteProcessGroupAuditor.java @@ -29,6 +29,7 @@ import org.apache.nifi.authorization.user.StandardNiFiUser; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.dao.RemoteProcessGroupDAO; @@ -40,6 +41,7 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK; @@ -434,6 +436,13 @@ public class TestRemoteProcessGroupAuditor { when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount()); when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression()); + final BatchSettingsDTO batchSettings = inputRPGPortDTO.getBatchSettings(); + if (batchSettings != null) { + when(updatedRPGPort.getBatchCount()).thenReturn(batchSettings.getCount()); + when(updatedRPGPort.getBatchSize()).thenReturn(batchSettings.getSize()); + when(updatedRPGPort.getBatchDuration()).thenReturn(batchSettings.getDuration()); + } + when(joinPoint.proceed()).thenReturn(updatedRPGPort); // Capture added actions so that those can be asserted later. @@ -553,4 +562,34 @@ public class TestRemoteProcessGroupAuditor { assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true"); } + + @Test + public void testConfigurePortBatchSettings() throws Throwable { + + final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort(); + when(existingRPGPort.getName()).thenReturn("input-port-1"); + + final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO(); + final BatchSettingsDTO batchSettingsDTO = new BatchSettingsDTO(); + batchSettingsDTO.setCount(1234); + batchSettingsDTO.setSize("64KB"); + batchSettingsDTO.setDuration("10sec"); + inputRPGPortDTO.setBatchSettings(batchSettingsDTO); + + final Collection actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort); + + assertEquals(3, actions.size()); + final Iterator iterator = actions.iterator(); + Action action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Count", "0", "1234"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Size", "", "64KB"); + + action = iterator.next(); + assertEquals(Operation.Configure, action.getOperation()); + assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Duration", "", "10sec"); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java new file mode 100644 index 0000000000..f68a115d6e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/TestStandardRemoteProcessGroupDAO.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.dao.impl; + +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.exception.ValidationException; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.web.api.dto.BatchSettingsDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStandardRemoteProcessGroupDAO { + + private void validate(final StandardRemoteProcessGroupDAO dao, final RemoteProcessGroupPortDTO dto, final String ... errMessageKeywords) { + try { + dao.verifyUpdateInputPort(dto.getGroupId(), dto); + if (errMessageKeywords.length > 0) { + fail("Validation should fail with keywords: " + Arrays.asList(errMessageKeywords)); + } + } catch (ValidationException e) { + if (errMessageKeywords.length == 0) { + fail("Validation should pass, but failed with: " + e); + } + final List validationErrors = e.getValidationErrors(); + assertEquals("Validation should return one validationErrors", 1, validationErrors.size()); + final String validationError = validationErrors.get(0); + for (String errMessageKeyword : errMessageKeywords) { + assertTrue("validation error message should contain " + errMessageKeyword + ", but was: " + validationError, + validationError.contains(errMessageKeyword)); + } + } + } + + @Test + public void testVerifyUpdateInputPort() { + final StandardRemoteProcessGroupDAO dao = new StandardRemoteProcessGroupDAO(); + + final String remoteProcessGroupId = "remote-process-group-id"; + final String remoteProcessGroupInputPortId = "remote-process-group-input-port-id"; + + final FlowController flowController = mock(FlowController.class); + final ProcessGroup processGroup = mock(ProcessGroup.class); + final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class); + final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class); + + dao.setFlowController(flowController); + when(flowController.getGroup(any())).thenReturn(processGroup); + when(processGroup.findRemoteProcessGroup(eq(remoteProcessGroupId))).thenReturn(remoteProcessGroup); + when(remoteProcessGroup.getInputPort(remoteProcessGroupInputPortId)).thenReturn(remoteGroupPort); + when(remoteGroupPort.getName()).thenReturn("remote-group-port"); + + final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO(); + dto.setGroupId(remoteProcessGroupId); + dto.setId(remoteProcessGroupInputPortId); + final BatchSettingsDTO batchSettings = new BatchSettingsDTO(); + dto.setBatchSettings(batchSettings); + + // Empty input values should pass validation. + dao.verifyUpdateInputPort(remoteProcessGroupId, dto); + + // Concurrent tasks + dto.setConcurrentlySchedulableTaskCount(0); + validate(dao, dto, "Concurrent tasks", "positive integer"); + + dto.setConcurrentlySchedulableTaskCount(2); + validate(dao, dto); + + // Batch count + batchSettings.setCount(-1); + validate(dao, dto, "Batch count", "positive integer"); + + batchSettings.setCount(0); + validate(dao, dto); + + batchSettings.setCount(1000); + validate(dao, dto); + + // Batch size + batchSettings.setSize("AB"); + validate(dao, dto, "Batch size", "Data Size"); + + batchSettings.setSize("10 days"); + validate(dao, dto, "Batch size", "Data Size"); + + batchSettings.setSize("300MB"); + validate(dao, dto); + + // Batch duration + batchSettings.setDuration("AB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + batchSettings.setDuration("10 KB"); + validate(dao, dto, "Batch duration", "Time Unit"); + + batchSettings.setDuration("10 secs"); + validate(dao, dto); + + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp index 8f38369283..a62700c84c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/remote-port-configuration.jsp @@ -37,6 +37,40 @@ Compressed +
+ +
+
+ Batch Settings: +
+
+
+ Count +
+
+
+ +
+
+
+
+ Size +
+
+
+ +
+
+
+
+ Duration +
+
+
+ +
+
+
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css index 58dcfcd005..4a3fad4f19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/remote-process-group-configuration.css @@ -125,6 +125,7 @@ div.remote-port-description { div.concurrent-task-container { float: left; + width: 200px; } img.concurrent-tasks-info { @@ -133,7 +134,11 @@ img.concurrent-tasks-info { } div.compression-container { - float: right; + float: left; +} + +div.batch-settings-container { + margin-top: 8px; } div.remote-port-transmission-container { @@ -165,7 +170,7 @@ div.disabled-transmission-switch { #remote-port-concurrent-tasks { font-size: 11px !important; float: left; - width: 75%; + width: 266px; } #remote-port-use-compression-container { @@ -179,4 +184,10 @@ div.disabled-transmission-switch { #remote-port-concurrent-task-header { margin-top: 5px; -} \ No newline at end of file +} + +div.batch-setting { + margin-right: 8px; + width: 30%; + float: left; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js index 519a00bbdd..4232cabfdd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-remote-process-group-ports.js @@ -73,9 +73,19 @@ handler: { click: function () { var remotePortConcurrentTasks = $('#remote-port-concurrent-tasks').val(); + var remotePortBatchCount = $('#remote-port-batch-count').val(); + var portValidationErrors = new Array(); // ensure the property name and value is specified - if ($.isNumeric(remotePortConcurrentTasks)) { + if (!$.isNumeric(remotePortConcurrentTasks)) { + portValidationErrors.push("Concurrent tasks must be an integer value."); + } + + if (remotePortBatchCount && !$.isNumeric(remotePortBatchCount)) { + portValidationErrors.push("Batch Settings: count must be an integer value."); + } + + if (portValidationErrors.length == 0) { var remoteProcessGroupId = $('#remote-process-group-ports-id').text(); var remoteProcessGroupData = d3.select('#id-' + remoteProcessGroupId).datum(); var remotePortId = $('#remote-port-id').text(); @@ -87,7 +97,12 @@ id: remotePortId, groupId: remoteProcessGroupId, useCompression: $('#remote-port-use-compression').hasClass('checkbox-checked'), - concurrentlySchedulableTaskCount: remotePortConcurrentTasks + concurrentlySchedulableTaskCount: remotePortConcurrentTasks, + batchSettings : { + count: remotePortBatchCount, + size: $('#remote-port-batch-size').val(), + duration: $('#remote-port-batch-duration').val() + } } }; @@ -121,6 +136,12 @@ // set the new values $('#' + remotePortId + '-concurrent-tasks').text(remotePort.concurrentlySchedulableTaskCount); $('#' + remotePortId + '-compression').text(compressionLabel); + + var batchSettings = getBatchSettingsDisplayValues(remotePort); + $('#' + remotePortId + '-batch-count').text(batchSettings.count); + $('#' + remotePortId + '-batch-size').text(batchSettings.size); + $('#' + remotePortId + '-batch-duration').text(batchSettings.duration); + }).fail(function (xhr, status, error) { if (xhr.status === 400) { var errors = xhr.responseText.split('\n'); @@ -146,7 +167,9 @@ } else { nfDialog.showOkDialog({ headerText: 'Remote Process Group Ports', - dialogContent: 'Concurrent tasks must be an integer value.' + dialogContent: portValidationErrors.reduce(function (prev, curr) { + return typeof(prev) === 'string' ? prev + ' ' + curr : curr; + }) }); // close the dialog @@ -175,6 +198,9 @@ $('#remote-port-name').text(''); $('#remote-port-concurrent-tasks').val(''); $('#remote-port-use-compression').removeClass('checkbox-checked checkbox-unchecked'); + $('#remote-port-batch-count').val(''); + $('#remote-port-batch-size').val(''); + $('#remote-port-batch-duration').val(''); } } }); @@ -232,6 +258,26 @@ }); }; + /** + * Create and return an object contains count, size and duration values to display. + * If port does not have batch settings or batch setting value is not defined, 'No value set' is displayed. + */ + var getBatchSettingsDisplayValues = function (port) { + var values = {}; + var batchSettings = port.batchSettings; + if (batchSettings) { + values.count = typeof(batchSettings.count) === 'number' ? batchSettings.count : 'No value set'; + values.size = batchSettings.size ? batchSettings.size : 'No value set'; + values.duration = batchSettings.duration ? batchSettings.duration : 'No value set'; + } else { + // if it doesn't have batch settings, clear values + values.count = 'No value set'; + values.size = 'No value set'; + values.duration = 'No value set'; + } + return values; + } + /** * Creates the markup for configuration concurrent tasks for a port. * @@ -287,9 +333,12 @@ var portName = $('#' + portId + '-name').text(); var portConcurrentTasks = $('#' + portId + '-concurrent-tasks').text(); var portCompression = $('#' + portId + '-compression').text() === 'Yes'; + var batchCount = $('#' + portId + '-batch-count').text(); + var batchSize = $('#' + portId + '-batch-size').text(); + var batchDuration = $('#' + portId + '-batch-duration').text(); // show the configuration dialog - configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, portType); + configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType); }).appendTo(portContainerEditContainer); // show/hide the edit button as appropriate @@ -472,6 +521,58 @@ '' + '').appendTo(compressionContainer); + // clear: Concurrent Tasks, Compressed + $('
').appendTo(portContainerDetailsContainer); + + // Batch related settings + var batchSettingsContainer = $('
') + .append($('
Batch Settings' + + '
')) + .appendTo(portContainerDetailsContainer); + + batchSettingsContainer.find('div.batch-settings-info').qtip($.extend({}, + nf.Common.config.tooltipConfig, + { + content: (portType === 'input' + ? 'The batch settings to control how this NiFi sends data to the remote input port in a transaction.' + + ' This NiFi will transfer as much flow files as they are queued in incoming relationships,' + + ' until any of these limits is met.' + + ' If none of these setting is specified, this NiFi uses 500 milliseconds batch duration by default.' + : 'The batch settings to tell the remote NiFi how this NiFi prefers to receive data from the remote output port in a transaction.' + + ' The remote NiFi will use these settings as a hint to control batch data transferring.' + + ' However, actual behavior depends on the version of remote NiFi instance.' + + ' Recent version of NiFi uses 5 seconds for batch duration if none of these setting is specified.') + })); + + var batchSettings = getBatchSettingsDisplayValues(port); + var batchCount = $('
').append($('
').text(batchSettings.count)); + var batchSize = $('
').append($('
').text(batchSettings.size)); + var batchDuration = $('
').append($('
').text(batchSettings.duration)); + + // add this ports batch count + $('
' + + '
' + + 'Count' + + '
' + + '
' + + '
').append(batchCount).appendTo(batchSettingsContainer); + + // add this ports batch size + $('
' + + '
' + + 'Size' + + '
' + + '
' + + '
').append(batchSize).appendTo(batchSettingsContainer); + + // add this ports batch duration + $('
' + + '
' + + 'Duration' + + '
' + + '
' + + '
').append(batchDuration).appendTo(batchSettingsContainer); + // clear $('
').appendTo(portContainer); @@ -489,9 +590,12 @@ * @argument {string} portName The port name * @argument {int} portConcurrentTasks The number of concurrent tasks for the port * @argument {boolean} portCompression The compression flag for the port + * @argument {int} batchCount The flow file count in a batch transaction + * @argument {string} batchSize The size of flow files in a batch transaction + * @argument {string} batchDuration The duration of a batch transaction * @argument {string} portType The type of port this is */ - var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, portType) { + var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType) { // set port identifiers $('#remote-port-id').text(portId); $('#remote-port-type').text(portType); @@ -503,6 +607,9 @@ } $('#remote-port-use-compression').addClass(checkState); $('#remote-port-concurrent-tasks').val(portConcurrentTasks); + $('#remote-port-batch-count').val(batchCount === 'No value set' ? null : batchCount); + $('#remote-port-batch-size').val(batchSize === 'No value set' ? null : batchSize); + $('#remote-port-batch-duration').val(batchDuration === 'No value set' ? null : batchDuration); // set the port name $('#remote-port-name').text(portName).ellipsis(); @@ -549,6 +656,12 @@ var connectedInputPorts = []; var disconnectedInputPorts = []; + var nameComparator = function (a, b) { + var nameA = a.name.toUpperCase(); + var nameB = b.name.toUpperCase(); + return nameA < nameB ? -1 : (nameA > nameB ? 1 : 0); + }; + // show connected ports first var inputPortContainer = $('#remote-process-group-input-ports-container'); $.each(remoteProcessGroupContents.inputPorts, function (_, inputPort) { @@ -559,6 +672,10 @@ } }); + // sort by port name within each port list + connectedInputPorts.sort(nameComparator); + disconnectedInputPorts.sort(nameComparator); + // add all connected input ports $.each(connectedInputPorts, function (_, inputPort) { createPortOption(inputPortContainer, inputPort, 'input'); @@ -586,6 +703,10 @@ } }); + // sort by port name within each port list + connectedOutputPorts.sort(nameComparator); + disconnectedOutputPorts.sort(nameComparator); + // add all connected output ports $.each(connectedOutputPorts, function (_, outputPort) { createPortOption(outputPortContainer, outputPort, 'output');