NIFI-11891 Added No Tracking listing strategy to ListGCS

This closes #7570

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-08-03 16:25:07 +02:00 committed by exceptionfactory
parent 5cb15b484c
commit 62487f575a
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 124 additions and 7 deletions

View File

@ -182,12 +182,17 @@ public class ListGCSBucket extends AbstractGCSProcessor {
" However an additional DistributedMapCache controller service is required and more JVM heap memory is used." + " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
" For more information on how the 'Entity Tracking Time Window' property works, see the description."); " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking",
"This strategy lists all entities without any tracking. The same entities will be listed each time" +
" this processor is scheduled. It is recommended to change the default run schedule value." +
" Any property that relates to the persisting state will be ignored.");
public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder() public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
.name("listing-strategy") .name("listing-strategy")
.displayName("Listing Strategy") .displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
.required(true) .required(true)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES) .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
.defaultValue(BY_TIMESTAMPS.getValue()) .defaultValue(BY_TIMESTAMPS.getValue())
.build(); .build();
@ -291,7 +296,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try { try {
listedEntityTracker.clearListedEntities(); listedEntityTracker.clearListedEntities();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed entities due to " + e, e); throw new RuntimeException("Failed to reset previously listed entities", e);
} }
} }
resetEntityTrackingState = false; resetEntityTrackingState = false;
@ -396,11 +401,31 @@ public class ListGCSBucket extends AbstractGCSProcessor {
listByTrackingTimestamps(context, session); listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) { } else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session); listByTrackingEntities(context, session);
} else if (NO_TRACKING.equals(listingStrategy)) {
listNoTracking(context, session);
} else { } else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy); throw new ProcessException("Unknown listing strategy: " + listingStrategy);
} }
} }
private void listNoTracking(ProcessContext context, ProcessSession session) {
final long startNanos = System.nanoTime();
final ListingAction listingAction = new NoTrackingListingAction(context, session);
try {
listBucket(context, listingAction);
} catch (final Exception e) {
getLogger().error("Failed to list contents of GCS Bucket", e);
listingAction.getBlobWriter().finishListingExceptionally(e);
session.rollback();
context.yield();
return;
}
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed GCS bucket {} in {} millis", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis);
}
private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) { private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
try { try {
restoreState(session); restoreState(session);
@ -416,7 +441,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try { try {
listBucket(context, listingAction); listBucket(context, listingAction);
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e); getLogger().error("Failed to list contents of GCS Bucket", e);
listingAction.getBlobWriter().finishListingExceptionally(e); listingAction.getBlobWriter().finishListingExceptionally(e);
session.rollback(); session.rollback();
context.yield(); context.yield();
@ -424,7 +449,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
} }
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{ context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis }); getLogger().info("Successfully listed GCS bucket {} in {} millis", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis);
} }
private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException { private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException {
@ -524,7 +549,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
private void commit(final ProcessSession session, final int listCount) { private void commit(final ProcessSession session, final int listCount) {
if (listCount > 0) { if (listCount > 0) {
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount}); getLogger().info("Successfully listed {} new files from GCS; routing to success", listCount);
session.commitAsync(); session.commitAsync();
} }
} }
@ -541,6 +566,49 @@ public class ListGCSBucket extends AbstractGCSProcessor {
void commit(int listCount); void commit(int listCount);
} }
private class NoTrackingListingAction implements ListingAction<BlobWriter> {
final ProcessContext context;
final ProcessSession session;
final BlobWriter blobWriter;
private NoTrackingListingAction(final ProcessContext context, final ProcessSession session) {
this.context = context;
this.session = session;
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
if (writerFactory == null) {
blobWriter = new AttributeBlobWriter(session);
} else {
blobWriter = new RecordBlobWriter(session, writerFactory, getLogger());
}
}
@Override
public boolean skipBlob(final Blob blob) {
return false;
}
@Override
public void commit(final int listCount) {
ListGCSBucket.this.commit(session, listCount);
}
@Override
public BlobWriter getBlobWriter() {
return blobWriter;
}
@Override
public Storage getCloudService() {
return ListGCSBucket.this.getCloudService();
}
@Override
public void finishListing(final int listCount, final long maxTimestamp, final Set<String> keysMatchingTimestamp) {
// nothing to do
}
}
private class TriggerListingAction implements ListingAction<BlobWriter> { private class TriggerListingAction implements ListingAction<BlobWriter> {
final ProcessContext context; final ProcessContext context;
final ProcessSession session; final ProcessSession session;
@ -645,7 +713,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
writer.finishListing(); writer.finishListing();
} catch (final Exception e) { } catch (final Exception e) {
getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e); getLogger().error("Failed to list contents of bucket", e);
writer.finishListingExceptionally(e); writer.finishListingExceptionally(e);
session.rollback(); session.rollback();
context.yield(); context.yield();
@ -827,7 +895,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
try { try {
recordWriter.close(); recordWriter.close();
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e); logger.error("Failed to write listing as Records", e);
} }
session.remove(flowFile); session.remove(flowFile);

View File

@ -315,6 +315,55 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(Collections.singleton("blob-key-2"), processor.getStateKeys()); assertEquals(Collections.singleton("blob-key-2"), processor.getStateKeys());
} }
@Test
public void testNoTrackingListing() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING);
runner.assertValid();
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
);
when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
runner.run();
when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
verifyConfigVerification(runner, processor, 2);
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
MockFlowFile flowFile = successes.get(0);
assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
flowFile = successes.get(1);
assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR));
assertEquals("blob-key-2",flowFile.getAttribute(KEY_ATTR));
assertEquals("3", flowFile.getAttribute(UPDATE_TIME_ATTR));
runner.clearTransferState();
runner.run();
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
assertEquals(0, processor.getStateTimestamp());
assertEquals(0, processor.getStateKeys().size());
}
@Test @Test
public void testOldValues() throws Exception { public void testOldValues() throws Exception {
reset(storage, mockBlobPage); reset(storage, mockBlobPage);