NIFI-1202: Site-to-Site batch settings.

- Added batchCount, batchSize, batchDuration to limit flow files to be
  included in a single Site-to-Site transaction.
- Added batch throttling logic when StandardRemoteGroupPort transfers
  flow files to a remote input port using the batch limit configurations,
  so that users can limit batch not only for pulling data, but also pushing data.
- Added destination list shuffle to provide better load distribution.
  Previously, the load distribution algorithm produced the same host consecutively.
- Added new batch settings to FlowConfiguration.xsd.
- Added new batch settings to Flow Fingerprint.
- Added new batch settings to Audit.
- Sort ports by name at 'Remote Process Group Ports' dialog.
- Show 'No value set' when a batch configuration is not set
- Updated batch settings tooltip to clearly explain how it works the configuration works differently for input and output ports.
- Updated DTO by separating batch settings to BatchSettingsDTO to indicate count, size and duration are a set of configurations.
- This closes #1306
This commit is contained in:
Koji Kawamura 2017-04-18 13:31:27 +09:00 committed by Matt Gilman
parent cf0e8bb1d3
commit a41a2a9b1a
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
24 changed files with 908 additions and 39 deletions

View File

@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -202,6 +203,11 @@ public class PeerSelector {
}
}
// Shuffle destinations to provide better distribution.
// Without this, same host will be used continuously, especially when remote peers have the same number of queued files.
// Use Random(0) to provide consistent result for unit testing. Randomness is not important to shuffle destinations.
Collections.shuffle(destinations, new Random(0));
final StringBuilder distributionDescription = new StringBuilder();
distributionDescription.append("New Weighted Distribution of Nodes:");
for (final Map.Entry<PeerStatus, Integer> entry : entryCountMap.entrySet()) {

View File

@ -39,6 +39,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@ -59,6 +60,40 @@ public class TestPeerSelector {
}));
}
@Test
public void testFormulateDestinationListForOutputEven() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();
collection.add(new PeerStatus(new PeerDescription("Node1", 1111, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node2", 2222, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node3", 3333, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node4", 4444, true), 4096, true));
collection.add(new PeerStatus(new PeerDescription("Node5", 5555, true), 4096, true));
PeerStatusProvider peerStatusProvider = Mockito.mock(PeerStatusProvider.class);
PeerSelector peerSelector = new PeerSelector(peerStatusProvider, null);
final List<PeerStatus> destinations = peerSelector.formulateDestinationList(collection, TransferDirection.RECEIVE);
final Map<String, Integer> selectedCounts = calculateAverageSelectedCount(collection, destinations);
logger.info("selectedCounts={}", selectedCounts);
int consecutiveSamePeerCount = 0;
PeerStatus previousPeer = null;
for (PeerStatus peer : destinations) {
if (previousPeer != null && peer.getPeerDescription().equals(previousPeer.getPeerDescription())) {
consecutiveSamePeerCount++;
// The same peer shouldn't be used consecutively (number of nodes - 1) times or more.
if (consecutiveSamePeerCount >= (collection.size() - 1)) {
fail("The same peer is returned consecutively too frequently.");
}
} else {
consecutiveSamePeerCount = 0;
}
previousPeer = peer;
}
}
@Test
public void testFormulateDestinationListForOutput() throws IOException {
final Set<PeerStatus> collection = new HashSet<>();

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import com.wordnik.swagger.annotations.ApiModelProperty;
import javax.xml.bind.annotation.XmlType;
/**
* Details of batch settings of a remote process group port.
*/
@XmlType(name = "batchSettings")
public class BatchSettingsDTO {
private Integer count;
private String size;
private String duration;
/**
* @return preferred number of flow files to include in a transaction
*/
@ApiModelProperty(
value = "Preferred number of flow files to include in a transaction."
)
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
/**
* @return preferred number of bytes to include in a transaction
*/
@ApiModelProperty(
value = "Preferred number of bytes to include in a transaction."
)
public String getSize() {
return size;
}
public void setSize(String size) {
this.size = size;
}
/**
* @return preferred amount of time that a transaction should span
*/
@ApiModelProperty(
value = "Preferred amount of time that a transaction should span."
)
public String getDuration() {
return duration;
}
public void setDuration(String duration) {
this.duration = duration;
}
}

View File

@ -35,6 +35,7 @@ public class RemoteProcessGroupPortDTO {
private Boolean exists;
private Boolean targetRunning;
private Boolean connected;
private BatchSettingsDTO batchSettings;
/**
* @return comments as configured in the target port
@ -176,6 +177,20 @@ public class RemoteProcessGroupPortDTO {
this.connected = connected;
}
/**
* @return batch settings for data transmission
*/
@ApiModelProperty(
value = "The batch settings for data transmission."
)
public BatchSettingsDTO getBatchSettings() {
return batchSettings;
}
public void setBatchSettings(BatchSettingsDTO batchSettings) {
this.batchSettings = batchSettings;
}
@Override
public int hashCode() {
return 923847 + String.valueOf(name).hashCode();

View File

@ -54,6 +54,21 @@ public interface RemoteProcessGroupPortDescriptor {
*/
Boolean getUseCompression();
/**
* @return Preferred number of flow files to include in a transaction
*/
Integer getBatchCount();
/**
* @return Preferred number of bytes to include in a transaction
*/
String getBatchSize();
/**
* @return Preferred amount of for a transaction to span
*/
String getBatchDuration();
/**
* @return Whether or not the target port exists
*/

View File

@ -41,4 +41,16 @@ public abstract class RemoteGroupPort extends AbstractPort implements Port, Remo
public abstract boolean getTargetExists();
public abstract boolean isTargetRunning();
public abstract Integer getBatchCount();
public abstract void setBatchCount(Integer batchCount);
public abstract String getBatchSize();
public abstract void setBatchSize(String batchSize);
public abstract String getBatchDuration();
public abstract void setBatchDuration(String batchDuration);
}

View File

@ -195,6 +195,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
@ -2011,6 +2012,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
descriptor.setConcurrentlySchedulableTaskCount(port.getConcurrentlySchedulableTaskCount());
descriptor.setTransmitting(port.isTransmitting());
descriptor.setUseCompression(port.getUseCompression());
final BatchSettingsDTO batchSettings = port.getBatchSettings();
if (batchSettings != null) {
descriptor.setBatchCount(batchSettings.getCount());
descriptor.setBatchSize(batchSettings.getSize());
descriptor.setBatchDuration(batchSettings.getDuration());
}
remotePorts.add(descriptor);
}
}

View File

@ -357,6 +357,9 @@ public class FlowFromDOMFactory {
descriptor.setComments(getString(element, "comments"));
descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
descriptor.setUseCompression(getBoolean(element, "useCompression"));
descriptor.setBatchCount(getOptionalInt(element, "batchCount"));
descriptor.setBatchSize(getString(element, "batchSize"));
descriptor.setBatchDuration(getString(element, "batchDuration"));
descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState")));
return descriptor;

View File

@ -313,6 +313,18 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "scheduledState", port.getScheduledState().name());
addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
final Integer batchCount = port.getBatchCount();
if (batchCount != null && batchCount > 0) {
addTextElement(element, "batchCount", batchCount);
}
final String batchSize = port.getBatchSize();
if (batchSize != null && batchSize.length() > 0) {
addTextElement(element, "batchSize", batchSize);
}
final String batchDuration = port.getBatchDuration();
if (batchDuration != null && batchDuration.length() > 0) {
addTextElement(element, "batchDuration", batchDuration);
}
parentElement.appendChild(element);
}

View File

@ -511,7 +511,7 @@ public class FingerprintFactory {
}
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) {
for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
}

View File

@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
@ -632,6 +633,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (descriptor.getUseCompression() != null) {
port.setUseCompression(descriptor.getUseCompression());
}
if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) {
port.setBatchCount(descriptor.getBatchCount());
}
if (!StringUtils.isBlank(descriptor.getBatchSize())) {
port.setBatchSize(descriptor.getBatchSize());
}
if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration());
}
} finally {
writeLock.unlock();
}
@ -697,6 +707,15 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
if (descriptor.getUseCompression() != null) {
port.setUseCompression(descriptor.getUseCompression());
}
if (descriptor.getBatchCount() != null && descriptor.getBatchCount() > 0) {
port.setBatchCount(descriptor.getBatchCount());
}
if (!StringUtils.isBlank(descriptor.getBatchSize())) {
port.setBatchSize(descriptor.getBatchSize());
}
if (!StringUtils.isBlank(descriptor.getBatchDuration())) {
port.setBatchDuration(descriptor.getBatchDuration());
}
inputPorts.put(descriptor.getId(), port);
} finally {

View File

@ -27,6 +27,9 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
private Integer concurrentlySchedulableTaskCount;
private Boolean transmitting;
private Boolean useCompression;
private Integer batchCount;
private String batchSize;
private String batchDuration;
private Boolean exists;
private Boolean targetRunning;
private Boolean connected;
@ -94,6 +97,33 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
this.useCompression = useCompression;
}
@Override
public Integer getBatchCount() {
return batchCount;
}
public void setBatchCount(Integer batchCount) {
this.batchCount = batchCount;
}
@Override
public String getBatchSize() {
return batchSize;
}
public void setBatchSize(String batchSize) {
this.batchSize = batchSize;
}
@Override
public String getBatchDuration() {
return batchDuration;
}
public void setBatchDuration(String batchDuration) {
this.batchDuration = batchDuration;
}
@Override
public Boolean getExists() {
return exists;

View File

@ -281,6 +281,9 @@
<xs:sequence>
<xs:element name="maxConcurrentTasks" type="xs:positiveInteger"></xs:element>
<xs:element name="useCompression" type="xs:boolean"></xs:element>
<xs:element name="batchCount" type="xs:positiveInteger" minOccurs="0" maxOccurs="1" />
<xs:element name="batchSize" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="batchDuration" type="xs:string" minOccurs="0" maxOccurs="1" />
</xs:sequence>
</xs:extension>
</xs:complexContent>

View File

@ -26,14 +26,17 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
@ -273,4 +276,42 @@ public class FingerprintFactoryTest {
assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement));
}
@Test
public void testRemotePortFingerprint() throws Exception {
// Fill out every configuration.
final RemoteProcessGroup groupComponent = mock(RemoteProcessGroup.class);
when(groupComponent.getName()).thenReturn("name");
when(groupComponent.getIdentifier()).thenReturn("id");
when(groupComponent.getPosition()).thenReturn(new Position(10.5, 20.3));
when(groupComponent.getTargetUri()).thenReturn("http://node1:8080/nifi");
when(groupComponent.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
final RemoteGroupPort portComponent = mock(RemoteGroupPort.class);
when(groupComponent.getInputPorts()).thenReturn(Collections.singleton(portComponent));
when(portComponent.getName()).thenReturn("portName");
when(portComponent.getIdentifier()).thenReturn("portId");
when(portComponent.getPosition()).thenReturn(new Position(10.5, 20.3));
when(portComponent.getComments()).thenReturn("portComment");
when(portComponent.getScheduledState()).thenReturn(ScheduledState.RUNNING);
when(portComponent.getMaxConcurrentTasks()).thenReturn(3);
when(portComponent.isUseCompression()).thenReturn(true);
when(portComponent.getBatchCount()).thenReturn(1234);
when(portComponent.getBatchSize()).thenReturn("64KB");
when(portComponent.getBatchDuration()).thenReturn("10sec");
// Serializer doesn't serialize if a port doesn't have any connection.
when(portComponent.hasIncomingConnection()).thenReturn(true);
// Assert fingerprints with expected one.
final String expected = "portId" +
"3" +
"true" +
"1234" +
"64KB" +
"10sec";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup");
final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0);
assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement));
}
}

View File

@ -46,11 +46,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.UnknownPortException;
@ -80,6 +82,9 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
private final RemoteProcessGroup remoteGroup;
private final AtomicBoolean useCompression = new AtomicBoolean(false);
private final AtomicReference<Integer> batchCount = new AtomicReference<>();
private final AtomicReference<String> batchSize = new AtomicReference<>();
private final AtomicReference<String> batchDuration = new AtomicReference<>();
private final AtomicBoolean targetExists = new AtomicBoolean(true);
private final AtomicBoolean targetRunning = new AtomicBoolean(true);
private final SSLContext sslContext;
@ -157,7 +162,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final long penalizationMillis = FormatUtils.getTimeDuration(remoteGroup.getYieldDuration(), TimeUnit.MILLISECONDS);
final SiteToSiteClient client = new SiteToSiteClient.Builder()
final SiteToSiteClient.Builder clientBuilder = new SiteToSiteClient.Builder()
.urls(SiteToSiteRestApiClient.parseClusterUrls(remoteGroup.getTargetUris()))
.portIdentifier(getIdentifier())
.sslContext(sslContext)
@ -168,9 +173,24 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
.timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
.transportProtocol(remoteGroup.getTransportProtocol())
.httpProxy(new HttpProxy(remoteGroup.getProxyHost(), remoteGroup.getProxyPort(), remoteGroup.getProxyUser(), remoteGroup.getProxyPassword()))
.localAddress(remoteGroup.getLocalAddress())
.build();
clientRef.set(client);
.localAddress(remoteGroup.getLocalAddress());
final Integer batchCount = getBatchCount();
if (batchCount != null) {
clientBuilder.requestBatchCount(batchCount);
}
final String batchSize = getBatchSize();
if (batchSize != null && batchSize.length() > 0) {
clientBuilder.requestBatchSize(DataUnit.parseDataSize(batchSize.trim(), DataUnit.B).intValue());
}
final String batchDuration = getBatchDuration();
if (batchDuration != null && batchDuration.length() > 0) {
clientBuilder.requestBatchDuration(FormatUtils.getTimeDuration(batchDuration.trim(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
clientRef.set(clientBuilder.build());
}
@Override
@ -278,6 +298,13 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final StopWatch stopWatch = new StopWatch(true);
long bytesSent = 0L;
final SiteToSiteClientConfig siteToSiteClientConfig = getSiteToSiteClient().getConfig();
final long maxBatchBytes = siteToSiteClientConfig.getPreferredBatchSize();
final int maxBatchCount = siteToSiteClientConfig.getPreferredBatchCount();
final long preferredBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
final long maxBatchDuration = preferredBatchDuration > 0 ? preferredBatchDuration : BATCH_SEND_NANOS;
final Set<FlowFile> flowFilesSent = new HashSet<>();
boolean continueTransaction = true;
while (continueTransaction) {
@ -304,10 +331,15 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
session.remove(flowFile);
final long sendingNanos = System.nanoTime() - startSendingNanos;
if (sendingNanos < BATCH_SEND_NANOS) {
flowFile = session.get();
} else {
if (maxBatchCount > 0 && flowFilesSent.size() >= maxBatchCount) {
flowFile = null;
} else if (maxBatchBytes > 0 && bytesSent >= maxBatchBytes) {
flowFile = null;
} else if (sendingNanos >= maxBatchDuration) {
flowFile = null;
} else {
flowFile = session.get();
}
continueTransaction = (flowFile != null);
@ -477,6 +509,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return useCompression.get();
}
@Override
public Integer getBatchCount() {
return batchCount.get();
}
@Override
public void setBatchCount(Integer batchCount) {
this.batchCount.set(batchCount);
}
@Override
public String getBatchSize() {
return batchSize.get();
}
@Override
public void setBatchSize(String batchSize) {
this.batchSize.set(batchSize);
}
@Override
public String getBatchDuration() {
return batchDuration.get();
}
@Override
public void setBatchDuration(String batchDuration) {
this.batchDuration.set(batchDuration);
}
@Override
public String toString() {
return "RemoteGroupPort[name=" + getName() + ",targets=" + remoteGroup.getTargetUris() + "]";

View File

@ -19,6 +19,7 @@ package org.apache.nifi.remote;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
@ -28,6 +29,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
@ -43,17 +45,23 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.nifi.util.NiFiProperties;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -70,7 +78,7 @@ public class TestStandardRemoteGroupPort {
private Transaction transaction;
private EventReporter eventReporter;
private ProcessGroup processGroup;
public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi";
private static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi";
private StandardRemoteGroupPort port;
private SharedSessionState sessionState;
private MockProcessSession processSession;
@ -84,17 +92,18 @@ public class TestStandardRemoteGroupPort {
private void setupMock(final SiteToSiteTransportProtocol protocol,
final TransferDirection direction) throws Exception {
setupMock(protocol, direction, mock(Transaction.class));
final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder().buildConfig();
setupMock(protocol, direction, siteToSiteClientConfig);
}
private void setupMock(final SiteToSiteTransportProtocol protocol,
final TransferDirection direction,
final Transaction transaction) throws Exception {
final SiteToSiteClientConfig siteToSiteClientConfig) throws Exception {
processGroup = null;
remoteGroup = mock(RemoteProcessGroup.class);
scheduler = null;
siteToSiteClient = mock(SiteToSiteClient.class);
this.transaction = transaction;
this.transaction = mock(Transaction.class);
eventReporter = mock(EventReporter.class);
@ -119,6 +128,7 @@ public class TestStandardRemoteGroupPort {
doReturn(REMOTE_CLUSTER_URL).when(remoteGroup).getTargetUri();
doReturn(siteToSiteClient).when(port).getSiteToSiteClient();
doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
doReturn(siteToSiteClientConfig).when(siteToSiteClient).getConfig();
doReturn(eventReporter).when(remoteGroup).getEventReporter();
}
@ -245,6 +255,144 @@ public class TestStandardRemoteGroupPort {
assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
}
@Test
public void testSendBatchByCount() throws Exception {
final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder()
.requestBatchCount(2)
.buildConfig();
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig);
// t1 = {0, 1}, t2 = {2, 3}, t3 = {4}
final int[] expectedNumberOfPackets = {2, 2, 1};
testSendBatch(expectedNumberOfPackets);
}
@Test
public void testSendBatchBySize() throws Exception {
final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder()
.requestBatchSize(30)
.buildConfig();
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig);
// t1 = {10, 11, 12}, t2 = {13, 14}
final int[] expectedNumberOfPackets = {3, 2};
testSendBatch(expectedNumberOfPackets);
}
@Test
public void testSendBatchByDuration() throws Exception {
final SiteToSiteClientConfig siteToSiteClientConfig = new SiteToSiteClient.Builder()
.requestBatchDuration(1, TimeUnit.NANOSECONDS)
.buildConfig();
setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND, siteToSiteClientConfig);
// t1 = {1}, t2 = {2} .. and so on.
final int[] expectedNumberOfPackets = {1, 1, 1, 1, 1};
testSendBatch(expectedNumberOfPackets);
}
/**
* Generate flow files to be sent, and execute port's onTrigger method.
* Finally, this method verifies whether packets are sent as expected.
* @param expectedNumberOfPackets Specify how many packets should be sent by each transaction.
* E.g. passing {2, 2, 1}, would generate 5 flow files in total.
* Based on the siteToSiteClientConfig batch parameters,
* it's expected to be sent via 3 transactions,
* transaction 0 will send flow file 0 and 1,
* transaction 1 will send flow file 2 and 3,
* and transaction 2 will send flow file 4.
* Each flow file has different content size generated automatically.
* The content size starts with 10, and increases as more flow files are generated.
* E.g. flow file 1 will have 10 bytes, flow file 2 has 11 bytes, f3 has 12 and so on.
*
*/
private void testSendBatch(final int[] expectedNumberOfPackets) throws Exception {
setupMockProcessSession();
final String peerUrl = "http://node1.example.com:8080/nifi";
final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false);
final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
doReturn(peer).when(transaction).getCommunicant();
commsSession.setDataTransferUrl(flowFileEndpointUri);
// Capture packets being sent to the remote peer
final AtomicInteger totalPacketsSent = new AtomicInteger(0);
final List<List<DataPacket>> sentPackets = new ArrayList<>(expectedNumberOfPackets.length);
final List<DataPacket> sentPacketsPerTransaction = new ArrayList<>();
doAnswer(invocation -> {
sentPacketsPerTransaction.add((DataPacket)invocation.getArguments()[0]);
totalPacketsSent.incrementAndGet();
return null;
}).when(transaction).send(any(DataPacket.class));
doAnswer(invocation -> {
sentPackets.add(new ArrayList<>(sentPacketsPerTransaction));
sentPacketsPerTransaction.clear();
return null;
}).when(transaction).confirm();
// Execute onTrigger while offering new flow files.
final List<MockFlowFile> flowFiles = new ArrayList<>();
for (int i = 0; i < expectedNumberOfPackets.length; i++) {
int numOfPackets = expectedNumberOfPackets[i];
int startF = flowFiles.size();
int endF = startF + numOfPackets;
IntStream.range(startF, endF).forEach(f -> {
final StringBuilder flowFileContents = new StringBuilder("0123456789");
for (int c = 0; c < f; c++) {
flowFileContents.append(c);
}
final byte[] bytes = flowFileContents.toString().getBytes();
final MockFlowFile flowFile = spy(processSession.createFlowFile(bytes));
when(flowFile.getSize()).then(invocation -> {
Thread.sleep(1); // For testSendBatchByDuration
return bytes.length;
});
sessionState.getFlowFileQueue().offer(flowFile);
flowFiles.add(flowFile);
});
port.onTrigger(processContext, processSession);
}
// Verify transactions, sent packets, and provenance events.
assertEquals(flowFiles.size(), totalPacketsSent.get());
assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size());
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
assertEquals(flowFiles.size(), provenanceEvents.size());
int f = 0;
for (int i = 0; i < expectedNumberOfPackets.length; i++) {
final List<DataPacket> dataPackets = sentPackets.get(i);
assertEquals(expectedNumberOfPackets[i], dataPackets.size());
for (int p = 0; p < dataPackets.size(); p++) {
final FlowFile flowFile = flowFiles.get(f);
// Assert sent packet
final DataPacket dataPacket = dataPackets.get(p);
assertEquals(flowFile.getSize(), dataPacket.getSize());
// Assert provenance event
final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(f);
assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
f++;
}
}
}
@Test
public void testReceiveHttp() throws Exception {

View File

@ -93,6 +93,15 @@ public class RemoteProcessGroupAuditor extends NiFiAuditor {
.setConvertName(PORT_NAME_CONVERT),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Compressed",
dto -> dto.getUseCompression() != null, RemoteGroupPort::isUseCompression)
.setConvertName(PORT_NAME_CONVERT),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Count",
dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getCount() != null, RemoteGroupPort::getBatchCount)
.setConvertName(PORT_NAME_CONVERT),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Size",
dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getSize() != null, RemoteGroupPort::getBatchSize)
.setConvertName(PORT_NAME_CONVERT),
new ConfigurationRecorder<RemoteGroupPort, RemoteProcessGroupPortDTO>("Batch Duration",
dto -> dto.getBatchSettings() != null && dto.getBatchSettings().getDuration() != null, RemoteGroupPort::getBatchDuration)
.setConvertName(PORT_NAME_CONVERT)
);

View File

@ -1493,6 +1493,12 @@ public final class DtoFactory {
dto.setUseCompression(port.isUseCompression());
dto.setExists(port.getTargetExists());
final BatchSettingsDTO batchDTO = new BatchSettingsDTO();
batchDTO.setCount(port.getBatchCount());
batchDTO.setSize(port.getBatchSize());
batchDTO.setDuration(port.getBatchDuration());
dto.setBatchSettings(batchDTO);
// determine if this port is currently connected to another component locally
if (ConnectableType.REMOTE_OUTPUT_PORT.equals(port.getConnectableType())) {
dto.setConnected(!port.getConnections().isEmpty());
@ -2962,6 +2968,13 @@ public final class DtoFactory {
copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount());
copy.setUseCompression(original.getUseCompression());
copy.setExists(original.getExists());
final BatchSettingsDTO batchOrg = original.getBatchSettings();
if (batchOrg != null) {
final BatchSettingsDTO batchCopy = new BatchSettingsDTO();
batchCopy.setCount(batchOrg.getCount());
batchCopy.setSize(batchOrg.getSize());
batchCopy.setDuration(batchOrg.getDuration());
}
return copy;
}

View File

@ -29,10 +29,12 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@ -203,7 +205,9 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
// verify update when appropriate
if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(), remoteProcessGroupPortDto.getUseCompression())) {
if (isAnyNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount(),
remoteProcessGroupPortDto.getUseCompression(),
remoteProcessGroupPortDto.getBatchSettings())) {
port.verifyCanUpdate();
}
}
@ -219,6 +223,30 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
validationErrors.add(String.format("Concurrent tasks for port '%s' must be a positive integer.", remoteGroupPort.getName()));
}
final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDTO.getBatchSettings();
if (batchSettingsDTO != null) {
final Integer batchCount = batchSettingsDTO.getCount();
if (isNotNull(batchCount) && batchCount < 0) {
validationErrors.add(String.format("Batch count for port '%s' must be a positive integer.", remoteGroupPort.getName()));
}
final String batchSize = batchSettingsDTO.getSize();
if (isNotNull(batchSize) && batchSize.length() > 0
&& !DataUnit.DATA_SIZE_PATTERN.matcher(batchSize.trim().toUpperCase()).matches()) {
validationErrors.add(String.format("Batch size for port '%s' must be of format <Data Size> <Data Unit>" +
" where <Data Size> is a non-negative integer and <Data Unit> is a supported Data"
+ " Unit, such as: B, KB, MB, GB, TB", remoteGroupPort.getName()));
}
final String batchDuration = batchSettingsDTO.getDuration();
if (isNotNull(batchDuration) && batchDuration.length() > 0
&& !FormatUtils.TIME_DURATION_PATTERN.matcher(batchDuration.trim().toLowerCase()).matches()) {
validationErrors.add(String.format("Batch duration for port '%s' must be of format <duration> <TimeUnit>" +
" where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such "
+ "as: nanos, millis, secs, mins, hrs, days", remoteGroupPort.getName()));
}
}
return validationErrors;
}
@ -284,22 +312,7 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
verifyUpdatePort(port, remoteProcessGroupPortDto);
// perform the update
if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) {
port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount());
}
if (isNotNull(remoteProcessGroupPortDto.getUseCompression())) {
port.setUseCompression(remoteProcessGroupPortDto.getUseCompression());
}
final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting();
if (isNotNull(isTransmitting)) {
// start or stop as necessary
if (!port.isRunning() && isTransmitting) {
remoteProcessGroup.startTransmitting(port);
} else if (port.isRunning() && !isTransmitting) {
remoteProcessGroup.stopTransmitting(port);
}
}
updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
return port;
}
@ -318,6 +331,19 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
verifyUpdatePort(port, remoteProcessGroupPortDto);
// perform the update
updatePort(port, remoteProcessGroupPortDto, remoteProcessGroup);
return port;
}
/**
*
* @param port Port instance to be updated.
* @param remoteProcessGroupPortDto DTO containing updated remote process group port settings.
* @param remoteProcessGroup If remoteProcessGroupPortDto has updated isTransmitting input,
* this method will start or stop the port in this remoteProcessGroup as necessary.
*/
private void updatePort(RemoteGroupPort port, RemoteProcessGroupPortDTO remoteProcessGroupPortDto, RemoteProcessGroup remoteProcessGroup) {
if (isNotNull(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount())) {
port.setMaxConcurrentTasks(remoteProcessGroupPortDto.getConcurrentlySchedulableTaskCount());
}
@ -325,6 +351,13 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
port.setUseCompression(remoteProcessGroupPortDto.getUseCompression());
}
final BatchSettingsDTO batchSettingsDTO = remoteProcessGroupPortDto.getBatchSettings();
if (isNotNull(batchSettingsDTO)) {
port.setBatchCount(batchSettingsDTO.getCount());
port.setBatchSize(batchSettingsDTO.getSize());
port.setBatchDuration(batchSettingsDTO.getDuration());
}
final Boolean isTransmitting = remoteProcessGroupPortDto.isTransmitting();
if (isNotNull(isTransmitting)) {
// start or stop as necessary
@ -334,8 +367,6 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
remoteProcessGroup.stopTransmitting(port);
}
}
return port;
}
@Override

View File

@ -29,6 +29,7 @@ import org.apache.nifi.authorization.user.StandardNiFiUser;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
@ -40,6 +41,7 @@ import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.web.api.dto.DtoFactory.SENSITIVE_VALUE_MASK;
@ -434,6 +436,13 @@ public class TestRemoteProcessGroupAuditor {
when(updatedRPGPort.getMaxConcurrentTasks()).thenReturn(inputRPGPortDTO.getConcurrentlySchedulableTaskCount());
when(updatedRPGPort.isUseCompression()).thenReturn(inputRPGPortDTO.getUseCompression());
final BatchSettingsDTO batchSettings = inputRPGPortDTO.getBatchSettings();
if (batchSettings != null) {
when(updatedRPGPort.getBatchCount()).thenReturn(batchSettings.getCount());
when(updatedRPGPort.getBatchSize()).thenReturn(batchSettings.getSize());
when(updatedRPGPort.getBatchDuration()).thenReturn(batchSettings.getDuration());
}
when(joinPoint.proceed()).thenReturn(updatedRPGPort);
// Capture added actions so that those can be asserted later.
@ -553,4 +562,34 @@ public class TestRemoteProcessGroupAuditor {
assertConfigureDetails(action.getActionDetails(), "input-port-1.Compressed", "false", "true");
}
@Test
public void testConfigurePortBatchSettings() throws Throwable {
final RemoteGroupPort existingRPGPort = defaultRemoteGroupPort();
when(existingRPGPort.getName()).thenReturn("input-port-1");
final RemoteProcessGroupPortDTO inputRPGPortDTO = defaultRemoteProcessGroupPortDTO();
final BatchSettingsDTO batchSettingsDTO = new BatchSettingsDTO();
batchSettingsDTO.setCount(1234);
batchSettingsDTO.setSize("64KB");
batchSettingsDTO.setDuration("10sec");
inputRPGPortDTO.setBatchSettings(batchSettingsDTO);
final Collection<Action> actions = updateProcessGroupInputPortConfiguration(inputRPGPortDTO, existingRPGPort);
assertEquals(3, actions.size());
final Iterator<Action> iterator = actions.iterator();
Action action = iterator.next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Count", "0", "1234");
action = iterator.next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Size", "", "64KB");
action = iterator.next();
assertEquals(Operation.Configure, action.getOperation());
assertConfigureDetails(action.getActionDetails(), "input-port-1.Batch Duration", "", "10sec");
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.dao.impl;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.api.dto.BatchSettingsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestStandardRemoteProcessGroupDAO {
private void validate(final StandardRemoteProcessGroupDAO dao, final RemoteProcessGroupPortDTO dto, final String ... errMessageKeywords) {
try {
dao.verifyUpdateInputPort(dto.getGroupId(), dto);
if (errMessageKeywords.length > 0) {
fail("Validation should fail with keywords: " + Arrays.asList(errMessageKeywords));
}
} catch (ValidationException e) {
if (errMessageKeywords.length == 0) {
fail("Validation should pass, but failed with: " + e);
}
final List<String> validationErrors = e.getValidationErrors();
assertEquals("Validation should return one validationErrors", 1, validationErrors.size());
final String validationError = validationErrors.get(0);
for (String errMessageKeyword : errMessageKeywords) {
assertTrue("validation error message should contain " + errMessageKeyword + ", but was: " + validationError,
validationError.contains(errMessageKeyword));
}
}
}
@Test
public void testVerifyUpdateInputPort() {
final StandardRemoteProcessGroupDAO dao = new StandardRemoteProcessGroupDAO();
final String remoteProcessGroupId = "remote-process-group-id";
final String remoteProcessGroupInputPortId = "remote-process-group-input-port-id";
final FlowController flowController = mock(FlowController.class);
final ProcessGroup processGroup = mock(ProcessGroup.class);
final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
final RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
dao.setFlowController(flowController);
when(flowController.getGroup(any())).thenReturn(processGroup);
when(processGroup.findRemoteProcessGroup(eq(remoteProcessGroupId))).thenReturn(remoteProcessGroup);
when(remoteProcessGroup.getInputPort(remoteProcessGroupInputPortId)).thenReturn(remoteGroupPort);
when(remoteGroupPort.getName()).thenReturn("remote-group-port");
final RemoteProcessGroupPortDTO dto = new RemoteProcessGroupPortDTO();
dto.setGroupId(remoteProcessGroupId);
dto.setId(remoteProcessGroupInputPortId);
final BatchSettingsDTO batchSettings = new BatchSettingsDTO();
dto.setBatchSettings(batchSettings);
// Empty input values should pass validation.
dao.verifyUpdateInputPort(remoteProcessGroupId, dto);
// Concurrent tasks
dto.setConcurrentlySchedulableTaskCount(0);
validate(dao, dto, "Concurrent tasks", "positive integer");
dto.setConcurrentlySchedulableTaskCount(2);
validate(dao, dto);
// Batch count
batchSettings.setCount(-1);
validate(dao, dto, "Batch count", "positive integer");
batchSettings.setCount(0);
validate(dao, dto);
batchSettings.setCount(1000);
validate(dao, dto);
// Batch size
batchSettings.setSize("AB");
validate(dao, dto, "Batch size", "Data Size");
batchSettings.setSize("10 days");
validate(dao, dto, "Batch size", "Data Size");
batchSettings.setSize("300MB");
validate(dao, dto);
// Batch duration
batchSettings.setDuration("AB");
validate(dao, dto, "Batch duration", "Time Unit");
batchSettings.setDuration("10 KB");
validate(dao, dto, "Batch duration", "Time Unit");
batchSettings.setDuration("10 secs");
validate(dao, dto);
}
}

View File

@ -37,6 +37,40 @@
<span class="nf-checkbox-label">Compressed</span>
</div>
</div>
<div class="clear"></div>
</div>
<div class="batch-settings">
<div class="setting-name">
Batch Settings:
</div>
<div class="setting batch-setting">
<div class="setting-name">
Count
<div class="fa fa-question-circle" alt="Info" title="The preferred number of flow files to include in a transaction for this port."></div>
</div>
<div class="setting-field">
<input id="remote-port-batch-count" type="text"/>
</div>
</div>
<div class="setting batch-setting">
<div class="setting-name">
Size
<div class="fa fa-question-circle" alt="Info" title="The preferred number of bytes to include in a transaction for this port."></div>
</div>
<div class="setting-field">
<input id="remote-port-batch-size" type="text"/>
</div>
</div>
<div class="setting batch-setting">
<div class="setting-name">
Duration
<div class="fa fa-question-circle" alt="Info" title="The preferred amount of time that a transaction should span for this port."></div>
</div>
<div class="setting-field">
<input id="remote-port-batch-duration" type="text"/>
</div>
</div>
<div class="clear"></div>
</div>
</div>
</div>

View File

@ -125,6 +125,7 @@ div.remote-port-description {
div.concurrent-task-container {
float: left;
width: 200px;
}
img.concurrent-tasks-info {
@ -133,7 +134,11 @@ img.concurrent-tasks-info {
}
div.compression-container {
float: right;
float: left;
}
div.batch-settings-container {
margin-top: 8px;
}
div.remote-port-transmission-container {
@ -165,7 +170,7 @@ div.disabled-transmission-switch {
#remote-port-concurrent-tasks {
font-size: 11px !important;
float: left;
width: 75%;
width: 266px;
}
#remote-port-use-compression-container {
@ -179,4 +184,10 @@ div.disabled-transmission-switch {
#remote-port-concurrent-task-header {
margin-top: 5px;
}
}
div.batch-setting {
margin-right: 8px;
width: 30%;
float: left;
}

View File

@ -73,9 +73,19 @@
handler: {
click: function () {
var remotePortConcurrentTasks = $('#remote-port-concurrent-tasks').val();
var remotePortBatchCount = $('#remote-port-batch-count').val();
var portValidationErrors = new Array();
// ensure the property name and value is specified
if ($.isNumeric(remotePortConcurrentTasks)) {
if (!$.isNumeric(remotePortConcurrentTasks)) {
portValidationErrors.push("Concurrent tasks must be an integer value.");
}
if (remotePortBatchCount && !$.isNumeric(remotePortBatchCount)) {
portValidationErrors.push("Batch Settings: count must be an integer value.");
}
if (portValidationErrors.length == 0) {
var remoteProcessGroupId = $('#remote-process-group-ports-id').text();
var remoteProcessGroupData = d3.select('#id-' + remoteProcessGroupId).datum();
var remotePortId = $('#remote-port-id').text();
@ -87,7 +97,12 @@
id: remotePortId,
groupId: remoteProcessGroupId,
useCompression: $('#remote-port-use-compression').hasClass('checkbox-checked'),
concurrentlySchedulableTaskCount: remotePortConcurrentTasks
concurrentlySchedulableTaskCount: remotePortConcurrentTasks,
batchSettings : {
count: remotePortBatchCount,
size: $('#remote-port-batch-size').val(),
duration: $('#remote-port-batch-duration').val()
}
}
};
@ -121,6 +136,12 @@
// set the new values
$('#' + remotePortId + '-concurrent-tasks').text(remotePort.concurrentlySchedulableTaskCount);
$('#' + remotePortId + '-compression').text(compressionLabel);
var batchSettings = getBatchSettingsDisplayValues(remotePort);
$('#' + remotePortId + '-batch-count').text(batchSettings.count);
$('#' + remotePortId + '-batch-size').text(batchSettings.size);
$('#' + remotePortId + '-batch-duration').text(batchSettings.duration);
}).fail(function (xhr, status, error) {
if (xhr.status === 400) {
var errors = xhr.responseText.split('\n');
@ -146,7 +167,9 @@
} else {
nfDialog.showOkDialog({
headerText: 'Remote Process Group Ports',
dialogContent: 'Concurrent tasks must be an integer value.'
dialogContent: portValidationErrors.reduce(function (prev, curr) {
return typeof(prev) === 'string' ? prev + ' ' + curr : curr;
})
});
// close the dialog
@ -175,6 +198,9 @@
$('#remote-port-name').text('');
$('#remote-port-concurrent-tasks').val('');
$('#remote-port-use-compression').removeClass('checkbox-checked checkbox-unchecked');
$('#remote-port-batch-count').val('');
$('#remote-port-batch-size').val('');
$('#remote-port-batch-duration').val('');
}
}
});
@ -232,6 +258,26 @@
});
};
/**
* Create and return an object contains count, size and duration values to display.
* If port does not have batch settings or batch setting value is not defined, 'No value set' is displayed.
*/
var getBatchSettingsDisplayValues = function (port) {
var values = {};
var batchSettings = port.batchSettings;
if (batchSettings) {
values.count = typeof(batchSettings.count) === 'number' ? batchSettings.count : 'No value set';
values.size = batchSettings.size ? batchSettings.size : 'No value set';
values.duration = batchSettings.duration ? batchSettings.duration : 'No value set';
} else {
// if it doesn't have batch settings, clear values
values.count = 'No value set';
values.size = 'No value set';
values.duration = 'No value set';
}
return values;
}
/**
* Creates the markup for configuration concurrent tasks for a port.
*
@ -287,9 +333,12 @@
var portName = $('#' + portId + '-name').text();
var portConcurrentTasks = $('#' + portId + '-concurrent-tasks').text();
var portCompression = $('#' + portId + '-compression').text() === 'Yes';
var batchCount = $('#' + portId + '-batch-count').text();
var batchSize = $('#' + portId + '-batch-size').text();
var batchDuration = $('#' + portId + '-batch-duration').text();
// show the configuration dialog
configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, portType);
configureRemotePort(port.id, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType);
}).appendTo(portContainerEditContainer);
// show/hide the edit button as appropriate
@ -472,6 +521,58 @@
'</div>' +
'</div>').appendTo(compressionContainer);
// clear: Concurrent Tasks, Compressed
$('<div class="clear"></div>').appendTo(portContainerDetailsContainer);
// Batch related settings
var batchSettingsContainer = $('<div class="batch-settings-container"></div>')
.append($('<div class="setting-name">Batch Settings'
+ '<div class="processor-setting batch-settings-info fa fa-question-circle"></div></div>'))
.appendTo(portContainerDetailsContainer);
batchSettingsContainer.find('div.batch-settings-info').qtip($.extend({},
nf.Common.config.tooltipConfig,
{
content: (portType === 'input'
? 'The batch settings to control how this NiFi sends data to the remote input port in a transaction.'
+ ' This NiFi will transfer as much flow files as they are queued in incoming relationships,'
+ ' until any of these limits is met.'
+ ' If none of these setting is specified, this NiFi uses 500 milliseconds batch duration by default.'
: 'The batch settings to tell the remote NiFi how this NiFi prefers to receive data from the remote output port in a transaction.'
+ ' The remote NiFi will use these settings as a hint to control batch data transferring.'
+ ' However, actual behavior depends on the version of remote NiFi instance.'
+ ' Recent version of NiFi uses 5 seconds for batch duration if none of these setting is specified.')
}));
var batchSettings = getBatchSettingsDisplayValues(port);
var batchCount = $('<div class="setting-field"></div>').append($('<div id="' + portId + '-batch-count"></div>').text(batchSettings.count));
var batchSize = $('<div class="setting-field"></div>').append($('<div id="' + portId + '-batch-size"></div>').text(batchSettings.size));
var batchDuration = $('<div class="setting-field"></div>').append($('<div id="' + portId + '-batch-duration"></div>').text(batchSettings.duration));
// add this ports batch count
$('<div class="batch-setting">' +
'<div class="setting-name">' +
'Count' +
'<div class="processor-setting"></div>' +
'</div>' +
'</div>').append(batchCount).appendTo(batchSettingsContainer);
// add this ports batch size
$('<div class="batch-setting">' +
'<div class="setting-name">' +
'Size' +
'<div class="processor-setting"></div>' +
'</div>' +
'</div>').append(batchSize).appendTo(batchSettingsContainer);
// add this ports batch duration
$('<div class="batch-setting">' +
'<div class="setting-name">' +
'Duration' +
'<div class="processor-setting"></div>' +
'</div>' +
'</div>').append(batchDuration).appendTo(batchSettingsContainer);
// clear
$('<div class="clear"></div>').appendTo(portContainer);
@ -489,9 +590,12 @@
* @argument {string} portName The port name
* @argument {int} portConcurrentTasks The number of concurrent tasks for the port
* @argument {boolean} portCompression The compression flag for the port
* @argument {int} batchCount The flow file count in a batch transaction
* @argument {string} batchSize The size of flow files in a batch transaction
* @argument {string} batchDuration The duration of a batch transaction
* @argument {string} portType The type of port this is
*/
var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, portType) {
var configureRemotePort = function (portId, portName, portConcurrentTasks, portCompression, batchCount, batchSize, batchDuration, portType) {
// set port identifiers
$('#remote-port-id').text(portId);
$('#remote-port-type').text(portType);
@ -503,6 +607,9 @@
}
$('#remote-port-use-compression').addClass(checkState);
$('#remote-port-concurrent-tasks').val(portConcurrentTasks);
$('#remote-port-batch-count').val(batchCount === 'No value set' ? null : batchCount);
$('#remote-port-batch-size').val(batchSize === 'No value set' ? null : batchSize);
$('#remote-port-batch-duration').val(batchDuration === 'No value set' ? null : batchDuration);
// set the port name
$('#remote-port-name').text(portName).ellipsis();
@ -549,6 +656,12 @@
var connectedInputPorts = [];
var disconnectedInputPorts = [];
var nameComparator = function (a, b) {
var nameA = a.name.toUpperCase();
var nameB = b.name.toUpperCase();
return nameA < nameB ? -1 : (nameA > nameB ? 1 : 0);
};
// show connected ports first
var inputPortContainer = $('#remote-process-group-input-ports-container');
$.each(remoteProcessGroupContents.inputPorts, function (_, inputPort) {
@ -559,6 +672,10 @@
}
});
// sort by port name within each port list
connectedInputPorts.sort(nameComparator);
disconnectedInputPorts.sort(nameComparator);
// add all connected input ports
$.each(connectedInputPorts, function (_, inputPort) {
createPortOption(inputPortContainer, inputPort, 'input');
@ -586,6 +703,10 @@
}
});
// sort by port name within each port list
connectedOutputPorts.sort(nameComparator);
disconnectedOutputPorts.sort(nameComparator);
// add all connected output ports
$.each(connectedOutputPorts, function (_, outputPort) {
createPortOption(outputPortContainer, outputPort, 'output');