From 8227c19b970345706c072295bc8dcc075a8f1b67 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Wed, 15 Mar 2023 16:30:13 -0400 Subject: [PATCH] NIFI-11291: Ensuring exceptional EventIterators are closed in PartitionedEventStore This closes #7047 Signed-off-by: David Handermann --- .../store/PartitionedEventStore.java | 5 ++- .../TestPartitionedWriteAheadEventStore.java | 35 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java index a320f5adb9..25fb559dc5 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/PartitionedEventStore.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -183,12 +184,14 @@ public abstract class PartitionedEventStore implements EventStore { final SortedMap recordToIteratorMap = new TreeMap<>( (o1, o2) -> Long.compare(o1.getEventId(), o2.getEventId())); + final Collection createdIterators = new ArrayList<>(); try { // Seed our map with the first event in each Partition. for (final EventStorePartition partition : getPartitions()) { final EventAuthorizer nonNullAuthorizer = authorizer == null ? EventAuthorizer.GRANT_ALL : authorizer; final EventIterator partitionIterator = eventIteratorFactory.apply(partition); final EventIterator iterator = new AuthorizingEventIterator(partitionIterator, nonNullAuthorizer, transformer); + createdIterators.add(iterator); final Optional option = iterator.nextEvent(); if (option.isPresent()) { @@ -223,7 +226,7 @@ public abstract class PartitionedEventStore implements EventStore { return selectedEvents; } finally { // Ensure that we close all record readers that have been created - for (final EventIterator iterator : recordToIteratorMap.values()) { + for (final EventIterator iterator : createdIterators) { try { iterator.close(); } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java index f63d394662..39f7406b89 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/store/TestPartitionedWriteAheadEventStore.java @@ -37,6 +37,7 @@ import org.apache.nifi.provenance.toc.TocWriter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -50,12 +51,16 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; public class TestPartitionedWriteAheadEventStore { private static final RecordWriterFactory writerFactory = (file, idGen, compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, compress, createToc); @@ -417,6 +422,36 @@ public class TestPartitionedWriteAheadEventStore { } } + @Test + public void testCloseIterators() throws IOException { + final RepositoryConfiguration config = createConfig(); + final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager()); + store.initialize(); + + final PartitionedWriteAheadEventStore spy = Mockito.spy(store); + + final List partitions = new ArrayList<>(); + final WriteAheadStorePartition exceptionalPartition = Mockito.mock(WriteAheadStorePartition.class); + + final AtomicInteger iteratorsClosed = new AtomicInteger(0); + when(exceptionalPartition.createEventIterator(anyLong())).thenReturn(new EventIterator() { + @Override + public Optional nextEvent() throws IOException { + throw new IOException("An exception"); + } + + @Override + public void close() throws IOException { + iteratorsClosed.incrementAndGet(); + } + }); + when(exceptionalPartition.getMaxEventId()).thenReturn(10L); + partitions.add(exceptionalPartition); + when(spy.getPartitions()).thenReturn(partitions); + + assertThrows(IOException.class, () -> spy.getEvents(1, 5)); + assertEquals(1, iteratorsClosed.get()); + } @Test public void testGetEventsByTimestamp() throws IOException {