mirror of
https://github.com/apache/nifi.git
synced 2025-03-06 09:29:33 +00:00
NIFI-11853 Changed Embedded QuestDB Tests to use JUnit5 TempDir
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #7523.
This commit is contained in:
parent
319c974e7c
commit
72618b1817
@ -16,23 +16,17 @@
|
||||
*/
|
||||
package org.apache.nifi.controller.status.history;
|
||||
|
||||
import org.apache.nifi.util.FileUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class AbstractEmbeddedQuestDbStatusHistoryRepositoryTest extends AbstractStatusHistoryRepositoryTest {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.class);
|
||||
|
||||
protected static final String PATH = "target/questdb";
|
||||
protected static final long NOW = System.currentTimeMillis();
|
||||
protected static final Date START = new Date(0);
|
||||
protected static final Date INSERTED_AT = new Date(NOW - TimeUnit.MINUTES.toMillis(1));
|
||||
@ -43,27 +37,22 @@ public abstract class AbstractEmbeddedQuestDbStatusHistoryRepositoryTest extends
|
||||
protected static final int DAYS_TO_KEEP_DATA = 7;
|
||||
protected static final long PERSIST_FREQUENCY = 50; //200 milliseconds
|
||||
|
||||
protected EmbeddedQuestDbStatusHistoryRepository testSubject;
|
||||
protected String path;
|
||||
protected EmbeddedQuestDbStatusHistoryRepository repository;
|
||||
|
||||
@TempDir
|
||||
private Path temporaryDirectory;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
path = PATH + System.currentTimeMillis();
|
||||
testSubject = givenTestSubject();
|
||||
repository = startRepository();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() throws Exception {
|
||||
testSubject.shutdown();
|
||||
|
||||
try {
|
||||
FileUtils.deleteFile(new File(path), true);
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("Could not delete database directory", e);
|
||||
}
|
||||
public void tearDown() {
|
||||
repository.shutdown();
|
||||
}
|
||||
|
||||
private EmbeddedQuestDbStatusHistoryRepository givenTestSubject() {
|
||||
private EmbeddedQuestDbStatusHistoryRepository startRepository() {
|
||||
final NiFiProperties niFiProperties = Mockito.mock(NiFiProperties.class);
|
||||
|
||||
Mockito.when(niFiProperties.getIntegerProperty(
|
||||
@ -76,14 +65,14 @@ public abstract class AbstractEmbeddedQuestDbStatusHistoryRepositoryTest extends
|
||||
NiFiProperties.DEFAULT_COMPONENT_STATUS_REPOSITORY_PERSIST_COMPONENT_DAYS)
|
||||
).thenReturn(DAYS_TO_KEEP_DATA);
|
||||
|
||||
Mockito.when(niFiProperties.getQuestDbStatusRepositoryPath()).thenReturn(Paths.get(path));
|
||||
Mockito.when(niFiProperties.getQuestDbStatusRepositoryPath()).thenReturn(temporaryDirectory);
|
||||
|
||||
final EmbeddedQuestDbStatusHistoryRepository testSubject = new EmbeddedQuestDbStatusHistoryRepository(niFiProperties, PERSIST_FREQUENCY);
|
||||
testSubject.start();
|
||||
return testSubject;
|
||||
}
|
||||
|
||||
protected void givenWaitUntilPersisted() throws InterruptedException {
|
||||
protected void waitUntilPersisted() throws InterruptedException {
|
||||
Thread.sleep(3000); // The actual writing happens asynchronously on a different thread
|
||||
}
|
||||
}
|
||||
|
@ -29,72 +29,67 @@ public class EmbeddedQuestDbStatusHistoryRepositoryForComponentsTest extends Abs
|
||||
|
||||
@Test
|
||||
public void testReadingEmptyRepository() {
|
||||
// when
|
||||
final StatusHistory result = testSubject.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
final StatusHistory result = repository.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
|
||||
// then
|
||||
assertStatusHistoryIsEmpty(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWritingThenReadingComponents() throws Exception {
|
||||
// given
|
||||
testSubject.capture(new NodeStatus(), givenRootProcessGroupStatus(), new ArrayList<>(), INSERTED_AT);
|
||||
givenWaitUntilPersisted();
|
||||
repository.capture(new NodeStatus(), givenRootProcessGroupStatus(), new ArrayList<>(), INSERTED_AT);
|
||||
waitUntilPersisted();
|
||||
|
||||
// when & then - reading root processor group
|
||||
final StatusHistory rootGroupStatus = testSubject.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
// reading root processor group
|
||||
final StatusHistory rootGroupStatus = repository.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
assertCorrectStatusHistory(rootGroupStatus, ROOT_GROUP_ID, "Root");
|
||||
assertRootProcessGroupStatusSnapshot(rootGroupStatus.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - reading child processor group
|
||||
final StatusHistory childGroupStatus = testSubject.getProcessGroupStatusHistory(CHILD_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
// reading child processor group
|
||||
final StatusHistory childGroupStatus = repository.getProcessGroupStatusHistory(CHILD_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
assertCorrectStatusHistory(childGroupStatus, CHILD_GROUP_ID, "Child");
|
||||
assertChildProcessGroupStatusSnapshot(childGroupStatus.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - reading processor (no-counter processor)
|
||||
final StatusHistory processorStatus = testSubject.getProcessorStatusHistory(PROCESSOR_ID, START, END, PREFERRED_DATA_POINTS, false);
|
||||
// reading processor (no-counter processor)
|
||||
final StatusHistory processorStatus = repository.getProcessorStatusHistory(PROCESSOR_ID, START, END, PREFERRED_DATA_POINTS, false);
|
||||
assertCorrectStatusHistory(processorStatus, PROCESSOR_ID, "Processor");
|
||||
assertProcessorStatusSnapshot(processorStatus.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - reading processor (with-counter processor)
|
||||
final StatusHistory processorWithCounterStatus1 = testSubject.getProcessorStatusHistory(PROCESSOR_WITH_COUNTER_ID, START, END, PREFERRED_DATA_POINTS, false);
|
||||
// reading processor (with-counter processor)
|
||||
final StatusHistory processorWithCounterStatus1 = repository.getProcessorStatusHistory(PROCESSOR_WITH_COUNTER_ID, START, END, PREFERRED_DATA_POINTS, false);
|
||||
assertCorrectStatusHistory(processorWithCounterStatus1, PROCESSOR_WITH_COUNTER_ID, "ProcessorWithCounter");
|
||||
assertProcessorWithCounterStatusSnapshot(processorWithCounterStatus1.getStatusSnapshots().get(0), false);
|
||||
|
||||
// when & then - reading processor (with-counter processor)
|
||||
final StatusHistory processorWithCounterStatus2 = testSubject.getProcessorStatusHistory(PROCESSOR_WITH_COUNTER_ID, START, END, PREFERRED_DATA_POINTS, true);
|
||||
// reading processor (with-counter processor)
|
||||
final StatusHistory processorWithCounterStatus2 = repository.getProcessorStatusHistory(PROCESSOR_WITH_COUNTER_ID, START, END, PREFERRED_DATA_POINTS, true);
|
||||
assertCorrectStatusHistory(processorWithCounterStatus2, PROCESSOR_WITH_COUNTER_ID, "ProcessorWithCounter");
|
||||
assertProcessorWithCounterStatusSnapshot(processorWithCounterStatus2.getStatusSnapshots().get(0), true);
|
||||
|
||||
// when & then - reading connection
|
||||
final StatusHistory connectionStatus = testSubject.getConnectionStatusHistory(CONNECTION_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
// reading connection
|
||||
final StatusHistory connectionStatus = repository.getConnectionStatusHistory(CONNECTION_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
assertCorrectStatusHistory(connectionStatus, CONNECTION_ID, "Connection");
|
||||
assertConnectionStatusSnapshot(connectionStatus.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - reading remote process group
|
||||
final StatusHistory remoteProcessGroupStatus = testSubject.getRemoteProcessGroupStatusHistory(REMOTE_PROCESS_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
// reading remote process group
|
||||
final StatusHistory remoteProcessGroupStatus = repository.getRemoteProcessGroupStatusHistory(REMOTE_PROCESS_GROUP_ID, START, END, PREFERRED_DATA_POINTS);
|
||||
assertCorrectStatusHistory(remoteProcessGroupStatus, REMOTE_PROCESS_GROUP_ID, "RemoteProcessGroup");
|
||||
assertRemoteProcessGroupSnapshot(remoteProcessGroupStatus.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - requesting data from out of recorded range
|
||||
final StatusHistory rootGroupStatus2 = testSubject.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END_EARLY, PREFERRED_DATA_POINTS);
|
||||
// requesting data from out of recorded range
|
||||
final StatusHistory rootGroupStatus2 = repository.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END_EARLY, PREFERRED_DATA_POINTS);
|
||||
assertStatusHistoryIsEmpty(rootGroupStatus2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadingLimitedByPreferredDataPoints() throws Exception {
|
||||
// given
|
||||
testSubject.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(8)));
|
||||
testSubject.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(7)));
|
||||
testSubject.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(6)));
|
||||
testSubject.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(5)));
|
||||
givenWaitUntilPersisted();
|
||||
repository.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(8)));
|
||||
repository.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(7)));
|
||||
repository.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(6)));
|
||||
repository.capture(new NodeStatus(), givenSimpleRootProcessGroupStatus(), new ArrayList<>(), new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(5)));
|
||||
waitUntilPersisted();
|
||||
|
||||
// when
|
||||
final StatusHistory result = testSubject.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, 3);
|
||||
final StatusHistory result = repository.getProcessGroupStatusHistory(ROOT_GROUP_ID, START, END, 3);
|
||||
|
||||
// then - in case the value of preferred data points are lower than the number of snapshots available, the latest will added to the result
|
||||
// in case the value of preferred data points are lower than the number of snapshots available, the latest will added to the result
|
||||
assertEquals(3, result.getStatusSnapshots().size());
|
||||
assertEquals(new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(7)), result.getStatusSnapshots().get(0).getTimestamp());
|
||||
assertEquals(new Date(INSERTED_AT.getTime() - TimeUnit.MINUTES.toMillis(6)), result.getStatusSnapshots().get(1).getTimestamp());
|
||||
|
@ -25,11 +25,9 @@ public class EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
|
||||
|
||||
@Test
|
||||
public void testReadingEmptyRepository() {
|
||||
// when
|
||||
final StatusHistory nodeStatusHistory = testSubject.getNodeStatusHistory(START, END);
|
||||
final GarbageCollectionHistory garbageCollectionHistory = testSubject.getGarbageCollectionHistory(START, END);
|
||||
final StatusHistory nodeStatusHistory = repository.getNodeStatusHistory(START, END);
|
||||
final GarbageCollectionHistory garbageCollectionHistory = repository.getGarbageCollectionHistory(START, END);
|
||||
|
||||
// then
|
||||
assertTrue(nodeStatusHistory.getStatusSnapshots().isEmpty());
|
||||
assertTrue(garbageCollectionHistory.getGarbageCollectionStatuses("gc1").isEmpty());
|
||||
assertTrue(garbageCollectionHistory.getGarbageCollectionStatuses("gc2").isEmpty());
|
||||
@ -37,16 +35,13 @@ public class EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
|
||||
|
||||
@Test
|
||||
public void testWritingThenReadingComponents() throws Exception {
|
||||
// given
|
||||
testSubject.capture(givenNodeStatus(), new ProcessGroupStatus(), givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
|
||||
givenWaitUntilPersisted();
|
||||
repository.capture(givenNodeStatus(), new ProcessGroupStatus(), givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
|
||||
waitUntilPersisted();
|
||||
|
||||
// when & then - reading node status
|
||||
final StatusHistory nodeStatusHistory = testSubject.getNodeStatusHistory(START, END);
|
||||
final StatusHistory nodeStatusHistory = repository.getNodeStatusHistory(START, END);
|
||||
assertNodeStatusHistory(nodeStatusHistory.getStatusSnapshots().get(0));
|
||||
|
||||
// when & then - garbage collection status
|
||||
final GarbageCollectionHistory garbageCollectionHistory = testSubject.getGarbageCollectionHistory(START, END);
|
||||
final GarbageCollectionHistory garbageCollectionHistory = repository.getGarbageCollectionHistory(START, END);
|
||||
assertGc1Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc1"));
|
||||
assertGc2Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc2"));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user