mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 16:09:19 +00:00
Revert "NIFI-8195: add default connection settings to process group configuration"
This reverts commit 5ebbe0028b9ff4d23f86d87e4942665db62e9bc0.
This commit is contained in:
parent
27c35c8c42
commit
5e4f32663e
Binary file not shown.
Before Width: | Height: | Size: 64 KiB After Width: | Height: | Size: 37 KiB |
@ -742,7 +742,7 @@ The next configuration element is the Process Group Parameter Context, which is
|
|||||||
|
|
||||||
The third element in the configuration dialog is the Process Group Comments. This provides a mechanism for providing any useful information or context about the Process Group.
|
The third element in the configuration dialog is the Process Group Comments. This provides a mechanism for providing any useful information or context about the Process Group.
|
||||||
|
|
||||||
The next two elements, Process Group FlowFile Currency and Process Group Outbound Policy, are covered in the following sections.
|
The last two elements, Process Group FlowFile Currency and Process Group Outbound Policy, are covered in the following sections.
|
||||||
|
|
||||||
[[Flowfile_Concurrency]]
|
[[Flowfile_Concurrency]]
|
||||||
===== FlowFile Concurrency
|
===== FlowFile Concurrency
|
||||||
@ -833,17 +833,6 @@ input FlowFile, it is recommended that backpressure for Connections ending in an
|
|||||||
largest expected number of FlowFiles or backpressure for those Connections be disabled all together (by setting the Backpressure Threshold to 0).
|
largest expected number of FlowFiles or backpressure for those Connections be disabled all together (by setting the Backpressure Threshold to 0).
|
||||||
See <<Backpressure>> for more information.
|
See <<Backpressure>> for more information.
|
||||||
|
|
||||||
[[Default_Connection_Settings]]
|
|
||||||
===== Default Settings for Connections
|
|
||||||
The final three elements in the Process Group configuration dialog are for Default FlowFile Expiration, Default Back Pressure Object Threshold, and
|
|
||||||
Default Back Pressure Data Size Threshold. These settings configure the default values when creating a new Connection. Each Connection represents a queue,
|
|
||||||
and every queue has settings for flowfile expiration, back pressure object count, and back pressure data size. The settings specified here will effect the
|
|
||||||
default values for all new Connections created within the Process Group; it will not effect existing Connections. Child Process Groups created within the
|
|
||||||
configured Process Group will inherit the default settings. Again, existing Process Groups will not be effected. If not overridden with these options, the
|
|
||||||
root Process Group obtains its default back pressure settings from nifi.properties, and has a default FlowFile expiration of "0 sec", i.e. do not expire.
|
|
||||||
|
|
||||||
NOTE: Setting the Default FlowFile Expiration to a non-zero value may lead to data loss due to a FlowFile expiring as its time limit is reached.
|
|
||||||
|
|
||||||
==== Controller Services
|
==== Controller Services
|
||||||
The Controller Services tab in the Process Group configuration dialog is covered in <<Controller_Services_for_Dataflows>>.
|
The Controller Services tab in the Process Group configuration dialog is covered in <<Controller_Services_for_Dataflows>>.
|
||||||
|
|
||||||
|
@ -36,9 +36,6 @@ public class ProcessGroupDTO extends ComponentDTO {
|
|||||||
private ParameterContextReferenceEntity parameterContext;
|
private ParameterContextReferenceEntity parameterContext;
|
||||||
private String flowfileConcurrency;
|
private String flowfileConcurrency;
|
||||||
private String flowfileOutboundPolicy;
|
private String flowfileOutboundPolicy;
|
||||||
private String defaultFlowFileExpiration;
|
|
||||||
private Long defaultBackPressureObjectThreshold;
|
|
||||||
private String defaultBackPressureDataSizeThreshold;
|
|
||||||
|
|
||||||
private Integer runningCount;
|
private Integer runningCount;
|
||||||
private Integer stoppedCount;
|
private Integer stoppedCount;
|
||||||
@ -367,7 +364,7 @@ public class ProcessGroupDTO extends ComponentDTO {
|
|||||||
this.flowfileConcurrency = flowfileConcurrency;
|
this.flowfileConcurrency = flowfileConcurrency;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiModelProperty(value = "The Outbound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.",
|
@ApiModelProperty(value = "The Oubound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.",
|
||||||
allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT")
|
allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT")
|
||||||
public String getFlowfileOutboundPolicy() {
|
public String getFlowfileOutboundPolicy() {
|
||||||
return flowfileOutboundPolicy;
|
return flowfileOutboundPolicy;
|
||||||
@ -376,31 +373,4 @@ public class ProcessGroupDTO extends ComponentDTO {
|
|||||||
public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) {
|
public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) {
|
||||||
this.flowfileOutboundPolicy = flowfileOutboundPolicy;
|
this.flowfileOutboundPolicy = flowfileOutboundPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiModelProperty(value = "The default FlowFile Expiration for this Process Group.")
|
|
||||||
public String getDefaultFlowFileExpiration() {
|
|
||||||
return defaultFlowFileExpiration;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDefaultFlowFileExpiration(String defaultFlowFileExpiration) {
|
|
||||||
this.defaultFlowFileExpiration = defaultFlowFileExpiration;
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiModelProperty(value = "Default value used in this Process Group for the maximum number of objects that can be queued before back pressure is applied.")
|
|
||||||
public Long getDefaultBackPressureObjectThreshold() {
|
|
||||||
return defaultBackPressureObjectThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
|
|
||||||
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiModelProperty(value = "Default value used in this Process Group for the maximum data size of objects that can be queued before back pressure is applied.")
|
|
||||||
public String getDefaultBackPressureDataSizeThreshold() {
|
|
||||||
return defaultBackPressureDataSizeThreshold == null ? "" : defaultBackPressureDataSizeThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
|
|
||||||
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,7 @@ public final class StandardConnection implements Connection, ConnectionEventList
|
|||||||
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
|
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
|
||||||
scheduler = builder.scheduler;
|
scheduler = builder.scheduler;
|
||||||
|
|
||||||
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this, processGroup.get());
|
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this);
|
||||||
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
|
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -456,9 +456,6 @@ public final class StandardConnection implements Connection, ConnectionEventList
|
|||||||
}
|
}
|
||||||
|
|
||||||
public StandardConnection build() {
|
public StandardConnection build() {
|
||||||
if (processGroup == null) {
|
|
||||||
throw new IllegalStateException("Cannot build a Connection without a Process Group");
|
|
||||||
}
|
|
||||||
if (source == null) {
|
if (source == null) {
|
||||||
throw new IllegalStateException("Cannot build a Connection without a Source");
|
throw new IllegalStateException("Cannot build a Connection without a Source");
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,6 @@
|
|||||||
|
|
||||||
package org.apache.nifi.controller.queue;
|
package org.apache.nifi.controller.queue;
|
||||||
|
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
|
||||||
|
|
||||||
public interface FlowFileQueueFactory {
|
public interface FlowFileQueueFactory {
|
||||||
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener, ProcessGroup processGroup);
|
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener);
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,6 @@ import org.apache.nifi.parameter.ParameterDescriptor;
|
|||||||
import org.apache.nifi.parameter.ParameterReference;
|
import org.apache.nifi.parameter.ParameterReference;
|
||||||
import org.apache.nifi.parameter.ParameterUpdate;
|
import org.apache.nifi.parameter.ParameterUpdate;
|
||||||
import org.apache.nifi.parameter.StandardParameterUpdate;
|
import org.apache.nifi.parameter.StandardParameterUpdate;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.StandardProcessContext;
|
import org.apache.nifi.processor.StandardProcessContext;
|
||||||
import org.apache.nifi.registry.ComponentVariableRegistry;
|
import org.apache.nifi.registry.ComponentVariableRegistry;
|
||||||
@ -134,8 +133,6 @@ import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
|||||||
import org.apache.nifi.scheduling.ExecutionNode;
|
import org.apache.nifi.scheduling.ExecutionNode;
|
||||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||||
import org.apache.nifi.util.FlowDifferenceFilters;
|
import org.apache.nifi.util.FlowDifferenceFilters;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
|
||||||
import org.apache.nifi.util.ReflectionUtils;
|
import org.apache.nifi.util.ReflectionUtils;
|
||||||
import org.apache.nifi.util.SnippetUtils;
|
import org.apache.nifi.util.SnippetUtils;
|
||||||
import org.apache.nifi.web.ResourceNotFoundException;
|
import org.apache.nifi.web.ResourceNotFoundException;
|
||||||
@ -164,7 +161,6 @@ import java.util.Set;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -174,7 +170,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.regex.Pattern;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
@ -193,9 +188,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||||||
private final AtomicReference<String> name;
|
private final AtomicReference<String> name;
|
||||||
private final AtomicReference<Position> position;
|
private final AtomicReference<Position> position;
|
||||||
private final AtomicReference<String> comments;
|
private final AtomicReference<String> comments;
|
||||||
private final AtomicReference<String> defaultFlowFileExpiration;
|
|
||||||
private final AtomicReference<Long> defaultBackPressureObjectThreshold; // use AtomicReference vs AtomicLong to allow storing null
|
|
||||||
private final AtomicReference<String> defaultBackPressureDataSizeThreshold;
|
|
||||||
private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
|
private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
|
||||||
private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
|
private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
|
||||||
private static final SecureRandom randomGenerator = new SecureRandom();
|
private static final SecureRandom randomGenerator = new SecureRandom();
|
||||||
@ -229,30 +221,17 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||||||
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
|
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
|
||||||
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
|
private volatile BatchCounts batchCounts = new NoOpBatchCounts();
|
||||||
private final DataValve dataValve;
|
private final DataValve dataValve;
|
||||||
private final Map<String, String> niFiPropertiesBackPressure;
|
|
||||||
|
|
||||||
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
|
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||||
private final Lock readLock = rwLock.readLock();
|
private final Lock readLock = rwLock.readLock();
|
||||||
private final Lock writeLock = rwLock.writeLock();
|
private final Lock writeLock = rwLock.writeLock();
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
|
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
|
||||||
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
|
|
||||||
private static final String DEFAULT_BACKPRESSURE_OBJECT = "0";
|
|
||||||
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "0 GB";
|
|
||||||
|
|
||||||
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
|
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
|
||||||
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
|
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
|
||||||
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
|
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
|
||||||
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider) {
|
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider) {
|
||||||
this(id, serviceProvider, scheduler, encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient,
|
|
||||||
reloadComponent, variableRegistry, nodeTypeProvider, null);
|
|
||||||
}
|
|
||||||
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
|
|
||||||
final PropertyEncryptor encryptor, final ExtensionManager extensionManager,
|
|
||||||
final StateManagerProvider stateManagerProvider, final FlowManager flowManager, final FlowRegistryClient flowRegistryClient,
|
|
||||||
final ReloadComponent reloadComponent, final MutableVariableRegistry variableRegistry, final NodeTypeProvider nodeTypeProvider,
|
|
||||||
final NiFiProperties niFiProperties) {
|
|
||||||
|
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.controllerServiceProvider = serviceProvider;
|
this.controllerServiceProvider = serviceProvider;
|
||||||
this.parent = new AtomicReference<>();
|
this.parent = new AtomicReference<>();
|
||||||
@ -272,16 +251,6 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||||||
|
|
||||||
final StateManager dataValveStateManager = stateManagerProvider.getStateManager(id + "-DataValve");
|
final StateManager dataValveStateManager = stateManagerProvider.getStateManager(id + "-DataValve");
|
||||||
dataValve = new StandardDataValve(this, dataValveStateManager);
|
dataValve = new StandardDataValve(this, dataValveStateManager);
|
||||||
|
|
||||||
this.defaultFlowFileExpiration = new AtomicReference<>();
|
|
||||||
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
|
|
||||||
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
|
|
||||||
// save only the nifi properties needed, and account for the possibility those properties are missing
|
|
||||||
niFiPropertiesBackPressure = new ConcurrentHashMap<>();
|
|
||||||
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_COUNT,
|
|
||||||
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT) == null ? DEFAULT_BACKPRESSURE_OBJECT : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_COUNT));
|
|
||||||
niFiPropertiesBackPressure.put(NiFiProperties.BACKPRESSURE_SIZE,
|
|
||||||
niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE) == null ? DEFAULT_BACKPRESSURE_DATA_SIZE : niFiProperties.getProperty(NiFiProperties.BACKPRESSURE_SIZE));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -5597,93 +5566,4 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||||||
public DataValve getDataValve() {
|
public DataValve getDataValve() {
|
||||||
return dataValve;
|
return dataValve;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultFlowFileExpiration(final String defaultFlowFileExpiration) {
|
|
||||||
// use default if value not provided
|
|
||||||
if (StringUtils.isBlank(defaultFlowFileExpiration)) {
|
|
||||||
this.defaultFlowFileExpiration.set(DEFAULT_FLOWFILE_EXPIRATION);
|
|
||||||
} else {
|
|
||||||
// Validate entry: must include time unit label
|
|
||||||
Pattern pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
|
|
||||||
String caseAdjustedExpiration = defaultFlowFileExpiration.toLowerCase();
|
|
||||||
if (pattern.matcher(caseAdjustedExpiration).matches()) {
|
|
||||||
this.defaultFlowFileExpiration.set(caseAdjustedExpiration);
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("The Default FlowFile Expiration of the process group must contain a valid time unit.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDefaultFlowFileExpiration() {
|
|
||||||
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, use the default.
|
|
||||||
if (defaultFlowFileExpiration.get() == null) {
|
|
||||||
if (isRootGroup()) {
|
|
||||||
return DEFAULT_FLOWFILE_EXPIRATION;
|
|
||||||
} else {
|
|
||||||
return parent.get().getDefaultFlowFileExpiration();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return defaultFlowFileExpiration.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultBackPressureObjectThreshold(final Long defaultBackPressureObjectThreshold) {
|
|
||||||
// use default if value not provided
|
|
||||||
if (defaultBackPressureObjectThreshold == null) {
|
|
||||||
this.defaultBackPressureObjectThreshold.set(Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT)));
|
|
||||||
} else {
|
|
||||||
// Validate field is numeric
|
|
||||||
Pattern pattern = Pattern.compile("(\\d+)");
|
|
||||||
if (pattern.matcher(String.valueOf(defaultBackPressureObjectThreshold)).matches()) {
|
|
||||||
this.defaultBackPressureObjectThreshold.set(defaultBackPressureObjectThreshold);
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("The Default Back Pressure Object Threshold of the process group must be numeric.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long getDefaultBackPressureObjectThreshold() {
|
|
||||||
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
|
|
||||||
if (defaultBackPressureObjectThreshold.get() == null) {
|
|
||||||
if (isRootGroup()) {
|
|
||||||
return Long.parseLong(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_COUNT));
|
|
||||||
} else {
|
|
||||||
return getParent().getDefaultBackPressureObjectThreshold();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return defaultBackPressureObjectThreshold.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
|
|
||||||
// use default if value not provided
|
|
||||||
if (StringUtils.isBlank(defaultBackPressureDataSizeThreshold)) {
|
|
||||||
this.defaultBackPressureDataSizeThreshold.set(niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE));
|
|
||||||
} else {
|
|
||||||
// Validate entry: must include size unit label
|
|
||||||
Pattern pattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
|
|
||||||
String caseAdjustedSizeThreshold = defaultBackPressureDataSizeThreshold.toUpperCase();
|
|
||||||
if (pattern.matcher(caseAdjustedSizeThreshold).matches()) {
|
|
||||||
this.defaultBackPressureDataSizeThreshold.set(caseAdjustedSizeThreshold);
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException("The Default Back Pressure Data Size Threshold of the process group must contain a valid data size unit.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDefaultBackPressureDataSizeThreshold() {
|
|
||||||
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
|
|
||||||
if (StringUtils.isEmpty(defaultBackPressureDataSizeThreshold.get())) {
|
|
||||||
if (isRootGroup()) {
|
|
||||||
return niFiPropertiesBackPressure.get(NiFiProperties.BACKPRESSURE_SIZE);
|
|
||||||
} else {
|
|
||||||
return parent.get().getDefaultBackPressureDataSizeThreshold();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return defaultBackPressureDataSizeThreshold.get();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1174,40 +1174,4 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
|
|||||||
* @return the DataValve associated with this Process Group
|
* @return the DataValve associated with this Process Group
|
||||||
*/
|
*/
|
||||||
DataValve getDataValve();
|
DataValve getDataValve();
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the default flowfile expiration of the ProcessGroup
|
|
||||||
*/
|
|
||||||
String getDefaultFlowFileExpiration();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Updates the default flowfile expiration of this ProcessGroup.
|
|
||||||
*
|
|
||||||
* @param defaultFlowFileExpiration new default flowfile expiration value (must include time unit label)
|
|
||||||
*/
|
|
||||||
void setDefaultFlowFileExpiration(String defaultFlowFileExpiration);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the default back pressure object threshold of this ProcessGroup
|
|
||||||
*/
|
|
||||||
Long getDefaultBackPressureObjectThreshold();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Updates the default back pressure object threshold of this ProcessGroup
|
|
||||||
*
|
|
||||||
* @param defaultBackPressureObjectThreshold new default back pressure object threshold value
|
|
||||||
*/
|
|
||||||
void setDefaultBackPressureObjectThreshold(Long defaultBackPressureObjectThreshold);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @returnthe default back pressure size threshold of this ProcessGroup
|
|
||||||
*/
|
|
||||||
String getDefaultBackPressureDataSizeThreshold();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Updates the default back pressure size threshold of this ProcessGroup
|
|
||||||
*
|
|
||||||
* @param defaultBackPressureDataSizeThreshold new default back pressure size threshold (must include size unit label)
|
|
||||||
*/
|
|
||||||
void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold);
|
|
||||||
}
|
}
|
||||||
|
@ -579,8 +579,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||||||
this.reloadComponent = new StandardReloadComponent(this);
|
this.reloadComponent = new StandardReloadComponent(this);
|
||||||
|
|
||||||
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
|
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
|
||||||
encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient, reloadComponent, new MutableVariableRegistry(this.variableRegistry), this,
|
encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient, reloadComponent, new MutableVariableRegistry(this.variableRegistry), this);
|
||||||
nifiProperties);
|
|
||||||
rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
|
rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
|
||||||
setRootGroup(rootGroup);
|
setRootGroup(rootGroup);
|
||||||
instanceId = ComponentIdGenerator.generateId().toString();
|
instanceId = ComponentIdGenerator.generateId().toString();
|
||||||
@ -1968,21 +1967,18 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||||||
|
|
||||||
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
|
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
|
||||||
@Override
|
@Override
|
||||||
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
|
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
|
||||||
final ProcessGroup processGroup) {
|
|
||||||
final FlowFileQueue flowFileQueue;
|
final FlowFileQueue flowFileQueue;
|
||||||
|
|
||||||
if (clusterCoordinator == null) {
|
if (clusterCoordinator == null) {
|
||||||
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
|
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
|
||||||
eventReporter, nifiProperties.getQueueSwapThreshold(),
|
eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
|
||||||
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
} else {
|
} else {
|
||||||
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
|
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
|
||||||
clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
|
clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
|
||||||
|
|
||||||
flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
|
flowFileQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
|
||||||
flowFileQueue.setBackPressureObjectThreshold(processGroup.getDefaultBackPressureObjectThreshold());
|
flowFileQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
|
||||||
flowFileQueue.setBackPressureDataSizeThreshold(processGroup.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return flowFileQueue;
|
return flowFileQueue;
|
||||||
@ -1991,7 +1987,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||||||
|
|
||||||
final Connection connection = builder.id(requireNonNull(id).intern())
|
final Connection connection = builder.id(requireNonNull(id).intern())
|
||||||
.name(name == null ? null : name.intern())
|
.name(name == null ? null : name.intern())
|
||||||
.processGroup(destination.getProcessGroup())
|
|
||||||
.relationships(relationships)
|
.relationships(relationships)
|
||||||
.source(requireNonNull(source))
|
.source(requireNonNull(source))
|
||||||
.destination(destination)
|
.destination(destination)
|
||||||
|
@ -487,21 +487,6 @@ public class StandardFlowSnippet implements FlowSnippet {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final String defaultFlowFileExpiration = groupDTO.getDefaultFlowFileExpiration();
|
|
||||||
if (defaultFlowFileExpiration != null) {
|
|
||||||
childGroup.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
|
|
||||||
}
|
|
||||||
|
|
||||||
final Long defaultBackPressureObjectThreshold = groupDTO.getDefaultBackPressureObjectThreshold();
|
|
||||||
if (defaultBackPressureObjectThreshold != null) {
|
|
||||||
childGroup.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
|
|
||||||
}
|
|
||||||
|
|
||||||
final String defaultBackPressureDataSizeThreshold = groupDTO.getDefaultBackPressureDataSizeThreshold();
|
|
||||||
if (defaultBackPressureDataSizeThreshold != null) {
|
|
||||||
childGroup.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this Process Group is 'top level' then we do not set versioned component ID's.
|
// If this Process Group is 'top level' then we do not set versioned component ID's.
|
||||||
// We do this only if this component is the child of a Versioned Component.
|
// We do this only if this component is the child of a Versioned Component.
|
||||||
if (!topLevel) {
|
if (!topLevel) {
|
||||||
|
@ -1146,8 +1146,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||||||
/**
|
/**
|
||||||
* Updates the process group corresponding to the specified DTO. Any field
|
* Updates the process group corresponding to the specified DTO. Any field
|
||||||
* in DTO that is <code>null</code> (with the exception of the required ID)
|
* in DTO that is <code>null</code> (with the exception of the required ID)
|
||||||
* will be ignored, or in the case of back pressure settings, will obtain
|
* will be ignored.
|
||||||
* value from the parent of this process group
|
|
||||||
*
|
*
|
||||||
* @throws IllegalStateException if no process group can be found with the
|
* @throws IllegalStateException if no process group can be found with the
|
||||||
* ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
|
* ID of DTO or with the ID of the DTO's parentGroupId, if the template ID
|
||||||
@ -1162,9 +1161,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||||||
final String comments = dto.getComments();
|
final String comments = dto.getComments();
|
||||||
final String flowfileConcurrencyName = dto.getFlowfileConcurrency();
|
final String flowfileConcurrencyName = dto.getFlowfileConcurrency();
|
||||||
final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy();
|
final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy();
|
||||||
final String defaultFlowFileExpiration = dto.getDefaultFlowFileExpiration();
|
|
||||||
final Long defaultBackPressureObjectThreshold = dto.getDefaultBackPressureObjectThreshold();
|
|
||||||
final String defaultBackPressureDataSizeThreshold = dto.getDefaultBackPressureDataSizeThreshold();
|
|
||||||
|
|
||||||
if (name != null) {
|
if (name != null) {
|
||||||
group.setName(name);
|
group.setName(name);
|
||||||
@ -1197,9 +1193,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
group.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
|
|
||||||
group.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
|
|
||||||
group.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends Connectable & Triggerable> ScheduledState getScheduledState(final T component, final FlowController flowController) {
|
private <T extends Connectable & Triggerable> ScheduledState getScheduledState(final T component, final FlowController flowController) {
|
||||||
@ -1312,9 +1305,6 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||||||
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
|
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
|
||||||
}
|
}
|
||||||
|
|
||||||
processGroup.setDefaultFlowFileExpiration(processGroupDTO.getDefaultFlowFileExpiration());
|
|
||||||
processGroup.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
|
|
||||||
processGroup.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
final String parameterContextId = getString(processGroupElement, "parameterContextId");
|
final String parameterContextId = getString(processGroupElement, "parameterContextId");
|
||||||
if (parameterContextId != null) {
|
if (parameterContextId != null) {
|
||||||
|
@ -259,7 +259,7 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
|
|||||||
|
|
||||||
final ProcessGroup group = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler, flowController.getEncryptor(),
|
final ProcessGroup group = new StandardProcessGroup(requireNonNull(id), flowController.getControllerServiceProvider(), processScheduler, flowController.getEncryptor(),
|
||||||
flowController.getExtensionManager(), flowController.getStateManagerProvider(), this, flowController.getFlowRegistryClient(),
|
flowController.getExtensionManager(), flowController.getStateManagerProvider(), this, flowController.getFlowRegistryClient(),
|
||||||
flowController.getReloadComponent(), mutableVariableRegistry, flowController, nifiProperties);
|
flowController.getReloadComponent(), mutableVariableRegistry, flowController);
|
||||||
onProcessGroupAdded(group);
|
onProcessGroupAdded(group);
|
||||||
|
|
||||||
return group;
|
return group;
|
||||||
|
@ -56,7 +56,7 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
|
|||||||
private final ResourceClaimManager resourceClaimManager;
|
private final ResourceClaimManager resourceClaimManager;
|
||||||
private final ProcessScheduler scheduler;
|
private final ProcessScheduler scheduler;
|
||||||
|
|
||||||
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 sec", 0L));
|
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
|
||||||
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
|
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
|
||||||
|
|
||||||
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
|
||||||
|
@ -53,10 +53,9 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
|
|||||||
|
|
||||||
public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
|
public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
|
||||||
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
|
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
|
||||||
final int swapThreshold, final String expirationPeriod, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
|
final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
|
||||||
|
|
||||||
super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
|
super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
|
||||||
super.setFlowFileExpiration(expirationPeriod);
|
|
||||||
this.swapManager = swapManager;
|
this.swapManager = swapManager;
|
||||||
this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
|
this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
|
||||||
this.eventListener = eventListener;
|
this.eventListener = eventListener;
|
||||||
|
@ -185,9 +185,6 @@ public class FlowFromDOMFactory {
|
|||||||
dto.setComments(getString(element, "comment"));
|
dto.setComments(getString(element, "comment"));
|
||||||
dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency"));
|
dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency"));
|
||||||
dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy"));
|
dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy"));
|
||||||
dto.setDefaultFlowFileExpiration(getString(element, "defaultFlowFileExpiration"));
|
|
||||||
dto.setDefaultBackPressureObjectThreshold(getLong(element, "defaultBackPressureObjectThreshold"));
|
|
||||||
dto.setDefaultBackPressureDataSizeThreshold(getString(element, "defaultBackPressureDataSizeThreshold"));
|
|
||||||
|
|
||||||
final Map<String, String> variables = new HashMap<>();
|
final Map<String, String> variables = new HashMap<>();
|
||||||
final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");
|
final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");
|
||||||
@ -586,10 +583,8 @@ public class FlowFromDOMFactory {
|
|||||||
return Integer.parseInt(getString(element, childElementName));
|
return Integer.parseInt(getString(element, childElementName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Long getLong(final Element element, final String childElementName) {
|
private static long getLong(final Element element, final String childElementName) {
|
||||||
// missing element must be handled gracefully, e.g. flow definition from a previous version without this element
|
return Long.parseLong(getString(element, childElementName));
|
||||||
String longString = getString(element, childElementName);
|
|
||||||
return longString == null ? null : Long.parseLong(longString);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean getBoolean(final Element element, final String childElementName) {
|
private static boolean getBoolean(final Element element, final String childElementName) {
|
||||||
|
@ -235,9 +235,6 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
|
|||||||
addTextElement(element, "comment", group.getComments());
|
addTextElement(element, "comment", group.getComments());
|
||||||
addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name());
|
addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name());
|
||||||
addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());
|
addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());
|
||||||
addTextElement(element, "defaultFlowFileExpiration", group.getDefaultFlowFileExpiration());
|
|
||||||
addTextElement(element, "defaultBackPressureObjectThreshold", group.getDefaultBackPressureObjectThreshold());
|
|
||||||
addTextElement(element, "defaultBackPressureDataSizeThreshold", group.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
|
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
|
||||||
if (versionControlInfo != null) {
|
if (versionControlInfo != null) {
|
||||||
|
@ -346,10 +346,6 @@ public class FingerprintFactory {
|
|||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId"));
|
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId"));
|
||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency"));
|
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency"));
|
||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy"));
|
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy"));
|
||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultFlowFileExpiration"));
|
|
||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureObjectThreshold"));
|
|
||||||
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "defaultBackPressureDataSizeThreshold"));
|
|
||||||
|
|
||||||
|
|
||||||
final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
|
final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
|
||||||
if (versionControlInfo == null) {
|
if (versionControlInfo == null) {
|
||||||
|
@ -103,7 +103,7 @@ public class TestStandardFlowFileQueue {
|
|||||||
}
|
}
|
||||||
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
|
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
|
||||||
|
|
||||||
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
|
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
|
||||||
MockFlowFileRecord.resetIdGenerator();
|
MockFlowFileRecord.resetIdGenerator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,7 +357,7 @@ public class TestStandardFlowFileQueue {
|
|||||||
@Test
|
@Test
|
||||||
public void testSwapInWhenThresholdIsLessThanSwapSize() {
|
public void testSwapInWhenThresholdIsLessThanSwapSize() {
|
||||||
// create a queue where the swap threshold is less than 10k
|
// create a queue where the swap threshold is less than 10k
|
||||||
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");
|
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
|
||||||
|
|
||||||
for (int i = 1; i <= 20000; i++) {
|
for (int i = 1; i <= 20000; i++) {
|
||||||
queue.put(new MockFlowFileRecord());
|
queue.put(new MockFlowFileRecord());
|
||||||
|
@ -224,7 +224,7 @@ public class StandardProcessSessionIT {
|
|||||||
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
|
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
|
||||||
|
|
||||||
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, provenanceRepo, null,
|
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, provenanceRepo, null,
|
||||||
processScheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
|
processScheduler, swapManager, null, 10000, 0L, "0 B");
|
||||||
return Mockito.spy(actualQueue);
|
return Mockito.spy(actualQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,7 +206,7 @@ public class TestRocksDBFlowFileRepository {
|
|||||||
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
|
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
|
||||||
|
|
||||||
final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
|
final FlowFileSwapManager swapMgr = new TestRocksDBFlowFileRepository.MockFlowFileSwapManager();
|
||||||
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
|
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
|
||||||
|
|
||||||
when(connection.getFlowFileQueue()).thenReturn(queue);
|
when(connection.getFlowFileQueue()).thenReturn(queue);
|
||||||
queueProvider.addConnection(connection);
|
queueProvider.addConnection(connection);
|
||||||
@ -651,7 +651,7 @@ public class TestRocksDBFlowFileRepository {
|
|||||||
provider = new TestQueueProvider();
|
provider = new TestQueueProvider();
|
||||||
queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
|
queuedFlowFiles = new ConcurrentSkipListSet<>(); // potentially accessed from multiple threads
|
||||||
|
|
||||||
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, "0 sec",0, "0 B") {
|
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, null, null, null, null, null, 0, 0, "0 B") {
|
||||||
@Override
|
@Override
|
||||||
public void put(final FlowFileRecord file) {
|
public void put(final FlowFileRecord file) {
|
||||||
queuedFlowFiles.add(file);
|
queuedFlowFiles.add(file);
|
||||||
|
@ -520,7 +520,7 @@ public class TestWriteAheadFlowFileRepository {
|
|||||||
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
|
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
|
||||||
|
|
||||||
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
|
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
|
||||||
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
|
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
|
||||||
|
|
||||||
when(connection.getFlowFileQueue()).thenReturn(queue);
|
when(connection.getFlowFileQueue()).thenReturn(queue);
|
||||||
queueProvider.addConnection(connection);
|
queueProvider.addConnection(connection);
|
||||||
|
@ -71,9 +71,6 @@ public class MockProcessGroup implements ProcessGroup {
|
|||||||
private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY);
|
private final MutableVariableRegistry variableRegistry = new MutableVariableRegistry(VariableRegistry.ENVIRONMENT_SYSTEM_REGISTRY);
|
||||||
private VersionControlInformation versionControlInfo;
|
private VersionControlInformation versionControlInfo;
|
||||||
private ParameterContext parameterContext;
|
private ParameterContext parameterContext;
|
||||||
private String defaultFlowfileExpiration;
|
|
||||||
private long defaultBackPressureObjectThreshold;
|
|
||||||
private String defaultBackPressureDataSizeThreshold;
|
|
||||||
|
|
||||||
public MockProcessGroup(final FlowManager flowManager) {
|
public MockProcessGroup(final FlowManager flowManager) {
|
||||||
this.flowManager = flowManager;
|
this.flowManager = flowManager;
|
||||||
@ -812,36 +809,6 @@ public class MockProcessGroup implements ProcessGroup {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDefaultFlowFileExpiration() {
|
|
||||||
return defaultFlowfileExpiration;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultFlowFileExpiration(String defaultFlowFileExpiration) {
|
|
||||||
this.defaultFlowfileExpiration = defaultFlowFileExpiration;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long getDefaultBackPressureObjectThreshold() {
|
|
||||||
return defaultBackPressureObjectThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultBackPressureObjectThreshold(Long defaultBackPressureObjectThreshold) {
|
|
||||||
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDefaultBackPressureDataSizeThreshold() {
|
|
||||||
return defaultBackPressureDataSizeThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold) {
|
|
||||||
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void terminateProcessor(ProcessorNode processor) {
|
public void terminateProcessor(ProcessorNode processor) {
|
||||||
}
|
}
|
||||||
|
@ -377,11 +377,10 @@ public class FrameworkIntegrationTest {
|
|||||||
FileUtils.deleteFile(dir, true);
|
FileUtils.deleteFile(dir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected FlowFileQueue createFlowFileQueue(final String uuid, final ProcessGroup processGroup) {
|
protected FlowFileQueue createFlowFileQueue(final String uuid) {
|
||||||
final RepositoryContext repoContext = getRepositoryContext();
|
final RepositoryContext repoContext = getRepositoryContext();
|
||||||
return new StandardFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
|
return new StandardFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
|
||||||
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000,
|
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000, 10000L, "1 GB");
|
||||||
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final ProcessorNode createProcessorNode(final Class<? extends Processor> processorType) {
|
protected final ProcessorNode createProcessorNode(final Class<? extends Processor> processorType) {
|
||||||
@ -476,14 +475,13 @@ public class FrameworkIntegrationTest {
|
|||||||
protected final Connection connect(ProcessGroup processGroup, final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
|
protected final Connection connect(ProcessGroup processGroup, final ProcessorNode source, final ProcessorNode destination, final Collection<Relationship> relationships) {
|
||||||
final String id = UUID.randomUUID().toString();
|
final String id = UUID.randomUUID().toString();
|
||||||
final Connection connection = new StandardConnection.Builder(processScheduler)
|
final Connection connection = new StandardConnection.Builder(processScheduler)
|
||||||
.source(source)
|
.source(source)
|
||||||
.destination(destination)
|
.destination(destination)
|
||||||
.processGroup(processGroup)
|
.relationships(relationships)
|
||||||
.relationships(relationships)
|
.id(id)
|
||||||
.id(id)
|
.clustered(false)
|
||||||
.clustered(false)
|
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener) -> createFlowFileQueue(id))
|
||||||
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener, processGroup1) -> createFlowFileQueue(id, processGroup))
|
.build();
|
||||||
.build();
|
|
||||||
|
|
||||||
source.addConnection(connection);
|
source.addConnection(connection);
|
||||||
destination.addConnection(connection);
|
destination.addConnection(connection);
|
||||||
|
@ -74,7 +74,7 @@ public class FlowFileRepositoryLifecycleIT extends FrameworkIntegrationTest {
|
|||||||
|
|
||||||
shutdown();
|
shutdown();
|
||||||
|
|
||||||
final FlowFileQueue restoredQueue = createFlowFileQueue(queue.getIdentifier(), getRootGroup());
|
final FlowFileQueue restoredQueue = createFlowFileQueue(queue.getIdentifier());
|
||||||
initialize();
|
initialize();
|
||||||
getFlowController().initializeFlow(() -> Collections.singleton(restoredQueue));
|
getFlowController().initializeFlow(() -> Collections.singleton(restoredQueue));
|
||||||
|
|
||||||
|
@ -31,7 +31,6 @@ import org.apache.nifi.processor.ProcessContext;
|
|||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -44,40 +43,6 @@ import static org.junit.Assert.assertEquals;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
|
public class StandardProcessGroupIT extends FrameworkIntegrationTest {
|
||||||
@Test
|
|
||||||
public void testProcessGroupDefaults() {
|
|
||||||
// Connect two processors with default settings of the root process group
|
|
||||||
ProcessorNode sourceProcessor = createGenerateProcessor(1);
|
|
||||||
ProcessorNode destinationProcessor = createProcessorNode((context, session) -> {});
|
|
||||||
Connection connection = connect(sourceProcessor, destinationProcessor, Collections.singleton(REL_SUCCESS));
|
|
||||||
|
|
||||||
|
|
||||||
// Verify all defaults are in place on the process group and the connection
|
|
||||||
assertEquals("0 sec", getRootGroup().getDefaultFlowFileExpiration());
|
|
||||||
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
|
|
||||||
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, getRootGroup().getDefaultBackPressureDataSizeThreshold());
|
|
||||||
assertEquals("0 sec", connection.getFlowFileQueue().getFlowFileExpiration());
|
|
||||||
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_COUNT, (long) connection.getFlowFileQueue().getBackPressureObjectThreshold());
|
|
||||||
assertEquals(NiFiProperties.DEFAULT_BACKPRESSURE_SIZE, connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
// Update default settings of the process group, and create a new connection
|
|
||||||
getRootGroup().setDefaultFlowFileExpiration("99 min");
|
|
||||||
getRootGroup().setDefaultBackPressureObjectThreshold(99L);
|
|
||||||
getRootGroup().setDefaultBackPressureDataSizeThreshold("99 MB");
|
|
||||||
|
|
||||||
ProcessorNode sourceProcessor1 = createGenerateProcessor(1);
|
|
||||||
ProcessorNode destinationProcessor1 = createProcessorNode((context, session) -> {});
|
|
||||||
Connection connection1 = connect(sourceProcessor1, destinationProcessor1, Collections.singleton(REL_SUCCESS));
|
|
||||||
|
|
||||||
// Verify updated settings are in place on the process group and the connection
|
|
||||||
assertEquals("99 min", getRootGroup().getDefaultFlowFileExpiration());
|
|
||||||
assertEquals(99, (long) getRootGroup().getDefaultBackPressureObjectThreshold());
|
|
||||||
assertEquals("99 MB", getRootGroup().getDefaultBackPressureDataSizeThreshold());
|
|
||||||
assertEquals("99 min", connection1.getFlowFileQueue().getFlowFileExpiration());
|
|
||||||
assertEquals(99, (long) connection1.getFlowFileQueue().getBackPressureObjectThreshold());
|
|
||||||
assertEquals("99 MB", connection1.getFlowFileQueue().getBackPressureDataSizeThreshold());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDropAllFlowFilesFromOneConnection() throws Exception {
|
public void testDropAllFlowFilesFromOneConnection() throws Exception {
|
||||||
ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
|
ProcessorNode sourceProcessGroup = createGenerateProcessor(1);
|
||||||
|
@ -2512,9 +2512,6 @@ public final class DtoFactory {
|
|||||||
dto.setVersionControlInformation(createVersionControlInformationDto(group));
|
dto.setVersionControlInformation(createVersionControlInformationDto(group));
|
||||||
dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name());
|
dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name());
|
||||||
dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
|
dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
|
||||||
dto.setDefaultFlowFileExpiration(group.getDefaultFlowFileExpiration());
|
|
||||||
dto.setDefaultBackPressureObjectThreshold(group.getDefaultBackPressureObjectThreshold());
|
|
||||||
dto.setDefaultBackPressureDataSizeThreshold(group.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
final ParameterContext parameterContext = group.getParameterContext();
|
final ParameterContext parameterContext = group.getParameterContext();
|
||||||
if (parameterContext != null) {
|
if (parameterContext != null) {
|
||||||
@ -4345,9 +4342,6 @@ public final class DtoFactory {
|
|||||||
copy.setVersionedComponentId(original.getVersionedComponentId());
|
copy.setVersionedComponentId(original.getVersionedComponentId());
|
||||||
copy.setFlowfileConcurrency(original.getFlowfileConcurrency());
|
copy.setFlowfileConcurrency(original.getFlowfileConcurrency());
|
||||||
copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy());
|
copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy());
|
||||||
copy.setDefaultFlowFileExpiration(original.getDefaultFlowFileExpiration());
|
|
||||||
copy.setDefaultBackPressureObjectThreshold(original.getDefaultBackPressureObjectThreshold());
|
|
||||||
copy.setDefaultBackPressureDataSizeThreshold(original.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
copy.setRunningCount(original.getRunningCount());
|
copy.setRunningCount(original.getRunningCount());
|
||||||
copy.setStoppedCount(original.getStoppedCount());
|
copy.setStoppedCount(original.getStoppedCount());
|
||||||
|
@ -373,11 +373,6 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
|
|||||||
if (flowFileOutboundPolicy != null) {
|
if (flowFileOutboundPolicy != null) {
|
||||||
group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
|
group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
group.setDefaultFlowFileExpiration(processGroupDTO.getDefaultFlowFileExpiration());
|
|
||||||
group.setDefaultBackPressureObjectThreshold(processGroupDTO.getDefaultBackPressureObjectThreshold());
|
|
||||||
group.setDefaultBackPressureDataSizeThreshold(processGroupDTO.getDefaultBackPressureDataSizeThreshold());
|
|
||||||
|
|
||||||
group.onComponentModified();
|
group.onComponentModified();
|
||||||
return group;
|
return group;
|
||||||
}
|
}
|
||||||
|
@ -72,34 +72,6 @@
|
|||||||
<span id="read-only-process-group-outbound-policy" class="unset"></span>
|
<span id="read-only-process-group-outbound-policy" class="unset"></span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div class="setting">
|
|
||||||
<div class="setting-name">Default FlowFile Expiration</div>
|
|
||||||
<div class="editable setting-field">
|
|
||||||
<input type="text" id="process-group-default-flowfile-expiration" class="setting-input"/>
|
|
||||||
</div>
|
|
||||||
<div class="read-only setting-field">
|
|
||||||
<span id="read-only-process-group-default-flowfile-expiration" class="unset"></span>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
<div class="setting">
|
|
||||||
<div class="setting-name">Default Back Pressure Object Threshold</div>
|
|
||||||
<div class="editable setting-field">
|
|
||||||
<input type="text" id="process-group-default-back-pressure-object-threshold" class="setting-input"/>
|
|
||||||
</div>
|
|
||||||
<div class="read-only setting-field">
|
|
||||||
<span id="read-only-process-group-default-back-pressure-object-threshold" class="unset"></span>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
<div class="setting">
|
|
||||||
<div class="setting-name">Default Back Pressure Data Size Threshold</div>
|
|
||||||
<div class="editable setting-field">
|
|
||||||
<input type="text" id="process-group-default-back-pressure-data-size-threshold" class="setting-input"/>
|
|
||||||
</div>
|
|
||||||
<div class="read-only setting-field">
|
|
||||||
<span id="read-only-process-group-default-back-pressure-data-size-threshold" class="unset"></span>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<div class="editable settings-buttons">
|
<div class="editable settings-buttons">
|
||||||
<div id="process-group-configuration-save" class="button">Apply</div>
|
<div id="process-group-configuration-save" class="button">Apply</div>
|
||||||
<div class="clear"></div>
|
<div class="clear"></div>
|
||||||
|
@ -93,18 +93,6 @@
|
|||||||
width: 328px;
|
width: 328px;
|
||||||
}
|
}
|
||||||
|
|
||||||
#process-group-default-flowfile-expiration {
|
|
||||||
width: 328px;
|
|
||||||
}
|
|
||||||
|
|
||||||
#process-group-default-back-pressure-object-threshold {
|
|
||||||
width: 328px;
|
|
||||||
}
|
|
||||||
|
|
||||||
#process-group-default-back-pressure-data-size-threshold {
|
|
||||||
width: 328px;
|
|
||||||
}
|
|
||||||
|
|
||||||
#process-group-comments {
|
#process-group-comments {
|
||||||
height: 100px;
|
height: 100px;
|
||||||
}
|
}
|
||||||
@ -119,4 +107,4 @@
|
|||||||
|
|
||||||
#upload-process-group-link {
|
#upload-process-group-link {
|
||||||
float: right;
|
float: right;
|
||||||
}
|
}
|
||||||
|
@ -45,12 +45,6 @@
|
|||||||
var canvas;
|
var canvas;
|
||||||
var origin;
|
var origin;
|
||||||
|
|
||||||
var config = {
|
|
||||||
urls: {
|
|
||||||
api: '../nifi-api',
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determines if we want to allow adding connections in the current state:
|
* Determines if we want to allow adding connections in the current state:
|
||||||
*
|
*
|
||||||
@ -218,19 +212,7 @@
|
|||||||
|
|
||||||
// create the connection
|
// create the connection
|
||||||
var destinationData = destination.datum();
|
var destinationData = destination.datum();
|
||||||
|
nfConnectionConfiguration.createConnection(connectorData.sourceId, destinationData.id);
|
||||||
$.ajax({
|
|
||||||
type: 'GET',
|
|
||||||
url: config.urls.api + '/process-groups/' + encodeURIComponent(destinationData.component.parentGroupId),
|
|
||||||
dataType: 'json'
|
|
||||||
}).done(function (response) {
|
|
||||||
var defaultSettings = {
|
|
||||||
flowfileExpiration: response.component.defaultFlowFileExpiration,
|
|
||||||
objectThreshold: response.component.defaultBackPressureObjectThreshold,
|
|
||||||
dataSizeThreshold: response.component.defaultBackPressureDataSizeThreshold,
|
|
||||||
};
|
|
||||||
nfConnectionConfiguration.createConnection(connectorData.sourceId, destinationData.id, defaultSettings);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
@ -304,4 +286,4 @@
|
|||||||
.on('mouseout.connectable', null);
|
.on('mouseout.connectable', null);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}));
|
}));
|
@ -1267,10 +1267,13 @@
|
|||||||
* @param nfBirdseyeRef The nfBirdseye module.
|
* @param nfBirdseyeRef The nfBirdseye module.
|
||||||
* @param nfGraphRef The nfGraph module.
|
* @param nfGraphRef The nfGraph module.
|
||||||
*/
|
*/
|
||||||
init: function (nfBirdseyeRef, nfGraphRef) {
|
init: function (nfBirdseyeRef, nfGraphRef, defaultBackPressureObjectThresholdRef, defaultBackPressureDataSizeThresholdRef) {
|
||||||
nfBirdseye = nfBirdseyeRef;
|
nfBirdseye = nfBirdseyeRef;
|
||||||
nfGraph = nfGraphRef;
|
nfGraph = nfGraphRef;
|
||||||
|
|
||||||
|
defaultBackPressureObjectThreshold = defaultBackPressureObjectThresholdRef;
|
||||||
|
defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThresholdRef;
|
||||||
|
|
||||||
// initially hide the relationship names container
|
// initially hide the relationship names container
|
||||||
$('#relationship-names-container').hide();
|
$('#relationship-names-container').hide();
|
||||||
|
|
||||||
@ -1380,7 +1383,7 @@
|
|||||||
* @argument {string} sourceId The source id
|
* @argument {string} sourceId The source id
|
||||||
* @argument {string} destinationId The destination id
|
* @argument {string} destinationId The destination id
|
||||||
*/
|
*/
|
||||||
createConnection: function (sourceId, destinationId, defaultSettings) {
|
createConnection: function (sourceId, destinationId) {
|
||||||
// select the source and destination
|
// select the source and destination
|
||||||
var source = d3.select('#id-' + sourceId);
|
var source = d3.select('#id-' + sourceId);
|
||||||
var destination = d3.select('#id-' + destinationId);
|
var destination = d3.select('#id-' + destinationId);
|
||||||
@ -1398,9 +1401,9 @@
|
|||||||
// initialize the connection dialog
|
// initialize the connection dialog
|
||||||
$.when(initializeSourceNewConnectionDialog(source), initializeDestinationNewConnectionDialog(destination)).done(function () {
|
$.when(initializeSourceNewConnectionDialog(source), initializeDestinationNewConnectionDialog(destination)).done(function () {
|
||||||
// set the default values
|
// set the default values
|
||||||
$('#flow-file-expiration').val(defaultSettings.flowfileExpiration);
|
$('#flow-file-expiration').val('0 sec');
|
||||||
$('#back-pressure-object-threshold').val(defaultSettings.objectThreshold);
|
$('#back-pressure-object-threshold').val(defaultBackPressureObjectThreshold);
|
||||||
$('#back-pressure-data-size-threshold').val(defaultSettings.dataSizeThreshold);
|
$('#back-pressure-data-size-threshold').val(defaultBackPressureDataSizeThreshold);
|
||||||
|
|
||||||
// select the first tab
|
// select the first tab
|
||||||
$('#connection-configuration-tabs').find('li:first').click();
|
$('#connection-configuration-tabs').find('li:first').click();
|
||||||
@ -1613,4 +1616,4 @@
|
|||||||
};
|
};
|
||||||
|
|
||||||
return nfConnectionConfiguration;
|
return nfConnectionConfiguration;
|
||||||
}));
|
}));
|
@ -61,7 +61,6 @@
|
|||||||
|
|
||||||
var nfControllerServices;
|
var nfControllerServices;
|
||||||
var nfParameterContexts;
|
var nfParameterContexts;
|
||||||
var nfBackpressureDefaults;
|
|
||||||
|
|
||||||
var config = {
|
var config = {
|
||||||
urls: {
|
urls: {
|
||||||
@ -108,10 +107,7 @@
|
|||||||
'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value
|
'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value
|
||||||
},
|
},
|
||||||
'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value,
|
'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value,
|
||||||
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value,
|
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value
|
||||||
'defaultFlowFileExpiration': $('#process-group-default-flowfile-expiration').val(),
|
|
||||||
'defaultBackPressureObjectThreshold': $('#process-group-default-back-pressure-object-threshold').val(),
|
|
||||||
'defaultBackPressureDataSizeThreshold': $('#process-group-default-back-pressure-data-size-threshold').val()
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -171,9 +167,6 @@
|
|||||||
var setUnauthorizedText = function () {
|
var setUnauthorizedText = function () {
|
||||||
$('#read-only-process-group-name').text('Unauthorized');
|
$('#read-only-process-group-name').text('Unauthorized');
|
||||||
$('#read-only-process-group-comments').text('Unauthorized');
|
$('#read-only-process-group-comments').text('Unauthorized');
|
||||||
$('#read-only-process-group-default-flowfile-expiration').text('Unauthorized');
|
|
||||||
$('#read-only-process-group-default-back-pressure-object-threshold').text('Unauthorized');
|
|
||||||
$('#read-only-process-group-default-back-pressure-data-size-threshold').text('Unauthorized');
|
|
||||||
};
|
};
|
||||||
|
|
||||||
var setEditable = function (editable) {
|
var setEditable = function (editable) {
|
||||||
@ -262,10 +255,6 @@
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
$('#process-group-default-flowfile-expiration').removeClass('unset').val(processGroup.defaultFlowFileExpiration);
|
|
||||||
$('#process-group-default-back-pressure-object-threshold').removeClass('unset').val(processGroup.defaultBackPressureObjectThreshold);
|
|
||||||
$('#process-group-default-back-pressure-data-size-threshold').removeClass('unset').val(processGroup.defaultBackPressureDataSizeThreshold);
|
|
||||||
|
|
||||||
|
|
||||||
// populate the header
|
// populate the header
|
||||||
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
|
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
|
||||||
@ -301,15 +290,6 @@
|
|||||||
|
|
||||||
// populate the header
|
// populate the header
|
||||||
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
|
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
|
||||||
|
|
||||||
// backpressure settings
|
|
||||||
$('#process-group-default-flowfile-expiration').text(processGroup.defaultFlowFileExpiration);
|
|
||||||
$('#process-group-default-back-pressure-object-threshold').text(processGroup.defaultBackPressureObjectThreshold);
|
|
||||||
$('#process-group-default-back-pressure-data-size-threshold').text(processGroup.defaultBackPressureDataSizeThreshold);
|
|
||||||
|
|
||||||
$('#read-only-process-group-default-flowfile-expiration').text(processGroup.defaultFlowFileExpiration);
|
|
||||||
$('#read-only-process-group-default-back-pressure-object-threshold').text(processGroup.defaultBackPressureObjectThreshold);
|
|
||||||
$('#read-only-process-group-default-back-pressure-data-size-threshold').text(processGroup.defaultBackPressureDataSizeThreshold);
|
|
||||||
} else {
|
} else {
|
||||||
setUnauthorizedText();
|
setUnauthorizedText();
|
||||||
}
|
}
|
||||||
@ -539,9 +519,6 @@
|
|||||||
$('#process-group-id').text('');
|
$('#process-group-id').text('');
|
||||||
$('#process-group-name').val('');
|
$('#process-group-name').val('');
|
||||||
$('#process-group-comments').val('');
|
$('#process-group-comments').val('');
|
||||||
$('#process-group-default-flowfile-expiration').val('');
|
|
||||||
$('#process-group-default-back-pressure-object-threshold').val('');
|
|
||||||
$('#process-group-default-back-pressure-data-size-threshold').val('');
|
|
||||||
|
|
||||||
// reset the header
|
// reset the header
|
||||||
$('#process-group-configuration-header-text').text('Process Group Configuration');
|
$('#process-group-configuration-header-text').text('Process Group Configuration');
|
||||||
|
@ -230,8 +230,7 @@ public class StatelessFlowManager extends AbstractFlowManager implements FlowMan
|
|||||||
|
|
||||||
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
|
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
|
||||||
@Override
|
@Override
|
||||||
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
|
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
|
||||||
final ProcessGroup processGroup) {
|
|
||||||
return new StatelessFlowFileQueue(id);
|
return new StatelessFlowFileQueue(id);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user