mirror of https://github.com/apache/nifi.git
NIFI-3599 Allow back pressure object count and data size to be configurable in nifi.properties. This closes #2497
This commit is contained in:
parent
7abb02fff0
commit
dc9b4cb516
|
@ -79,6 +79,8 @@ public abstract class NiFiProperties {
|
|||
public static final String PERSISTENT_STATE_DIRECTORY = "nifi.persistent.state.directory";
|
||||
public static final String BORED_YIELD_DURATION = "nifi.bored.yield.duration";
|
||||
public static final String PROCESSOR_SCHEDULING_TIMEOUT = "nifi.processor.scheduling.timeout";
|
||||
public static final String BACKPRESSURE_COUNT = "nifi.queue.backpressure.count";
|
||||
public static final String BACKPRESSURE_SIZE = "nifi.queue.backpressure.size";
|
||||
|
||||
// content repository properties
|
||||
public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
|
||||
|
@ -250,6 +252,8 @@ public abstract class NiFiProperties {
|
|||
public static final String DEFAULT_SWAP_OUT_PERIOD = "5 sec";
|
||||
public static final int DEFAULT_SWAP_IN_THREADS = 4;
|
||||
public static final int DEFAULT_SWAP_OUT_THREADS = 4;
|
||||
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
|
||||
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
|
||||
public static final String DEFAULT_ADMINISTRATIVE_YIELD_DURATION = "30 sec";
|
||||
public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
|
||||
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
|
||||
|
@ -1382,6 +1386,25 @@ public abstract class NiFiProperties {
|
|||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Long getDefaultBackPressureObjectThreshold() {
|
||||
long backPressureCount;
|
||||
try {
|
||||
String backPressureCountStr = getProperty(BACKPRESSURE_COUNT);
|
||||
if (backPressureCountStr == null || backPressureCountStr.trim().isEmpty()) {
|
||||
backPressureCount = DEFAULT_BACKPRESSURE_COUNT;
|
||||
} else {
|
||||
backPressureCount = Long.parseLong(backPressureCountStr);
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
backPressureCount = DEFAULT_BACKPRESSURE_COUNT;
|
||||
}
|
||||
return backPressureCount;
|
||||
}
|
||||
|
||||
public String getDefaultBackPressureDataSizeThreshold() {
|
||||
return getProperty(BACKPRESSURE_SIZE, DEFAULT_BACKPRESSURE_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of NiFiProperties. This should likely not be called
|
||||
* by any classes outside of the NiFi framework but can be useful by the
|
||||
|
|
|
@ -2743,6 +2743,8 @@ This cleanup mechanism takes into account only automatically created archived fl
|
|||
|nifi.flowservice.writedelay.interval|When many changes are made to the flow.xml, this property specifies how long to wait before writing out the changes, so as to batch the changes into a single write. The default value is `500 ms`.
|
||||
|nifi.administrative.yield.duration|If a component allows an unexpected exception to escape, it is considered a bug. As a result, the framework will pause (or administratively yield) the component for this amount of time. This is done so that the component does not use up massive amounts of system resources, since it is known to have problems in the existing state. The default value is `30 secs`.
|
||||
|nifi.bored.yield.duration|When a component has no work to do (i.e., is "bored"), this is the amount of time it will wait before checking to see if it has new data to work on. This way, it does not use up CPU resources by checking for new work too often. When setting this property, be aware that it could add extra latency for components that do not constantly have work to do, as once they go into this "bored" state, they will wait this amount of time before checking for more work. The default value is `10 ms`.
|
||||
|nifi.queue.backpressure.count|When drawing a new connection between two components, this is the default value for that connection's back pressure object threshold. The default is 10000 and the value must be an integer.
|
||||
|nifi.queue.backpressure.size|When drawing a new connection between two components, this is the default value for that connection's back pressure data size threshold. The default is 1 GB and the value must be a data size including the unit of measure.
|
||||
|nifi.authorizer.configuration.file*|This is the location of the file that specifies how authorizers are defined. The default value is `./conf/authorizers.xml`.
|
||||
|nifi.login.identity.provider.configuration.file*|This is the location of the file that specifies how username/password authentication is performed. This file is
|
||||
only considered if `nifi.security.user.login.identity.provider` is configured with a provider identifier. The default value is `./conf/login-identity-providers.xml`.
|
||||
|
|
|
@ -1008,6 +1008,7 @@ applying back pressure. This value is configured by entering a number followed b
|
|||
kilobytes, `MB` for megabytes, `GB` for gigabytes, or `TB` for terabytes).
|
||||
|
||||
NOTE: By default each new connection added will have a default Back Pressure Object Threshold of 10,000 objects and Back Pressure Data Size Threshold of 1 GB.
|
||||
These defaults can be changed by modifying the appropriate properties in the `nifi.properties` file.
|
||||
|
||||
When back pressure is enabled, small progress bars appear on the connection label, so the DFM can see it at-a-glance when looking at a flow on the canvas. The progress bars change color based on the queue percentage: Green (0-60%), Yellow (61-85%) and Red (86-100%).
|
||||
|
||||
|
|
|
@ -37,6 +37,9 @@ public class FlowConfigurationDTO {
|
|||
private Date currentTime;
|
||||
private Integer timeOffset;
|
||||
|
||||
private Long defaultBackPressureObjectThreshold;
|
||||
private String defaultBackPressureDataSizeThreshold;
|
||||
|
||||
/**
|
||||
* @return interval in seconds between the automatic NiFi refresh requests. This value is read only
|
||||
*/
|
||||
|
@ -127,4 +130,32 @@ public class FlowConfigurationDTO {
|
|||
public void setTimeOffset(Integer timeOffset) {
|
||||
this.timeOffset = timeOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the default back pressure object threshold
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The default back pressure object threshold."
|
||||
)
|
||||
public Long getDefaultBackPressureObjectThreshold() {
|
||||
return defaultBackPressureObjectThreshold;
|
||||
}
|
||||
|
||||
public void setDefaultBackPressureObjectThreshold(Long backPressureObjectThreshold) {
|
||||
this.defaultBackPressureObjectThreshold = backPressureObjectThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the default back pressure data size threshold
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The default back pressure data size threshold."
|
||||
)
|
||||
public String getDefaultBackPressureDataSizeThreshold() {
|
||||
return defaultBackPressureDataSizeThreshold;
|
||||
}
|
||||
|
||||
public void setDefaultBackPressureDataSizeThreshold(String backPressureDataSizeThreshold) {
|
||||
this.defaultBackPressureDataSizeThreshold = backPressureDataSizeThreshold;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,8 @@ public final class StandardConnection implements Connection {
|
|||
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
|
||||
scheduler = builder.scheduler;
|
||||
flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
|
||||
scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold);
|
||||
scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold,
|
||||
builder.defaultBackPressureObjectThreshold, builder.defaultBackPressureDataSizeThreshold);
|
||||
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
|
||||
}
|
||||
|
||||
|
@ -387,6 +388,8 @@ public final class StandardConnection implements Connection {
|
|||
private ProvenanceEventRepository provenanceRepository;
|
||||
private ResourceClaimManager resourceClaimManager;
|
||||
private int queueSwapThreshold;
|
||||
private Long defaultBackPressureObjectThreshold;
|
||||
private String defaultBackPressureDataSizeThreshold;
|
||||
|
||||
public Builder(final ProcessScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
|
@ -463,6 +466,16 @@ public final class StandardConnection implements Connection {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder defaultBackPressureObjectThreshold(final long defaultBackPressureObjectThreshold) {
|
||||
this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder defaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
|
||||
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
|
||||
return this;
|
||||
}
|
||||
|
||||
public StandardConnection build() {
|
||||
if (source == null) {
|
||||
throw new IllegalStateException("Cannot build a Connection without a Source");
|
||||
|
|
|
@ -991,6 +991,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
.destination(destination)
|
||||
.swapManager(swapManager)
|
||||
.queueSwapThreshold(nifiProperties.getQueueSwapThreshold())
|
||||
.defaultBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold())
|
||||
.defaultBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold())
|
||||
.eventReporter(eventReporter)
|
||||
.resourceClaimManager(resourceClaimManager)
|
||||
.flowFileRepository(flowFileRepository)
|
||||
|
|
|
@ -95,10 +95,7 @@ public class StandardFlowFileQueue implements FlowFileQueue {
|
|||
|
||||
private boolean swapMode = false;
|
||||
|
||||
public static final int DEFAULT_BACKPRESSURE_COUNT = 10000;
|
||||
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
|
||||
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize(DEFAULT_BACKPRESSURE_SIZE,
|
||||
DataUnit.parseDataSize(DEFAULT_BACKPRESSURE_SIZE, DataUnit.B).longValue(), DEFAULT_BACKPRESSURE_COUNT));
|
||||
private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>();
|
||||
private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
|
||||
|
||||
private final EventReporter eventReporter;
|
||||
|
@ -122,7 +119,8 @@ public class StandardFlowFileQueue implements FlowFileQueue {
|
|||
private final ProcessScheduler scheduler;
|
||||
|
||||
public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
|
||||
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
|
||||
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
|
||||
final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
|
||||
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
|
||||
priorities = new ArrayList<>();
|
||||
swapQueue = new ArrayList<>();
|
||||
|
@ -139,6 +137,10 @@ public class StandardFlowFileQueue implements FlowFileQueue {
|
|||
|
||||
readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
|
||||
writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
|
||||
|
||||
final MaxQueueSize initialMaxQueueSize = new MaxQueueSize(defaultBackPressureDataSizeThreshold,
|
||||
DataUnit.parseDataSize(defaultBackPressureDataSizeThreshold, DataUnit.B).longValue(), defaultBackPressureObjectThreshold);
|
||||
this.maxQueueSize.set(initialMaxQueueSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -116,7 +116,7 @@ public class TestStandardFlowFileQueue {
|
|||
}
|
||||
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
|
||||
|
||||
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
|
||||
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
|
||||
TestFlowFile.idGenerator.set(0L);
|
||||
}
|
||||
|
||||
|
@ -392,7 +392,7 @@ public class TestStandardFlowFileQueue {
|
|||
@Test
|
||||
public void testSwapInWhenThresholdIsLessThanSwapSize() {
|
||||
// create a queue where the swap threshold is less than 10k
|
||||
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000);
|
||||
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
|
||||
|
||||
for (int i = 1; i <= 20000; i++) {
|
||||
queue.put(new TestFlowFile());
|
||||
|
|
|
@ -205,7 +205,7 @@ public class TestStandardProcessSession {
|
|||
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
|
||||
|
||||
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null,
|
||||
processScheduler, swapManager, null, 10000);
|
||||
processScheduler, swapManager, null, 10000, 0L, "0 B");
|
||||
return Mockito.spy(actualQueue);
|
||||
}
|
||||
|
||||
|
|
|
@ -370,7 +370,7 @@ public class TestWriteAheadFlowFileRepository {
|
|||
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
|
||||
|
||||
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
|
||||
final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000);
|
||||
final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, 0L, "0 B");
|
||||
|
||||
when(connection.getFlowFileQueue()).thenReturn(queue);
|
||||
queueProvider.addConnection(connection);
|
||||
|
|
|
@ -34,6 +34,8 @@
|
|||
<nifi.flowservice.writedelay.interval>500 ms</nifi.flowservice.writedelay.interval>
|
||||
<nifi.administrative.yield.duration>30 sec</nifi.administrative.yield.duration>
|
||||
<nifi.bored.yield.duration>10 millis</nifi.bored.yield.duration>
|
||||
<nifi.queue.backpressure.count>10000</nifi.queue.backpressure.count>
|
||||
<nifi.queue.backpressure.size>1 GB</nifi.queue.backpressure.size>
|
||||
|
||||
<nifi.flow.configuration.file>./conf/flow.xml.gz</nifi.flow.configuration.file>
|
||||
<nifi.flow.configuration.archive.enabled>true</nifi.flow.configuration.archive.enabled>
|
||||
|
|
|
@ -26,6 +26,8 @@ nifi.flowservice.writedelay.interval=${nifi.flowservice.writedelay.interval}
|
|||
nifi.administrative.yield.duration=${nifi.administrative.yield.duration}
|
||||
# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
|
||||
nifi.bored.yield.duration=${nifi.bored.yield.duration}
|
||||
nifi.queue.backpressure.count=${nifi.queue.backpressure.count}
|
||||
nifi.queue.backpressure.size=${nifi.queue.backpressure.size}
|
||||
|
||||
nifi.authorizer.configuration.file=${nifi.authorizer.configuration.file}
|
||||
nifi.login.identity.provider.configuration.file=${nifi.login.identity.provider.configuration.file}
|
||||
|
|
|
@ -3177,7 +3177,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
|
||||
@Override
|
||||
public FlowConfigurationEntity getFlowConfiguration() {
|
||||
final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval());
|
||||
final FlowConfigurationDTO dto = dtoFactory.createFlowConfigurationDto(properties.getAutoRefreshInterval(),
|
||||
properties.getDefaultBackPressureObjectThreshold(), properties.getDefaultBackPressureDataSizeThreshold());
|
||||
final FlowConfigurationEntity entity = new FlowConfigurationEntity();
|
||||
entity.setFlowConfiguration(dto);
|
||||
return entity;
|
||||
|
|
|
@ -263,7 +263,9 @@ public final class DtoFactory {
|
|||
return dto;
|
||||
}
|
||||
|
||||
public FlowConfigurationDTO createFlowConfigurationDto(final String autoRefreshInterval) {
|
||||
public FlowConfigurationDTO createFlowConfigurationDto(final String autoRefreshInterval,
|
||||
final Long defaultBackPressureObjectThreshold,
|
||||
final String defaultBackPressureDataSizeThreshold) {
|
||||
final FlowConfigurationDTO dto = new FlowConfigurationDTO();
|
||||
|
||||
// get the refresh interval
|
||||
|
@ -277,6 +279,9 @@ public final class DtoFactory {
|
|||
dto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
|
||||
dto.setCurrentTime(now);
|
||||
|
||||
dto.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
|
||||
dto.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
|
||||
|
||||
return dto;
|
||||
}
|
||||
|
||||
|
|
|
@ -369,7 +369,7 @@
|
|||
nfBirdseye.init(nfGraph);
|
||||
|
||||
// initialize the connection config and invert control of the birdseye and graph
|
||||
nfConnectionConfiguration.init(nfBirdseye, nfGraph);
|
||||
nfConnectionConfiguration.init(nfBirdseye, nfGraph, configDetails.defaultBackPressureObjectThreshold, configDetails.defaultBackPressureDataSizeThreshold);
|
||||
nfControllerService.init(nfControllerServices);
|
||||
nfReportingTask.init(nfSettings);
|
||||
nfPolicyManagement.init();
|
||||
|
|
|
@ -56,6 +56,9 @@
|
|||
var nfBirdseye;
|
||||
var nfGraph;
|
||||
|
||||
var defaultBackPressureObjectThreshold;
|
||||
var defaultBackPressureDataSizeThreshold;
|
||||
|
||||
var CONNECTION_OFFSET_Y_INCREMENT = 75;
|
||||
var CONNECTION_OFFSET_X_INCREMENT = 200;
|
||||
|
||||
|
@ -1175,10 +1178,13 @@
|
|||
* @param nfBirdseyeRef The nfBirdseye module.
|
||||
* @param nfGraphRef The nfGraph module.
|
||||
*/
|
||||
init: function (nfBirdseyeRef, nfGraphRef) {
|
||||
init: function (nfBirdseyeRef, nfGraphRef, defaultBackPressureObjectThresholdRef, defaultBackPressureDataSizeThresholdRef) {
|
||||
nfBirdseye = nfBirdseyeRef;
|
||||
nfGraph = nfGraphRef;
|
||||
|
||||
defaultBackPressureObjectThreshold = defaultBackPressureObjectThresholdRef;
|
||||
defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThresholdRef;
|
||||
|
||||
// initially hide the relationship names container
|
||||
$('#relationship-names-container').hide();
|
||||
|
||||
|
@ -1275,8 +1281,8 @@
|
|||
$.when(initializeSourceNewConnectionDialog(source), initializeDestinationNewConnectionDialog(destination)).done(function () {
|
||||
// set the default values
|
||||
$('#flow-file-expiration').val('0 sec');
|
||||
$('#back-pressure-object-threshold').val('10000');
|
||||
$('#back-pressure-data-size-threshold').val('1 GB');
|
||||
$('#back-pressure-object-threshold').val(defaultBackPressureObjectThreshold);
|
||||
$('#back-pressure-data-size-threshold').val(defaultBackPressureDataSizeThreshold);
|
||||
|
||||
// select the first tab
|
||||
$('#connection-configuration-tabs').find('li:first').click();
|
||||
|
|
Loading…
Reference in New Issue