NIFI-12853 Refactor FlowFilePrioritizer using updated Java APIs

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #8461
This commit is contained in:
EndzeitBegins 2024-02-29 23:05:46 +01:00 committed by Mike Moser
parent a79b210d4e
commit 63fa036818
9 changed files with 183 additions and 232 deletions

View File

@ -19,11 +19,10 @@ package org.apache.nifi.flowfile;
import java.util.Comparator;
/**
* Provides a mechanism to prioritize flow file objects based on their
* attributes. The actual flow file content will not be available for comparison
* so if features of that content are necessary for prioritization it should be
* extracted to be used as an attribute of the flow file.
*
* Provides a mechanism to prioritize flow file objects based on their attributes.
* The actual flow file content will not be available for comparison.
* If features of that content are necessary for prioritization,
* it should be extracted to be used as an attribute of the flow file.
*/
public interface FlowFilePrioritizer extends Comparator<FlowFile> {
}

View File

@ -19,24 +19,18 @@ package org.apache.nifi.prioritizer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import java.util.Comparator;
public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer {
private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast(
Comparator
.comparingLong(FlowFile::getLastQueueDate)
.thenComparingLong(FlowFile::getQueueDateIndex)
);
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
if (o1 == null && o2 == null) {
return 0;
} else if (o2 == null) {
return -1;
} else if (o1 == null) {
return 1;
}
final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
if (dateComparison != 0) {
return dateComparison;
}
return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex());
return composedComparator.compare(o1, o2);
}
}

View File

@ -19,24 +19,19 @@ package org.apache.nifi.prioritizer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import java.util.Comparator;
public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast(
Comparator
.comparingLong(FlowFile::getLineageStartDate)
.thenComparingLong(FlowFile::getLineageStartIndex)
.reversed()
);
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
if (o1 == null && o2 == null) {
return 0;
} else if (o2 == null) {
return -1;
} else if (o1 == null) {
return 1;
}
final int lineageDateCompare = Long.compare(o2.getLineageStartDate(), o1.getLineageStartDate());
if (lineageDateCompare != 0) {
return lineageDateCompare;
}
return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex());
return composedComparator.compare(o1, o2);
}
}

View File

@ -19,24 +19,18 @@ package org.apache.nifi.prioritizer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import java.util.Comparator;
public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast(
Comparator
.comparingLong(FlowFile::getLineageStartDate)
.thenComparingLong(FlowFile::getLineageStartIndex)
);
@Override
public int compare(final FlowFile o1, final FlowFile o2) {
if (o1 == null && o2 == null) {
return 0;
} else if (o2 == null) {
return -1;
} else if (o1 == null) {
return 1;
}
final int lineageDateCompare = Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
if (lineageDateCompare != 0) {
return lineageDateCompare;
}
return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex());
return composedComparator.compare(o1, o2);
}
}

View File

@ -16,71 +16,55 @@
*/
package org.apache.nifi.prioritizer;
import java.util.regex.Pattern;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import java.util.Comparator;
import java.util.function.Predicate;
import java.util.regex.Pattern;
/**
* This prioritizer checks each FlowFile for a "priority" attribute and lets
* that attribute determine the priority.
*
* 1. if neither FlowFile has a "priority" attribute then order will be
* FirstInFirstOut 2. if one FlowFile has a "priority" attribute and the other
* does not, then the one with the attribute wins 3. if one or both "priority"
* attributes is an integer, then the lowest number wins 4. the "priority"
* attributes are compared lexicographically and the lowest wins
* This prioritizer checks each FlowFile for a "priority" attribute and lets that attribute determine the priority.
* <p>
* 1. If one FlowFile has a "priority" attribute and the other does not, then the one with the attribute wins.
* 2. If only one's FlowFile "priority" attribute is an integer, then that FlowFile wins.
* 3. If the "priority" attributes of both are an integer, then the FlowFile with the lowest number wins.
* 4. If the "priority" attributes of both are not an integer, they're compared lexicographically and the lowest wins.
*/
public class PriorityAttributePrioritizer implements FlowFilePrioritizer {
private static final Pattern intPattern = Pattern.compile("-?\\d+");
private static final Predicate<String> isInteger = Pattern.compile("-?\\d+").asMatchPredicate();
private static final Comparator<String> priorityAttributeComparator = Comparator.nullsLast(
Comparator.comparing(
PriorityAttributePrioritizer::parseLongOrNull,
Comparator.nullsLast(Long::compare)
).thenComparing(Comparator.naturalOrder())
);
private static final Comparator<FlowFile> composedComparator = Comparator.nullsLast(
Comparator.comparing(
flowFile -> flowFile.getAttribute(CoreAttributes.PRIORITY.key()),
priorityAttributeComparator
)
);
private static Long parseLongOrNull(String attribute) {
final String trimmedAttribute = attribute.trim();
if (isInteger.test(trimmedAttribute)) {
try {
return Long.parseLong(trimmedAttribute);
} catch (NumberFormatException ignored) {
return null;
}
}
return null;
}
@Override
public int compare(FlowFile o1, FlowFile o2) {
if (o1 == null && o2 == null) {
return 0;
} else if (o2 == null) {
return -1;
} else if (o1 == null) {
return 1;
}
String o1Priority = o1.getAttribute(CoreAttributes.PRIORITY.key());
String o2Priority = o2.getAttribute(CoreAttributes.PRIORITY.key());
if (o1Priority == null && o2Priority == null) {
return 0;
} else if (o2Priority == null) {
return -1;
} else if (o1Priority == null) {
return 1;
}
// priority exists on both FlowFiles
if (intPattern.matcher(o1Priority.trim()).matches()) {
if (intPattern.matcher(o2Priority.trim()).matches()) {
try {
// both o1Priority and o2Priority are numbers
long o1num = Long.parseLong(o1Priority.trim());
long o2num = Long.parseLong(o2Priority.trim());
return o1num < o2num ? -1 : (o1num > o2num ? 1 : 0);
} catch (NumberFormatException e) {
// not a long after regex matched
return 0;
}
} else {
// o1Priority is a number, o2Priority is not, o1 wins
return -1;
}
} else {
if (intPattern.matcher(o2Priority.trim()).matches()) {
// o2Priority is a number, o1Priority is not, o2 wins
return 1;
} else {
// neither o1Priority nor o2Priority are numbers
return o1Priority.compareTo(o2Priority);
}
}
return composedComparator.compare(o1, o2);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.prioritizer;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("EqualsWithItself")
public class FirstInFirstOutPrioritizerTest {
private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
private final FlowFilePrioritizer prioritizer = new FirstInFirstOutPrioritizer();
@Test
public void testPrioritizer() {
MockFlowFile flowFile1 = testRunner.enqueue("created first but 'enqueued' later");
flowFile1.setLastEnqueuedDate(830822400000L);
MockFlowFile flowFile2 = testRunner.enqueue("created second but 'enqueued' earlier");
flowFile2.setLastEnqueuedDate(795916800000L);
assertEquals(0, prioritizer.compare(null, null));
assertEquals(-1, prioritizer.compare(flowFile1, null));
assertEquals(1, prioritizer.compare(null, flowFile1));
assertEquals(0, prioritizer.compare(flowFile1, flowFile1));
assertEquals(0, prioritizer.compare(flowFile2, flowFile2));
assertEquals(1, prioritizer.compare(flowFile1, flowFile2));
assertEquals(-1, prioritizer.compare(flowFile2, flowFile1));
}
}

View File

@ -16,33 +16,27 @@
*/
package org.apache.nifi.prioritizer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("EqualsWithItself")
public class NewestFirstPrioritizerTest {
private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
private final FlowFilePrioritizer prioritizer = new NewestFlowFileFirstPrioritizer();
@Test
public void testPrioritizer() {
final Processor processor = new NoOpProcessor();
final AtomicLong idGenerator = new AtomicLong(0L);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
public void testPrioritizer() throws InterruptedException {
final FlowFile flowFile1 = testRunner.enqueue("flowFile1");
Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
final FlowFile flowFile2 = testRunner.enqueue("flowFile1");
final MockFlowFile flowFile1 = session.create();
try {
Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
} catch (final InterruptedException e) {
}
final MockFlowFile flowFile2 = session.create();
final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer();
assertEquals(0, prioritizer.compare(null, null));
assertEquals(-1, prioritizer.compare(flowFile1, null));
assertEquals(1, prioritizer.compare(null, flowFile1));
@ -51,5 +45,4 @@ public class NewestFirstPrioritizerTest {
assertEquals(1, prioritizer.compare(flowFile1, flowFile2));
assertEquals(-1, prioritizer.compare(flowFile2, flowFile1));
}
}

View File

@ -16,37 +16,27 @@
*/
package org.apache.nifi.prioritizer;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("EqualsWithItself")
public class OldestFirstPrioritizerTest {
private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
private final FlowFilePrioritizer prioritizer = new OldestFlowFileFirstPrioritizer();
@Test
public void testPrioritizer() {
final Processor processor = new SimpleProcessor();
final AtomicLong idGenerator = new AtomicLong(0L);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
public void testPrioritizer() throws InterruptedException {
final FlowFile flowFile1 = testRunner.enqueue("flowFile1");
Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
final FlowFile flowFile2 = testRunner.enqueue("flowFile2");
final MockFlowFile flowFile1 = session.create();
try {
Thread.sleep(2); // guarantee the FlowFile entryDate for flowFile2 is different than flowFile1
} catch (final InterruptedException e) {
}
final MockFlowFile flowFile2 = session.create();
final OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer();
assertEquals(0, prioritizer.compare(null, null));
assertEquals(-1, prioritizer.compare(flowFile1, null));
assertEquals(1, prioritizer.compare(null, flowFile1));
@ -55,13 +45,4 @@ public class OldestFirstPrioritizerTest {
assertEquals(-1, prioritizer.compare(flowFile1, flowFile2));
assertEquals(1, prioritizer.compare(flowFile2, flowFile1));
}
public class SimpleProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
}

View File

@ -16,102 +16,64 @@
*/
package org.apache.nifi.prioritizer;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
import org.junit.jupiter.api.BeforeAll;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("EqualsWithItself")
public class PriorityAttributePrioritizerTest {
static Map<String, String> attrsPri1 = new HashMap<>();
static Map<String, String> attrsPri2 = new HashMap<>();
static Map<String, String> attrsPrin1 = new HashMap<>();
static Map<String, String> attrsPriA = new HashMap<>();
static Map<String, String> attrsPriB = new HashMap<>();
static Map<String, String> attrsPriLP = new HashMap<>();
static Map<String, String> attrsPriLN = new HashMap<>();
@BeforeAll
public static void init() {
attrsPri1.put(CoreAttributes.PRIORITY.key(), "1");
attrsPri2.put(CoreAttributes.PRIORITY.key(), "2");
attrsPrin1.put(CoreAttributes.PRIORITY.key(), "-1");
attrsPriA.put(CoreAttributes.PRIORITY.key(), "A");
attrsPriB.put(CoreAttributes.PRIORITY.key(), "B");
attrsPriLP.put(CoreAttributes.PRIORITY.key(), "5432123456789");
attrsPriLN.put(CoreAttributes.PRIORITY.key(), "-5432123456789");
}
private final TestRunner testRunner = TestRunners.newTestRunner(NoOpProcessor.class);
private final FlowFilePrioritizer prioritizer = new PriorityAttributePrioritizer();
@Test
public void testPrioritizer() {
final Processor processor = new SimpleProcessor();
final AtomicLong idGenerator = new AtomicLong(0L);
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
final FlowFile ffWithoutPriority = testRunner.enqueue("data");
final FlowFile ffWithPriority1 = enqueueWithPriority("1");
final FlowFile ffWithPriority2 = enqueueWithPriority("2");
final FlowFile ffWithPriorityNegative1 = enqueueWithPriority("-1");
final FlowFile ffWithPriorityA = enqueueWithPriority("A");
final FlowFile ffWithPriorityB = enqueueWithPriority("B");
final FlowFile ffWithLongPriority = enqueueWithPriority("5432123456789");
final FlowFile ffWithNegativeLongPriority = enqueueWithPriority("-5432123456789");
final MockFlowFile ffNoPriority = session.create();
final MockFlowFile ffPri1 = session.create();
ffPri1.putAttributes(attrsPri1);
final MockFlowFile ffPri2 = session.create();
ffPri2.putAttributes(attrsPri2);
final MockFlowFile ffPrin1 = session.create();
ffPrin1.putAttributes(attrsPrin1);
final MockFlowFile ffPriA = session.create();
ffPriA.putAttributes(attrsPriA);
final MockFlowFile ffPriB = session.create();
ffPriB.putAttributes(attrsPriB);
final MockFlowFile ffPriLP = session.create();
ffPriLP.putAttributes(attrsPriLP);
final MockFlowFile ffPriLN = session.create();
ffPriLN.putAttributes(attrsPriLN);
final PriorityAttributePrioritizer prioritizer = new PriorityAttributePrioritizer();
assertEquals(0, prioritizer.compare(null, null));
assertEquals(-1, prioritizer.compare(ffNoPriority, null));
assertEquals(1, prioritizer.compare(null, ffNoPriority));
assertEquals(-1, prioritizer.compare(ffWithoutPriority, null));
assertEquals(1, prioritizer.compare(null, ffWithoutPriority));
assertEquals(0, prioritizer.compare(ffNoPriority, ffNoPriority));
assertEquals(-1, prioritizer.compare(ffPri1, ffNoPriority));
assertEquals(1, prioritizer.compare(ffNoPriority, ffPri1));
assertEquals(0, prioritizer.compare(ffWithoutPriority, ffWithoutPriority));
assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithoutPriority));
assertEquals(1, prioritizer.compare(ffWithoutPriority, ffWithPriority1));
assertEquals(0, prioritizer.compare(ffPri1, ffPri1));
assertEquals(-1, prioritizer.compare(ffPri1, ffPri2));
assertEquals(1, prioritizer.compare(ffPri2, ffPri1));
assertEquals(-1, prioritizer.compare(ffPrin1, ffPri1));
assertEquals(1, prioritizer.compare(ffPri1, ffPrin1));
assertEquals(0, prioritizer.compare(ffWithPriority1, ffWithPriority1));
assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithPriority2));
assertEquals(1, prioritizer.compare(ffWithPriority2, ffWithPriority1));
assertEquals(-1, prioritizer.compare(ffWithPriorityNegative1, ffWithPriority1));
assertEquals(1, prioritizer.compare(ffWithPriority1, ffWithPriorityNegative1));
assertEquals(-1, prioritizer.compare(ffPri1, ffPriA));
assertEquals(1, prioritizer.compare(ffPriA, ffPri1));
assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithPriorityA));
assertEquals(1, prioritizer.compare(ffWithPriorityA, ffWithPriority1));
assertEquals(0, prioritizer.compare(ffPriA, ffPriA));
assertEquals(-1, prioritizer.compare(ffPriA, ffPriB));
assertEquals(1, prioritizer.compare(ffPriB, ffPriA));
assertEquals(0, prioritizer.compare(ffWithPriorityA, ffWithPriorityA));
assertEquals(-1, prioritizer.compare(ffWithPriorityA, ffWithPriorityB));
assertEquals(1, prioritizer.compare(ffWithPriorityB, ffWithPriorityA));
assertEquals(1, prioritizer.compare(ffPriLP, ffPri1));
assertEquals(-1, prioritizer.compare(ffPri1, ffPriLP));
assertEquals(-1, prioritizer.compare(ffPriLN, ffPri1));
assertEquals(1, prioritizer.compare(ffPri1, ffPriLN));
assertEquals(1, prioritizer.compare(ffWithLongPriority, ffWithPriority1));
assertEquals(-1, prioritizer.compare(ffWithPriority1, ffWithLongPriority));
assertEquals(-1, prioritizer.compare(ffWithNegativeLongPriority, ffWithPriority1));
assertEquals(1, prioritizer.compare(ffWithPriority1, ffWithNegativeLongPriority));
}
public class SimpleProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
private MockFlowFile enqueueWithPriority(String priority) {
return testRunner.enqueue("data", Map.of(CoreAttributes.PRIORITY.key(), priority));
}
}