mirror of https://github.com/apache/nifi.git
NIFI-9437 Flowfile Expiration cannot exceed 24 days
NIFI-9437 Addressing review comments, correcting setter in StatelessFlowQueue class Signed-off-by: Nathan Gough <thenatog@gmail.com> This closes #6558.
This commit is contained in:
parent
753cb1b9df
commit
7c33516aee
|
@ -177,7 +177,7 @@ public interface FlowFileQueue {
|
|||
|
||||
String getFlowFileExpiration();
|
||||
|
||||
int getFlowFileExpiration(TimeUnit timeUnit);
|
||||
long getFlowFileExpiration(TimeUnit timeUnit);
|
||||
|
||||
void setFlowFileExpiration(String flowExpirationPeriod);
|
||||
|
||||
|
|
|
@ -98,8 +98,8 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getFlowFileExpiration(final TimeUnit timeUnit) {
|
||||
return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
|
||||
public long getFlowFileExpiration(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -138,7 +138,7 @@ public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition
|
|||
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
|
||||
}
|
||||
|
||||
private int getExpiration() {
|
||||
private long getExpiration() {
|
||||
return flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
|
|
@ -262,7 +262,7 @@ public class TestWriteAheadFlowFileRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getFlowFileExpiration(TimeUnit timeUnit) {
|
||||
public long getFlowFileExpiration(TimeUnit timeUnit) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ public class ExpirationMatcher implements AttributeMatcher<Connection> {
|
|||
@Override
|
||||
public void match(final Connection component, final SearchQuery query, final List<String> matches) {
|
||||
if (containsKeyword(query)) {
|
||||
final int expirationMillis = component.getFlowFileQueue().getFlowFileExpiration(TimeUnit.MILLISECONDS);
|
||||
final long expirationMillis = component.getFlowFileQueue().getFlowFileExpiration(TimeUnit.MILLISECONDS);
|
||||
|
||||
if (expirationMillis > 0) {
|
||||
matches.add(MATCH_PREFIX + component.getFlowFileQueue().getFlowFileExpiration());
|
||||
|
|
|
@ -226,7 +226,7 @@ public class ComponentMockUtil {
|
|||
final Optional<String> versionedId,
|
||||
final Collection<Relationship> relationships,
|
||||
final List<FlowFilePrioritizer> flowFilePrioritizers,
|
||||
final int flowFileExpirationInMs,
|
||||
final long flowFileExpirationInMs,
|
||||
final String backPressureDataSize,
|
||||
final long backPressureCount,
|
||||
final Connectable source,
|
||||
|
|
|
@ -509,7 +509,7 @@ public class ControllerSearchServiceIntegrationTest extends AbstractControllerSe
|
|||
final Connection connection = getConnection("connection", "connectionName", getBasicRelationships(), processor1, processor2, AUTHORIZED);
|
||||
|
||||
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(5);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(5L);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration()).thenReturn("5");
|
||||
Mockito.when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
|
||||
|
||||
|
|
|
@ -96,11 +96,11 @@ public class ExpirationMatcherTest extends AbstractAttributeMatcherTest{
|
|||
}
|
||||
|
||||
private void givenExpired() {
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(5);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(5L);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration()).thenReturn("5");
|
||||
}
|
||||
|
||||
private void givenNotExpired() {
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(0);
|
||||
Mockito.when(flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS)).thenReturn(0L);
|
||||
}
|
||||
}
|
|
@ -283,13 +283,13 @@ public class StatelessFlowFileQueue implements DrainableFlowFileQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getFlowFileExpiration(final TimeUnit timeUnit) {
|
||||
return (int) timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
|
||||
public long getFlowFileExpiration(final TimeUnit timeUnit) {
|
||||
return timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFlowFileExpiration(final String flowExpirationPeriod) {
|
||||
expirationMillis = (int) FormatUtils.getPreciseTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
|
||||
expirationMillis = Math.round(FormatUtils.getPreciseTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue