NIFI-8195: add default connection settings to process group configuration

- include new process group property support in NiFi Registry
  - updated documentation to describe and show new feature
  - added elements to XSD schema definition

NIFI-8195: update to DAO to fix PG move and copy/paste

update condition to not null vice null

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5192
This commit is contained in:
Mark Bean 2021-02-03 22:19:27 -05:00 committed by Matthew Burgess
parent 9a125dcaaf
commit ea31634ea7
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
37 changed files with 522 additions and 45 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

After

Width:  |  Height:  |  Size: 64 KiB

View File

@ -742,7 +742,7 @@ The next configuration element is the Process Group Parameter Context, which is
The third element in the configuration dialog is the Process Group Comments. This provides a mechanism for providing any useful information or context about the Process Group.
The last two elements, Process Group FlowFile Currency and Process Group Outbound Policy, are covered in the following sections.
The next two elements, Process Group FlowFile Currency and Process Group Outbound Policy, are covered in the following sections.
[[Flowfile_Concurrency]]
===== FlowFile Concurrency
@ -833,6 +833,17 @@ input FlowFile, it is recommended that backpressure for Connections ending in an
largest expected number of FlowFiles or backpressure for those Connections be disabled all together (by setting the Backpressure Threshold to 0).
See <<Backpressure>> for more information.
[[Default_Connection_Settings]]
===== Default Settings for Connections
The final three elements in the Process Group configuration dialog are for Default FlowFile Expiration, Default Back Pressure Object Threshold, and
Default Back Pressure Data Size Threshold. These settings configure the default values when creating a new Connection. Each Connection represents a queue,
and every queue has settings for flowfile expiration, back pressure object count, and back pressure data size. The settings specified here will effect the
default values for all new Connections created within the Process Group; it will not effect existing Connections. Child Process Groups created within the
configured Process Group will inherit the default settings. Again, existing Process Groups will not be effected. If not overridden with these options, the
root Process Group obtains its default back pressure settings from nifi.properties, and has a default FlowFile expiration of "0 sec", i.e. do not expire.
NOTE: Setting the Default FlowFile Expiration to a non-zero value may lead to data loss due to a FlowFile expiring as its time limit is reached.
==== Controller Services
The Controller Services tab in the Process Group configuration dialog is covered in <<Controller_Services_for_Dataflows>>.

View File

@ -36,6 +36,9 @@ public class ProcessGroupDTO extends ComponentDTO {
private ParameterContextReferenceEntity parameterContext;
private String flowfileConcurrency;
private String flowfileOutboundPolicy;
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private Integer runningCount;
private Integer stoppedCount;
@ -364,7 +367,7 @@ public class ProcessGroupDTO extends ComponentDTO {
this.flowfileConcurrency = flowfileConcurrency;
}
@ApiModelProperty(value = "The Oubound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.",
@ApiModelProperty(value = "The Outbound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.",
allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT")
public String getFlowfileOutboundPolicy() {
return flowfileOutboundPolicy;
@ -373,4 +376,31 @@ public class ProcessGroupDTO extends ComponentDTO {
public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) {
this.flowfileOutboundPolicy = flowfileOutboundPolicy;
}
@ApiModelProperty(value = "The default FlowFile Expiration for this Process Group.")
public String getDefaultFlowFileExpiration() {
return defaultFlowFileExpiration;
}
public void setDefaultFlowFileExpiration(String defaultFlowFileExpiration) {
this.defaultFlowFileExpiration = defaultFlowFileExpiration;
}
@ApiModelProperty(value = "Default value used in this Process Group for the maximum number of objects that can be queued before back pressure is applied.")
public Long getDefaultBackPressureObjectThreshold() {
return defaultBackPressureObjectThreshold;
}
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
}
@ApiModelProperty(value = "Default value used in this Process Group for the maximum data size of objects that can be queued before back pressure is applied.")
public String getDefaultBackPressureDataSizeThreshold() {
return defaultBackPressureDataSizeThreshold;
}
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
}

View File

@ -84,7 +84,7 @@ public final class StandardConnection implements Connection, ConnectionEventList
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this);
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this, processGroup.get());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@ -456,6 +456,9 @@ public final class StandardConnection implements Connection, ConnectionEventList
}
public StandardConnection build() {
if (processGroup == null) {
throw new IllegalStateException("Cannot build a Connection without a Process Group");
}
if (source == null) {
throw new IllegalStateException("Cannot build a Connection without a Source");
}

View File

@ -17,6 +17,8 @@
package org.apache.nifi.controller.queue;
import org.apache.nifi.groups.ProcessGroup;
public interface FlowFileQueueFactory {
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener);
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener, ProcessGroup processGroup);
}

View File

@ -83,6 +83,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReference;
import org.apache.nifi.parameter.ParameterUpdate;
import org.apache.nifi.parameter.StandardParameterUpdate;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.registry.ComponentVariableRegistry;
@ -133,6 +134,8 @@ import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.SnippetUtils;
import org.apache.nifi.web.ResourceNotFoundException;
@ -161,6 +164,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -170,6 +174,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@ -188,6 +193,9 @@ public final class StandardProcessGroup implements ProcessGroup {
private final AtomicReference<String> name;
private final AtomicReference<Position> position;
private final AtomicReference<String> comments;
private final AtomicReference<String> defaultFlowFileExpiration;
private final AtomicReference<Long> defaultBackPressureObjectThreshold; // use AtomicReference vs AtomicLong to allow storing null
private final AtomicReference<String> defaultBackPressureDataSizeThreshold;
private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
private static final SecureRandom randomGenerator = new SecureRandom();
@ -221,17 +229,30 @@ public final class StandardProcessGroup implements ProcessGroup {
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
private final DataValve dataValve;
private final Map<String, String> niFiPropertiesBackPressure;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
private static final String DEFAULT_BACKPRESSURE_OBJECT = "0";
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "0 GB";
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider) {
this(id, serviceProvider, scheduler, encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient,
reloadComponent, variableRegistry, nodeTypeProvider, null);
}
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider,
final NiFiProperties niFiProperties) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
this.parent = new AtomicReference<>();
@ -251,6 +272,16 @@ public final class StandardProcessGroup implements ProcessGroup {
final StateManager dataValveStateManager = stateManagerProvider.getStateManager(id + "-DataValve");
dataValve = new StandardDataValve(this, dataValveStateManager);
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
// save only the nifi properties needed, and account for the possibility those properties are missing
niFiPropertiesBackPressure = new ConcurrentHashMap<>();
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_COUNT,
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT) == null ? DEFAULT_BACKPRESSURE_OBJECT : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT));
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_SIZE,
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE) == null ? DEFAULT_BACKPRESSURE_DATA_SIZE : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE));
}
@Override
@ -3578,6 +3609,9 @@ public final class StandardProcessGroup implements ProcessGroup {
copy.setName(processGroup.getName());
copy.setFlowFileConcurrency(processGroup.getFlowFileConcurrency());
copy.setFlowFileOutboundPolicy(processGroup.getFlowFileOutboundPolicy());
copy.setDefaultFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
copy.setDefaultBackPressureObjectThreshold(processGroup.getDefaultBackPressureObjectThreshold());
copy.setDefaultBackPressureDataSizeThreshold(processGroup.getDefaultBackPressureDataSizeThreshold());
copy.setPosition(processGroup.getPosition());
copy.setVersionedFlowCoordinates(topLevel ? null : processGroup.getVersionedFlowCoordinates());
copy.setConnections(processGroup.getConnections());
@ -3606,6 +3640,9 @@ public final class StandardProcessGroup implements ProcessGroup {
childCopy.setVersionedFlowCoordinates(childGroup.getVersionedFlowCoordinates());
childCopy.setFlowFileConcurrency(childGroup.getFlowFileConcurrency());
childCopy.setFlowFileOutboundPolicy(childGroup.getFlowFileOutboundPolicy());
childCopy.setDefaultFlowFileExpiration(childGroup.getDefaultFlowFileExpiration());
childCopy.setDefaultBackPressureObjectThreshold(childGroup.getDefaultBackPressureObjectThreshold());
childCopy.setDefaultBackPressureDataSizeThreshold(childGroup.getDefaultBackPressureDataSizeThreshold());
copyChildren.add(childCopy);
}
@ -3937,6 +3974,10 @@ public final class StandardProcessGroup implements ProcessGroup {
FlowFileOutboundPolicy.valueOf(proposed.getFlowFileOutboundPolicy());
group.setFlowFileOutboundPolicy(outboundPolicy);
group.setDefaultFlowFileExpiration(proposed.getDefaultFlowFileExpiration());
group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());
final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates == null) {
group.disconnectVersionControl(false);
@ -5566,4 +5607,93 @@ public final class StandardProcessGroup implements ProcessGroup {
public DataValve getDataValve() {
return dataValve;
}
@Override
public void setDefaultFlowFileExpiration(final String defaultFlowFileExpiration) {
// use default if value not provided
if (StringUtils.isBlank(defaultFlowFileExpiration)) {
this.defaultFlowFileExpiration.set(DEFAULT_FLOWFILE_EXPIRATION);
} else {
// Validate entry: must include time unit label
Pattern pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
String caseAdjustedExpiration = defaultFlowFileExpiration.toLowerCase();
if (pattern.matcher(caseAdjustedExpiration).matches()) {
this.defaultFlowFileExpiration.set(caseAdjustedExpiration);
} else {
throw new IllegalArgumentException("The Default FlowFile Expiration of the process group must contain a valid time unit.");
}
}
}
@Override
public String getDefaultFlowFileExpiration() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, use the default.
if (defaultFlowFileExpiration.get() == null) {
if (isRootGroup()) {
return DEFAULT_FLOWFILE_EXPIRATION;
} else {
return parent.get().getDefaultFlowFileExpiration();
}
}
return defaultFlowFileExpiration.get();
}
@Override
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
// use default if value not provided
if (defaultBackPressureObjectThreshold == null) {
this.defaultBackPressureObjectThreshold.set(Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT)));
} else {
// Validate field is numeric
Pattern pattern = Pattern.compile("(\\d+)");
if (pattern.matcher(String.valueOf(defaultBackPressureObjectThreshold)).matches()) {
this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
} else {
throw new IllegalArgumentException("The Default Back Pressure Object Threshold of the process group must be numeric.");
}
}
}
@Override
public Long getDefaultBackPressureObjectThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (defaultBackPressureObjectThreshold.get() == null) {
if (isRootGroup()) {
return Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT));
} else {
return getParent().getDefaultBackPressureObjectThreshold();
}
}
return defaultBackPressureObjectThreshold.get();
}
@Override
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
// use default if value not provided
if (StringUtils.isBlank(defaultBackPressureDataSizeThreshold)) {
this.defaultBackPressureDataSizeThreshold.set(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE));
} else {
// Validate entry: must include size unit label
Pattern pattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
String caseAdjustedSizeThreshold = defaultBackPressureDataSizeThreshold.toUpperCase();
if (pattern.matcher(caseAdjustedSizeThreshold).matches()) {
this.defaultBackPressureDataSizeThreshold.set(caseAdjustedSizeThreshold);
} else {
throw new IllegalArgumentException("The Default Back Pressure Data Size Threshold of the process group must contain a valid data size unit.");
}
}
}
@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
if (StringUtils.isEmpty(defaultBackPressureDataSizeThreshold.get())) {
if (isRootGroup()) {
return niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE);
} else {
return parent.get().getDefaultBackPressureDataSizeThreshold();
}
}
return defaultBackPressureDataSizeThreshold.get();
}
}

View File

@ -261,6 +261,9 @@ public class RestBasedFlowRegistry implements FlowRegistry {
group.setParameterContextName(contents.getParameterContextName());
group.setFlowFileConcurrency(contents.getFlowFileConcurrency());
group.setFlowFileOutboundPolicy(contents.getFlowFileOutboundPolicy());
group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
coordinates.setLatest(snapshot.isLatest());
}

View File

@ -197,6 +197,9 @@ public class NiFiRegistryFlowMapper {
versionedGroup.setPosition(mapPosition(group.getPosition()));
versionedGroup.setFlowFileConcurrency(group.getFlowFileConcurrency().name());
versionedGroup.setFlowFileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
versionedGroup.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration());
versionedGroup.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold());
versionedGroup.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold());
final ParameterContext parameterContext = group.getParameterContext();
versionedGroup.setParameterContextName(parameterContext == null ? null : parameterContext.getName());

View File

@ -1174,4 +1174,40 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @return the DataValve associated with this Process Group
*/
DataValve getDataValve();
/**
* @return the default flowfile expiration of the ProcessGroup
*/
String getDefaultFlowFileExpiration();
/**
* Updates the default flowfile expiration of this ProcessGroup.
*
* @param defaultFlowFileExpiration new default flowfile expiration value (must include time unit label)
*/
void setDefaultFlowFileExpiration(String defaultFlowFileExpiration);
/**
* @return the default back pressure object threshold of this ProcessGroup
*/
Long getDefaultBackPressureObjectThreshold();
/**
* Updates the default back pressure object threshold of this ProcessGroup
*
* @param defaultBackPressureObjectThreshold new default back pressure object threshold value
*/
void setDefaultBackPressureObjectThreshold(Long defaultBackPressureObjectThreshold);
/**
* @returnthe default back pressure size threshold of this ProcessGroup
*/
String getDefaultBackPressureDataSizeThreshold();
/**
* Updates the default back pressure size threshold of this ProcessGroup
*
* @param defaultBackPressureDataSizeThreshold new default back pressure size threshold (must include size unit label)
*/
void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold);
}

View File

@ -579,7 +579,8 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
this.reloadComponent = new StandardReloadComponent(this);
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient, reloadComponent, new MutableVariableRegistry(this.variableRegistry), this);
encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient, reloadComponent, new MutableVariableRegistry(this.variableRegistry), this,
nifiProperties);
rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
setRootGroup(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
@ -1967,18 +1968,21 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
@Override
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
final ProcessGroup processGroup) {
final FlowFileQueue flowFileQueue;
if (clusterCoordinator == null) {
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
eventReporter, nifiProperties.getQueueSwapThreshold(),
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
} else {
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
flowFileQueue.setBackPressureObjectThreshold(processGroup.getDefaultBackPressureObjectThreshold());
flowFileQueue.setBackPressureDataSizeThreshold(processGroup.getDefaultBackPressureDataSizeThreshold());
}
return flowFileQueue;
@ -1987,6 +1991,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
final Connection connection = builder.id(requireNonNull(id).intern())
.name(name == null ? null : name.intern())
.processGroup(destination.getProcessGroup())
.relationships(relationships)
.source(requireNonNull(source))
.destination(destination)

View File

@ -487,6 +487,21 @@ public class StandardFlowSnippet implements FlowSnippet {
}
}
final String defaultFlowFileExpiration = groupDTO.getDefaultFlowFileExpiration();
if (defaultFlowFileExpiration != null) {
childGroup.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
}
final Long defaultBackPressureObjectThreshold = groupDTO.getDefaultBackPressureObjectThreshold();
if (defaultBackPressureObjectThreshold != null) {
childGroup.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
}
final String defaultBackPressureDataSizeThreshold = groupDTO.getDefaultBackPressureDataSizeThreshold();
if (defaultBackPressureDataSizeThreshold != null) {
childGroup.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
}
// If this Process Group is 'top level' then we do not set versioned component ID's.
// We do this only if this component is the child of a Versioned Component.
if (!topLevel) {

View File

@ -1146,7 +1146,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
/**
* Updates the process group corresponding to the specified DTO. Any field
* in DTO that is <code>null</code> (with the exception of the required ID)
* will be ignored.
* will be ignored, or in the case of back pressure settings, will obtain
* value from the parent of this process group
*
* @throws IllegalStateException if no process group can be found with the
* ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
@ -1161,6 +1162,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final String comments = dto.getComments();
final String flowfileConcurrencyName = dto.getFlowfileConcurrency();
final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy();
final String defaultFlowFileExpiration = dto.getDefaultFlowFileExpiration();
final Long defaultBackPressureObjectThreshold = dto.getDefaultBackPressureObjectThreshold();
final String defaultBackPressureDataSizeThreshold = dto.getDefaultBackPressureDataSizeThreshold();
if (name != null) {
group.setName(name);
@ -1193,6 +1197,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
if (defaultFlowFileExpiration != null) {
group.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
}
if (defaultBackPressureObjectThreshold != null) {
group.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
}
if (defaultBackPressureDataSizeThreshold != null) {
group.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
}
}
private <T extends Connectable & Triggerable> ScheduledState getScheduledState(final T component, final FlowController flowController) {
@ -1305,6 +1318,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
}
processGroup.setDefaultFlowFileExpiration(processGroupDTO.getDefaultFlowFileExpiration());
processGroup.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
processGroup.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
final String parameterContextId = getString(processGroupElement, "parameterContextId");
if (parameterContextId != null) {

View File

@ -259,7 +259,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
final ProcessGroup group = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler, flowController.getEncryptor(),
flowController.getExtensionManager(), flowController.getStateManagerProvider(), this, flowController.getFlowRegistryClient(),
flowController.getReloadComponent(), mutableVariableRegistry, flowController);
flowController.getReloadComponent(), mutableVariableRegistry, flowController, nifiProperties);
onProcessGroupAdded(group);
return group;

View File

@ -56,7 +56,7 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
private final ResourceClaimManager resourceClaimManager;
private final ProcessScheduler scheduler;
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 sec", 0L));
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();

View File

@ -53,9 +53,10 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
final int swapThreshold, final String expirationPeriod, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
super.setFlowFileExpiration(expirationPeriod);
this.swapManager = swapManager;
this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
this.eventListener = eventListener;

View File

@ -185,6 +185,9 @@ public class FlowFromDOMFactory {
dto.setComments(getString(element, "comment"));
dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency"));
dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy"));
dto.setDefaultFlowFileExpiration(getString(element, "defaultFlowFileExpiration"));
dto.setDefaultBackPressureObjectThreshold(getLong(element, "defaultBackPressureObjectThreshold"));
dto.setDefaultBackPressureDataSizeThreshold(getString(element, "defaultBackPressureDataSizeThreshold"));
final Map<String, String> variables = new HashMap<>();
final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");
@ -583,8 +586,10 @@ public class FlowFromDOMFactory {
return Integer.parseInt(getString(element, childElementName));
}
private static long getLong(final Element element, final String childElementName) {
return Long.parseLong(getString(element, childElementName));
private static Long getLong(final Element element, final String childElementName) {
// missing element must be handled gracefully, e.g. flow definition from a previous version without this element
String longString = getString(element, childElementName);
return longString == null ? null : Long.parseLong(longString);
}
private static boolean getBoolean(final Element element, final String childElementName) {

View File

@ -235,6 +235,9 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addTextElement(element, "comment", group.getComments());
addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name());
addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());
addTextElement(element, "defaultFlowFileExpiration", group.getDefaultFlowFileExpiration());
addTextElement(element, "defaultBackPressureObjectThreshold", group.getDefaultBackPressureObjectThreshold());
addTextElement(element, "defaultBackPressureDataSizeThreshold", group.getDefaultBackPressureDataSizeThreshold());
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {

View File

@ -346,6 +346,10 @@ public class FingerprintFactory {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultFlowFileExpiration"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureObjectThreshold"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureDataSizeThreshold"));
final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
if (versionControlInfo == null) {

View File

@ -180,6 +180,9 @@
<xs:element name="comment" type="xs:string" />
<xs:element name="flowfileConcurrency" type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
<xs:element name="flowfileOutboundPolicy" type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultFlowFileExpiration" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureObjectThreshold" type="xs:long" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureDataSizeThreshold" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="versionControlInformation" type="VersionControlInformation" minOccurs="0" maxOccurs="1" />
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
@ -243,6 +246,9 @@
<xs:element name="comment" type="xs:string" />
<xs:element name="flowfileConcurrency" type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
<xs:element name="flowfileOutboundPolicy" type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultFlowFileExpiration" type="xs:string" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureObjectThreshold" type="xs:long" minOccurs="0" maxOccurs="1" />
<xs:element name="defaultBackPressureDataSizeThreshold" type="xs:string" minOccurs="0" maxOccurs="1" />
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>

View File

@ -103,7 +103,7 @@ public class TestStandardFlowFileQueue {
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
MockFlowFileRecord.resetIdGenerator();
}
@ -357,7 +357,7 @@ public class TestStandardFlowFileQueue {
@Test
public void testSwapInWhenThresholdIsLessThanSwapSize() {
// create a queue where the swap threshold is less than 10k
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");
for (int i = 1; i <= 20000; i++) {
queue.put(new MockFlowFileRecord());

View File

@ -224,7 +224,7 @@ public class StandardProcessSessionIT {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, provenanceRepo, null,
processScheduler, swapManager, null, 10000, 0L, "0 B");
processScheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
return Mockito.spy(actualQueue);
}

View File

@ -206,7 +206,7 @@ public class TestRocksDBFlowFileRepository {
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);
@ -651,7 +651,7 @@ public class TestRocksDBFlowFileRepository {
provider = new TestQueueProvider();
queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, 0, "0 B") {
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, "0 sec",0, "0 B") {
@Override
public void put(final FlowFileRecord file) {
queuedFlowFiles.add(file);

View File

@ -520,7 +520,7 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);

View File

@ -71,6 +71,9 @@ public class MockProcessGroup implements ProcessGroup {
private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY);
private VersionControlInformation versionControlInfo;
private ParameterContext parameterContext;
private String defaultFlowfileExpiration;
private long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
public MockProcessGroup(final FlowManager flowManager) {
this.flowManager = flowManager;
@ -809,6 +812,36 @@ public class MockProcessGroup implements ProcessGroup {
return null;
}
@Override
public String getDefaultFlowFileExpiration() {
return defaultFlowfileExpiration;
}
@Override
public void setDefaultFlowFileExpiration(String defaultFlowFileExpiration) {
this.defaultFlowfileExpiration = defaultFlowFileExpiration;
}
@Override
public Long getDefaultBackPressureObjectThreshold() {
return defaultBackPressureObjectThreshold;
}
@Override
public void setDefaultBackPressureObjectThreshold(Long defaultBackPressureObjectThreshold) {
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
}
@Override
public String getDefaultBackPressureDataSizeThreshold() {
return defaultBackPressureDataSizeThreshold;
}
@Override
public void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
@Override
public void terminateProcessor(ProcessorNode processor) {
}

View File

@ -377,10 +377,11 @@ public class FrameworkIntegrationTest {
FileUtils.deleteFile(dir, true);
}
protected FlowFileQueue createFlowFileQueue(final String uuid) {
protected FlowFileQueue createFlowFileQueue(final String uuid, final ProcessGroup processGroup) {
final RepositoryContext repoContext = getRepositoryContext();
return new StandardFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000, 10000L, "1 GB");
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000,
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
}
protected final ProcessorNode createProcessorNode(final Class<? extends Processor> processorType) {
@ -477,10 +478,11 @@ public class FrameworkIntegrationTest {
final Connection connection = new StandardConnection.Builder(processScheduler)
.source(source)
.destination(destination)
.processGroup(processGroup)
.relationships(relationships)
.id(id)
.clustered(false)
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener) -> createFlowFileQueue(id))
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener, processGroup1) -> createFlowFileQueue(id, processGroup))
.build();
source.addConnection(connection);

View File

@ -74,7 +74,7 @@ public class FlowFileRepositoryLifecycleIT extends FrameworkIntegrationTest {
shutdown();
final FlowFileQueue restoredQueue = createFlowFileQueue(queue.getIdentifier());
final FlowFileQueue restoredQueue = createFlowFileQueue(queue.getIdentifier(), getRootGroup());
initialize();
getFlowController().initializeFlow(() -> Collections.singleton(restoredQueue));

View File

@ -31,6 +31,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Test;
@ -43,6 +44,40 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
@Test
public void testProcessGroupDefaults() {
// Connect two processors with default settings of the root process group
ProcessorNode sourceProcessor = createGenerateProcessor(1);
ProcessorNode destinationProcessor = createProcessorNode((context, session) -> {});
Connection connection = connect(sourceProcessor, destinationProcessor, Collections.singleton(REL_SUCCESS));
// Verify all defaults are in place on the process group and the connection
assertEquals("0 sec", getRootGroup().getDefaultFlowFileExpiration());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, getRootGroup().getDefaultBackPressureDataSizeThreshold());
assertEquals("0 sec", connection.getFlowFileQueue().getFlowFileExpiration());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) connection.getFlowFileQueue().getBackPressureObjectThreshold());
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
// Update default settings of the process group, and create a new connection
getRootGroup().setDefaultFlowFileExpiration("99 min");
getRootGroup().setDefaultBackPressureObjectThreshold(99L);
getRootGroup().setDefaultBackPressureDataSizeThreshold("99 MB");
ProcessorNode sourceProcessor1 = createGenerateProcessor(1);
ProcessorNode destinationProcessor1 = createProcessorNode((context, session) -> {});
Connection connection1 = connect(sourceProcessor1, destinationProcessor1, Collections.singleton(REL_SUCCESS));
// Verify updated settings are in place on the process group and the connection
assertEquals("99 min", getRootGroup().getDefaultFlowFileExpiration());
assertEquals(99, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
assertEquals("99 MB", getRootGroup().getDefaultBackPressureDataSizeThreshold());
assertEquals("99 min", connection1.getFlowFileQueue().getFlowFileExpiration());
assertEquals(99, (long) connection1.getFlowFileQueue().getBackPressureObjectThreshold());
assertEquals("99 MB", connection1.getFlowFileQueue().getBackPressureDataSizeThreshold());
}
@Test
public void testDropAllFlowFilesFromOneConnection() throws Exception {
ProcessorNode sourceProcessGroup = createGenerateProcessor(1);

View File

@ -2512,6 +2512,9 @@ public final class DtoFactory {
dto.setVersionControlInformation(createVersionControlInformationDto(group));
dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name());
dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
dto.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration());
dto.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold());
dto.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold());
final ParameterContext parameterContext = group.getParameterContext();
if (parameterContext != null) {
@ -4342,6 +4345,9 @@ public final class DtoFactory {
copy.setVersionedComponentId(original.getVersionedComponentId());
copy.setFlowfileConcurrency(original.getFlowfileConcurrency());
copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy());
copy.setDefaultFlowFileExpiration(original.getDefaultFlowFileExpiration());
copy.setDefaultBackPressureObjectThreshold(original.getDefaultBackPressureObjectThreshold());
copy.setDefaultBackPressureDataSizeThreshold(original.getDefaultBackPressureDataSizeThreshold());
copy.setRunningCount(original.getRunningCount());
copy.setStoppedCount(original.getStoppedCount());

View File

@ -339,6 +339,10 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final String outboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy();
final FlowFileOutboundPolicy flowFileOutboundPolicy = outboundPolicyName == null ? null : FlowFileOutboundPolicy.valueOf(outboundPolicyName);
final String defaultFlowFileExpiration = processGroupDTO.getDefaultFlowFileExpiration();
final Long defaultBackPressureObjectThreshold = processGroupDTO.getDefaultBackPressureObjectThreshold();
final String defaultBackPressureDataSizeThreshold = processGroupDTO.getDefaultBackPressureDataSizeThreshold();
final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext();
if (parameterContextReference != null) {
final String parameterContextId = parameterContextReference.getId();
@ -373,6 +377,17 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
if (flowFileOutboundPolicy != null) {
group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
}
if (defaultFlowFileExpiration != null) {
group.setDefaultFlowFileExpiration(processGroupDTO.getDefaultFlowFileExpiration());
}
if (defaultBackPressureObjectThreshold != null) {
group.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
}
if (defaultBackPressureDataSizeThreshold != null) {
group.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
}
group.onComponentModified();
return group;
}

View File

@ -72,6 +72,34 @@
<span id="read-only-process-group-outbound-policy" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Default FlowFile Expiration</div>
<div class="editable setting-field">
<input type="text" id="process-group-default-flowfile-expiration" class="setting-input"/>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-default-flowfile-expiration" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Default Back Pressure Object Threshold</div>
<div class="editable setting-field">
<input type="text" id="process-group-default-back-pressure-object-threshold" class="setting-input"/>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-default-back-pressure-object-threshold" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Default Back Pressure Data Size Threshold</div>
<div class="editable setting-field">
<input type="text" id="process-group-default-back-pressure-data-size-threshold" class="setting-input"/>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-default-back-pressure-data-size-threshold" class="unset"></span>
</div>
</div>
<div class="editable settings-buttons">
<div id="process-group-configuration-save" class="button">Apply</div>
<div class="clear"></div>

View File

@ -93,6 +93,18 @@
width: 328px;
}
#process-group-default-flowfile-expiration {
width: 328px;
}
#process-group-default-back-pressure-object-threshold {
width: 328px;
}
#process-group-default-back-pressure-data-size-threshold {
width: 328px;
}
#process-group-comments {
height: 100px;
}

View File

@ -45,6 +45,12 @@
var canvas;
var origin;
var config = {
urls: {
api: '../nifi-api',
}
};
/**
* Determines if we want to allow adding connections in the current state:
*
@ -212,7 +218,19 @@
// create the connection
var destinationData = destination.datum();
nfConnectionConfiguration.createConnection(connectorData.sourceId, destinationData.id);
$.ajax({
type: 'GET',
url: config.urls.api + '/process-groups/' + encodeURIComponent(destinationData.component.parentGroupId),
dataType: 'json'
}).done(function (response) {
var defaultSettings = {
flowfileExpiration: response.component.defaultFlowFileExpiration,
objectThreshold: response.component.defaultBackPressureObjectThreshold,
dataSizeThreshold: response.component.defaultBackPressureDataSizeThreshold,
};
nfConnectionConfiguration.createConnection(connectorData.sourceId, destinationData.id, defaultSettings);
});
}
});
},

View File

@ -1267,13 +1267,10 @@
* @param nfBirdseyeRef The nfBirdseye module.
* @param nfGraphRef The nfGraph module.
*/
init: function (nfBirdseyeRef, nfGraphRef, defaultBackPressureObjectThresholdRef, defaultBackPressureDataSizeThresholdRef) {
init: function (nfBirdseyeRef, nfGraphRef) {
nfBirdseye = nfBirdseyeRef;
nfGraph = nfGraphRef;
defaultBackPressureObjectThreshold = defaultBackPressureObjectThresholdRef;
defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThresholdRef;
// initially hide the relationship names container
$('#relationship-names-container').hide();
@ -1383,7 +1380,7 @@
* @argument {string} sourceId The source id
* @argument {string} destinationId The destination id
*/
createConnection: function (sourceId, destinationId) {
createConnection: function (sourceId, destinationId, defaultSettings) {
// select the source and destination
var source = d3.select('#id-' + sourceId);
var destination = d3.select('#id-' + destinationId);
@ -1401,9 +1398,9 @@
// initialize the connection dialog
$.when(initializeSourceNewConnectionDialog(source), initializeDestinationNewConnectionDialog(destination)).done(function () {
// set the default values
$('#flow-file-expiration').val('0 sec');
$('#back-pressure-object-threshold').val(defaultBackPressureObjectThreshold);
$('#back-pressure-data-size-threshold').val(defaultBackPressureDataSizeThreshold);
$('#flow-file-expiration').val(defaultSettings.flowfileExpiration);
$('#back-pressure-object-threshold').val(defaultSettings.objectThreshold);
$('#back-pressure-data-size-threshold').val(defaultSettings.dataSizeThreshold);
// select the first tab
$('#connection-configuration-tabs').find('li:first').click();

View File

@ -61,6 +61,7 @@
var nfControllerServices;
var nfParameterContexts;
var nfBackpressureDefaults;
var config = {
urls: {
@ -107,7 +108,10 @@
'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value
},
'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value,
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value,
'defaultFlowFileExpiration': $('#process-group-default-flowfile-expiration').val(),
'defaultBackPressureObjectThreshold': $('#process-group-default-back-pressure-object-threshold').val(),
'defaultBackPressureDataSizeThreshold': $('#process-group-default-back-pressure-data-size-threshold').val()
}
};
@ -167,6 +171,9 @@
var setUnauthorizedText = function () {
$('#read-only-process-group-name').text('Unauthorized');
$('#read-only-process-group-comments').text('Unauthorized');
$('#read-only-process-group-default-flowfile-expiration').text('Unauthorized');
$('#read-only-process-group-default-back-pressure-object-threshold').text('Unauthorized');
$('#read-only-process-group-default-back-pressure-data-size-threshold').text('Unauthorized');
};
var setEditable = function (editable) {
@ -255,6 +262,10 @@
}
});
$('#process-group-default-flowfile-expiration').removeClass('unset').val(processGroup.defaultFlowFileExpiration);
$('#process-group-default-back-pressure-object-threshold').removeClass('unset').val(processGroup.defaultBackPressureObjectThreshold);
$('#process-group-default-back-pressure-data-size-threshold').removeClass('unset').val(processGroup.defaultBackPressureDataSizeThreshold);
// populate the header
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
@ -290,6 +301,15 @@
// populate the header
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
// backpressure settings
$('#process-group-default-flowfile-expiration').text(processGroup.defaultFlowFileExpiration);
$('#process-group-default-back-pressure-object-threshold').text(processGroup.defaultBackPressureObjectThreshold);
$('#process-group-default-back-pressure-data-size-threshold').text(processGroup.defaultBackPressureDataSizeThreshold);
$('#read-only-process-group-default-flowfile-expiration').text(processGroup.defaultFlowFileExpiration);
$('#read-only-process-group-default-back-pressure-object-threshold').text(processGroup.defaultBackPressureObjectThreshold);
$('#read-only-process-group-default-back-pressure-data-size-threshold').text(processGroup.defaultBackPressureDataSizeThreshold);
} else {
setUnauthorizedText();
}
@ -519,6 +539,9 @@
$('#process-group-id').text('');
$('#process-group-name').val('');
$('#process-group-comments').val('');
$('#process-group-default-flowfile-expiration').val('');
$('#process-group-default-back-pressure-object-threshold').val('');
$('#process-group-default-back-pressure-data-size-threshold').val('');
// reset the header
$('#process-group-configuration-header-text').text('Process Group Configuration');

View File

@ -45,6 +45,10 @@ public class VersionedProcessGroup extends VersionedComponent {
private String flowfileConcurrency;
private String flowfileOutboundPolicy;
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
@ApiModelProperty("The child Process Groups")
public Set<VersionedProcessGroup> getProcessGroups() {
return processGroups;
@ -175,4 +179,31 @@ public class VersionedProcessGroup extends VersionedComponent {
public void setFlowFileOutboundPolicy(final String outboundPolicy) {
this.flowfileOutboundPolicy = outboundPolicy;
}
@ApiModelProperty(value = "The default FlowFile Expiration for this Process Group.")
public String getDefaultFlowFileExpiration() {
return defaultFlowFileExpiration;
}
public void setDefaultFlowFileExpiration(String defaultFlowFileExpiration) {
this.defaultFlowFileExpiration = defaultFlowFileExpiration;
}
@ApiModelProperty(value = "Default value used in this Process Group for the maximum number of objects that can be queued before back pressure is applied.")
public Long getDefaultBackPressureObjectThreshold() {
return defaultBackPressureObjectThreshold;
}
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
}
@ApiModelProperty(value = "Default value used in this Process Group for the maximum data size of objects that can be queued before back pressure is applied.")
public String getDefaultBackPressureDataSizeThreshold() {
return defaultBackPressureDataSizeThreshold == null ? "" : defaultBackPressureDataSizeThreshold;
}
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
}

View File

@ -152,6 +152,9 @@ public class RegistryUtil {
group.setVariables(contents.getVariables());
group.setFlowFileConcurrency(contents.getFlowFileConcurrency());
group.setFlowFileOutboundPolicy(contents.getFlowFileOutboundPolicy());
group.setDefaultFlowFileExpiration(contents.getDefaultFlowFileExpiration());
group.setDefaultBackPressureObjectThreshold(contents.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(contents.getDefaultBackPressureDataSizeThreshold());
coordinates.setLatest(snapshot.isLatest());
}

View File

@ -230,7 +230,8 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
@Override
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
final ProcessGroup processGroup) {
return new StatelessFlowFileQueue(id);
}
};