NIFI-7552: When Process Group is configured to transfer data in batch, add an attribute to each outbound FlowFile that indicates how many FlowFiles went to each port. Updated user guide to explain the new attributes.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4345.
This commit is contained in:
Mark Payne 2020-06-16 16:50:10 -04:00 committed by Pierre Villard
parent 77078a85d9
commit 44fc4d9f27
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
9 changed files with 330 additions and 16 deletions

View File

@ -769,7 +769,11 @@ for Output Port B. These conditions are both considered the same in terms of the
Using an Outbound Policy of "Batch Output" along with a FlowFile Concurrency of "Single FlowFile Per Node" allows a user to easily ingest a single FlowFile
(which in and of itself may represent a batch of data) and then wait until all processing of that FlowFile has completed before continuing on to the next step
in the dataflow (i.e., the next component outside of the Process Group).
in the dataflow (i.e., the next component outside of the Process Group). Additionally, when using this mode, each FlowFile that is transferred out of the Process Group
will be given a series of attributes named "batch.output.<Port Name>" for each Output Port in the Process Group. The value will be equal to the number of FlowFiles
that were routed to that Output Port for this batch of data. For example, consider a case where a single FlowFile is split into 5 FlowFiles, and two FlowFiles go to Output Port A, one goes
to Output Port B, and two go to Output Port C, and no FlowFiles go to Output Port D. In this case, each FlowFile will attributes batch.output.A = 2,
batch.output.B = 1, batch.output.C = 2, batch.output.D = 0.
The Outbound Policy of "Batch Output" doesn't provide any benefits when used in conjunction with a FlowFile Concurrency of "Unbounded."
As a result, the Outbound Policy is ignored if the FlowFile Concurrency is set to "Unbounded."

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import java.util.Map;
public interface BatchCounts {
/**
* Resets the counts
*/
void reset();
/**
* Captures a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port
* @return a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port
*/
Map<String, Integer> captureCounts();
}

View File

@ -1104,4 +1104,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @return <code>true</code> if there is data that is queued for Processing, <code>false</code> otherwise
*/
boolean isDataQueuedForProcessing();
/**
* @return the BatchCounts that can be used for determining how many FlowFiles were transferred to each of the Output Ports
* in this Process Group, or <code>null</code> if this Process Group does not have an {@link #getFlowFileOutboundPolicy()}
* of {@link FlowFileOutboundPolicy#BATCH_OUTPUT}.
*/
BatchCounts getBatchCounts();
}

View File

@ -20,6 +20,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.BatchCounts;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileGate;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
@ -33,7 +34,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -74,28 +77,20 @@ public class LocalPort extends AbstractPort {
return maxIterations;
}
private boolean[] validateConnections() {
// LocalPort requires both in/out.
final boolean requireInput = true;
final boolean requireOutput = true;
return new boolean[]{requireInput, hasIncomingConnection(),
requireOutput, !getConnections(Relationship.ANONYMOUS).isEmpty()};
}
@Override
public boolean isValid() {
final boolean[] connectionRequirements = validateConnections();
return (!connectionRequirements[0] || connectionRequirements[1])
&& (!connectionRequirements[2] || connectionRequirements[3]);
return hasIncomingConnection() && hasOutboundConnection();
}
private boolean hasOutboundConnection() {
return !getConnections(Relationship.ANONYMOUS).isEmpty();
}
@Override
public Collection<ValidationResult> getValidationErrors() {
final boolean[] connectionRequirements = validateConnections();
final Collection<ValidationResult> validationErrors = new ArrayList<>();
// Incoming connections are required but not set
if (connectionRequirements[0] && !connectionRequirements[1]) {
if (!hasIncomingConnection()) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no incoming connections")
.subject(String.format("Port '%s'", getName()))
@ -104,7 +99,7 @@ public class LocalPort extends AbstractPort {
}
// Outgoing connections are required but not set
if (connectionRequirements[2] && !connectionRequirements[3]) {
if (!hasOutboundConnection()) {
validationErrors.add(new ValidationResult.Builder()
.explanation("Port has no outgoing connections")
.subject(String.format("Port '%s'", getName()))
@ -195,9 +190,15 @@ public class LocalPort extends AbstractPort {
}
session.transfer(flowFile, Relationship.ANONYMOUS);
getProcessGroup().getBatchCounts().reset();
}
protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) {
final Map<String, String> attributes = new HashMap<>();
final Map<String, Integer> counts = getProcessGroup().getBatchCounts().captureCounts();
counts.forEach((k, v) -> attributes.put("batch.output." + k, String.valueOf(v)));
Set<Relationship> available = context.getAvailableRelationships();
int iterations = 0;
while (!available.isEmpty()) {
@ -206,6 +207,10 @@ public class LocalPort extends AbstractPort {
break;
}
if (!attributes.isEmpty()) {
flowFiles.forEach(ff -> session.putAllAttributes(ff, attributes));
}
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import java.util.Collections;
import java.util.Map;
public class NoOpBatchCounts implements BatchCounts {
@Override
public void reset() {
}
@Override
public Map<String, Integer> captureCounts() {
return Collections.emptyMap();
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class StandardBatchCounts implements BatchCounts {
private static final Logger logger = LoggerFactory.getLogger(StandardBatchCounts.class);
private final ProcessGroup processGroup;
private Map<String, Integer> counts = null;
private boolean hasBeenReset = false;
private final StateManager stateManager;
private Map<String, Integer> restoredValues;
public StandardBatchCounts(final ProcessGroup processGroup, final StateManager stateManager) {
this.processGroup = processGroup;
this.stateManager = stateManager;
}
@Override
public synchronized void reset() {
counts = null;
hasBeenReset = true;
try {
stateManager.clear(Scope.LOCAL);
} catch (final Exception e) {
logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e);
}
}
@Override
public synchronized Map<String, Integer> captureCounts() {
// If reset() hasn't been called, we don't want to provide any counts. This can happen if the Process Group's
// FlowFileConcurrency and/or FlowFileOutboundPolicy is changed while data is already being processed in the Process Group.
// In this case, we don't want to provide counts until we've cleared all those FlowFiles out, and when that happens, reset() will be called.
if (!hasBeenReset) {
return restoreState();
}
if (counts == null) {
counts = new HashMap<>();
final Map<String, String> stateMapValues = new HashMap<>();
for (final Port outputPort : processGroup.getOutputPorts()) {
int count = 0;
for (final Connection connection : outputPort.getIncomingConnections()) {
count += connection.getFlowFileQueue().size().getObjectCount();
}
final String name = outputPort.getName();
counts.put(name, count);
stateMapValues.put(name, String.valueOf(count));
}
try {
stateManager.setState(stateMapValues, Scope.LOCAL);
} catch (final Exception e) {
logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e);
}
}
return counts;
}
private Map<String, Integer> restoreState() {
if (restoredValues != null) {
return restoredValues;
}
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.LOCAL);
} catch (IOException e) {
logger.error("Failed to restore local state for {}. This could result in the batch.output.* attributes being inaccurate for the current batch of FlowFiles", processGroup, e);
return Collections.emptyMap();
}
restoredValues = new HashMap<>();
final Map<String, String> rawValues = stateMap.toMap();
rawValues.forEach((k, v) -> restoredValues.put(k, Integer.parseInt(v)));
return restoredValues;
}
}

View File

@ -199,6 +199,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED;
private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate();
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
@ -707,6 +708,11 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public BatchCounts getBatchCounts() {
return batchCounts;
}
@Override
public void addProcessGroup(final ProcessGroup group) {
if (StringUtils.isEmpty(group.getName())) {
@ -5317,6 +5323,8 @@ public final class StandardProcessGroup implements ProcessGroup {
flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued());
break;
}
setBatchCounts(getFlowFileOutboundPolicy(), flowFileConcurrency);
} finally {
writeLock.unlock();
}
@ -5372,5 +5380,21 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) {
this.flowFileOutboundPolicy = flowFileOutboundPolicy;
setBatchCounts(flowFileOutboundPolicy, getFlowFileConcurrency());
}
private synchronized void setBatchCounts(final FlowFileOutboundPolicy outboundPolicy, final FlowFileConcurrency flowFileConcurrency) {
if (outboundPolicy == FlowFileOutboundPolicy.BATCH_OUTPUT && flowFileConcurrency == FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE) {
if (batchCounts instanceof NoOpBatchCounts) {
final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(getIdentifier());
batchCounts = new StandardBatchCounts(this, stateManager);
}
} else {
if (batchCounts != null) {
batchCounts.reset();
}
batchCounts = new NoOpBatchCounts();
}
}
}

View File

@ -32,9 +32,11 @@ import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.BatchCounts;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileGate;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.NoOpBatchCounts;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
@ -764,6 +766,11 @@ public class MockProcessGroup implements ProcessGroup {
return false;
}
@Override
public BatchCounts getBatchCounts() {
return new NoOpBatchCounts();
}
@Override
public void terminateProcessor(ProcessorNode processor) {
}

View File

@ -21,7 +21,10 @@ import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
@ -29,8 +32,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
@ -148,5 +154,87 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
assertEquals(1, getConnectionQueueSize(outputToTerminate.getId()));
assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId()));
final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0);
assertNotNull(outputToTerminateFlowFile);
final Map<String, String> attributes = outputToTerminateFlowFile.getFlowFile().getAttributes();
assertEquals("1", attributes.get("batch.output.Out"));
assertEquals("1", attributes.get("batch.output.Out2"));
final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0);
assertNotNull(secondOutFlowFile);
final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes();
assertEquals("1", secondOutAttributes.get("batch.output.Out"));
assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
}
@Test
public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId());
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
// Connect Generate -> Input Port -> Count -> Output Port -> Terminate
// Also connect InputPort -> Out2 -> Terminate
final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success");
final ConnectionEntity inputToOutput = getClientUtil().createConnection(inputPort, outputPort);
final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut);
final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate);
final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate);
processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name());
processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name());
getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity);
// Start generate so that data is created. Start Input Port so that the data is ingested.
// Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port.
getNifiClient().getProcessorClient().startProcessor(generate);
getNifiClient().getInputPortClient().startInputPort(inputPort);
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
waitForQueueCount(inputToSecondOut.getId(), 1);
assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
// Everything is queued up at an Output Port so the first Output Port should run and its queue should become empty.
waitForQueueCount(inputToOutput.getId(), 0);
// Restart nifi.
getNiFiInstance().stop();
getNiFiInstance().start(true);
// After nifi, we should see that one FlowFile is still queued up for Out2.
assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
// Get a new copy of the Out2 port because NiFi has restarted so that Revision will be different.
secondOut = getNifiClient().getOutputPortClient().getOutputPort(secondOut.getId());
getNifiClient().getOutputPortClient().startOutputPort(secondOut);
// Data should now flow from both output ports.
waitForQueueCount(inputToOutput.getId(), 0);
waitForQueueCount(inputToSecondOut.getId(), 0);
waitForQueueCount(outputToTerminate.getId(), 1);
waitForQueueCount(secondOutToTerminate.getId(), 1);
// Each FlowFile should now have attributes batch.output.Out = 1, batch.output.Out2 = 1
// Even though upon restart, the "Out" Port had no FlowFiles because there still was 1 FlowFile that went to Out as part of the same batch.
final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0);
assertNotNull(outputToTerminateFlowFile);
final Map<String, String> attributes = outputToTerminateFlowFile.getFlowFile().getAttributes();
assertEquals("1", attributes.get("batch.output.Out"));
assertEquals("1", attributes.get("batch.output.Out2"));
final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0);
assertNotNull(secondOutFlowFile);
final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes();
assertEquals("1", secondOutAttributes.get("batch.output.Out"));
assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
}
}