NIFI-11291: Ensuring exceptional EventIterators are closed in PartitionedEventStore

This closes #7047

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Joe Gresock 2023-03-15 16:30:13 -04:00 committed by exceptionfactory
parent 6576163958
commit 8227c19b97
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 39 additions and 1 deletions

View File

@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@ -183,12 +184,14 @@ public abstract class PartitionedEventStore implements EventStore {
final SortedMap<ProvenanceEventRecord, EventIterator> recordToIteratorMap = new TreeMap<>(
(o1, o2) -> Long.compare(o1.getEventId(), o2.getEventId()));
final Collection<EventIterator> createdIterators = new ArrayList<>();
try {
// Seed our map with the first event in each Partition.
for (final EventStorePartition partition : getPartitions()) {
final EventAuthorizer nonNullAuthorizer = authorizer == null ? EventAuthorizer.GRANT_ALL : authorizer;
final EventIterator partitionIterator = eventIteratorFactory.apply(partition);
final EventIterator iterator = new AuthorizingEventIterator(partitionIterator, nonNullAuthorizer, transformer);
createdIterators.add(iterator);
final Optional<ProvenanceEventRecord> option = iterator.nextEvent();
if (option.isPresent()) {
@ -223,7 +226,7 @@ public abstract class PartitionedEventStore implements EventStore {
return selectedEvents;
} finally {
// Ensure that we close all record readers that have been created
for (final EventIterator iterator : recordToIteratorMap.values()) {
for (final EventIterator iterator : createdIterators) {
try {
iterator.close();
} catch (final Exception e) {

View File

@ -37,6 +37,7 @@ import org.apache.nifi.provenance.toc.TocWriter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@ -50,12 +51,16 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
public class TestPartitionedWriteAheadEventStore {
private static final RecordWriterFactory writerFactory = (file, idGen, compress, createToc) -> RecordWriters.newSchemaRecordWriter(file, idGen, compress, createToc);
@ -417,6 +422,36 @@ public class TestPartitionedWriteAheadEventStore {
}
}
@Test
public void testCloseIterators() throws IOException {
final RepositoryConfiguration config = createConfig();
final PartitionedWriteAheadEventStore store = new PartitionedWriteAheadEventStore(config, writerFactory, readerFactory, EventReporter.NO_OP, new EventFileManager());
store.initialize();
final PartitionedWriteAheadEventStore spy = Mockito.spy(store);
final List<WriteAheadStorePartition> partitions = new ArrayList<>();
final WriteAheadStorePartition exceptionalPartition = Mockito.mock(WriteAheadStorePartition.class);
final AtomicInteger iteratorsClosed = new AtomicInteger(0);
when(exceptionalPartition.createEventIterator(anyLong())).thenReturn(new EventIterator() {
@Override
public Optional<ProvenanceEventRecord> nextEvent() throws IOException {
throw new IOException("An exception");
}
@Override
public void close() throws IOException {
iteratorsClosed.incrementAndGet();
}
});
when(exceptionalPartition.getMaxEventId()).thenReturn(10L);
partitions.add(exceptionalPartition);
when(spy.getPartitions()).thenReturn(partitions);
assertThrows(IOException.class, () -> spy.getEvents(1, 5));
assertEquals(1, iteratorsClosed.get());
}
@Test
public void testGetEventsByTimestamp() throws IOException {