From 62487f575ab89328d37b8486bb00b7109023289e Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 3 Aug 2023 16:25:07 +0200 Subject: [PATCH] NIFI-11891 Added No Tracking listing strategy to ListGCS This closes #7570 Signed-off-by: David Handermann --- .../processors/gcp/storage/ListGCSBucket.java | 82 +++++++++++++++++-- .../gcp/storage/ListGCSBucketTest.java | 49 +++++++++++ 2 files changed, 124 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java index 7a5eecb22e..1b156aac71 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java @@ -182,12 +182,17 @@ public class ListGCSBucket extends AbstractGCSProcessor { " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." + " For more information on how the 'Entity Tracking Time Window' property works, see the description."); + public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking", + "This strategy lists all entities without any tracking. The same entities will be listed each time" + + " this processor is scheduled. It is recommended to change the default run schedule value." + + " Any property that relates to the persisting state will be ignored."); + public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder() .name("listing-strategy") .displayName("Listing Strategy") .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.") .required(true) - .allowableValues(BY_TIMESTAMPS, BY_ENTITIES) + .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING) .defaultValue(BY_TIMESTAMPS.getValue()) .build(); @@ -291,7 +296,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { try { listedEntityTracker.clearListedEntities(); } catch (IOException e) { - throw new RuntimeException("Failed to reset previously listed entities due to " + e, e); + throw new RuntimeException("Failed to reset previously listed entities", e); } } resetEntityTrackingState = false; @@ -396,11 +401,31 @@ public class ListGCSBucket extends AbstractGCSProcessor { listByTrackingTimestamps(context, session); } else if (BY_ENTITIES.equals(listingStrategy)) { listByTrackingEntities(context, session); + } else if (NO_TRACKING.equals(listingStrategy)) { + listNoTracking(context, session); } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } } + private void listNoTracking(ProcessContext context, ProcessSession session) { + final long startNanos = System.nanoTime(); + final ListingAction listingAction = new NoTrackingListingAction(context, session); + + try { + listBucket(context, listingAction); + } catch (final Exception e) { + getLogger().error("Failed to list contents of GCS Bucket", e); + listingAction.getBlobWriter().finishListingExceptionally(e); + session.rollback(); + context.yield(); + return; + } + + final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + getLogger().info("Successfully listed GCS bucket {} in {} millis", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis); + } + private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) { try { restoreState(session); @@ -416,7 +441,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { try { listBucket(context, listingAction); } catch (final Exception e) { - getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e); + getLogger().error("Failed to list contents of GCS Bucket", e); listingAction.getBlobWriter().finishListingExceptionally(e); session.rollback(); context.yield(); @@ -424,7 +449,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { } final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{ context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis }); + getLogger().info("Successfully listed GCS bucket {} in {} millis", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis); } private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException { @@ -524,7 +549,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { private void commit(final ProcessSession session, final int listCount) { if (listCount > 0) { - getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount}); + getLogger().info("Successfully listed {} new files from GCS; routing to success", listCount); session.commitAsync(); } } @@ -541,6 +566,49 @@ public class ListGCSBucket extends AbstractGCSProcessor { void commit(int listCount); } + private class NoTrackingListingAction implements ListingAction { + final ProcessContext context; + final ProcessSession session; + final BlobWriter blobWriter; + + private NoTrackingListingAction(final ProcessContext context, final ProcessSession session) { + this.context = context; + this.session = session; + + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + if (writerFactory == null) { + blobWriter = new AttributeBlobWriter(session); + } else { + blobWriter = new RecordBlobWriter(session, writerFactory, getLogger()); + } + } + + @Override + public boolean skipBlob(final Blob blob) { + return false; + } + + @Override + public void commit(final int listCount) { + ListGCSBucket.this.commit(session, listCount); + } + + @Override + public BlobWriter getBlobWriter() { + return blobWriter; + } + + @Override + public Storage getCloudService() { + return ListGCSBucket.this.getCloudService(); + } + + @Override + public void finishListing(final int listCount, final long maxTimestamp, final Set keysMatchingTimestamp) { + // nothing to do + } + } + private class TriggerListingAction implements ListingAction { final ProcessContext context; final ProcessSession session; @@ -645,7 +713,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { writer.finishListing(); } catch (final Exception e) { - getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e); + getLogger().error("Failed to list contents of bucket", e); writer.finishListingExceptionally(e); session.rollback(); context.yield(); @@ -827,7 +895,7 @@ public class ListGCSBucket extends AbstractGCSProcessor { try { recordWriter.close(); } catch (IOException e) { - logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e); + logger.error("Failed to write listing as Records", e); } session.remove(flowFile); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java index 5b997342e1..c1c4b2a8c6 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java @@ -315,6 +315,55 @@ public class ListGCSBucketTest extends AbstractGCSTest { assertEquals(Collections.singleton("blob-key-2"), processor.getStateKeys()); } + @Test + public void testNoTrackingListing() throws Exception { + reset(storage, mockBlobPage); + final ListGCSBucket processor = getProcessor(); + final TestRunner runner = buildNewRunner(processor); + addRequiredPropertiesToRunner(runner); + runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING); + runner.assertValid(); + + final Iterable mockList = Arrays.asList( + buildMockBlob("blob-bucket-1", "blob-key-1", 2L), + buildMockBlob("blob-bucket-2", "blob-key-2", 3L) + ); + + when(mockBlobPage.getValues()).thenReturn(mockList); + when(mockBlobPage.getNextPage()).thenReturn(null); + when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage); + + runner.run(); + + when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true)); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS); + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2); + verifyConfigVerification(runner, processor, 2); + + final List successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS); + + MockFlowFile flowFile = successes.get(0); + assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR)); + assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR)); + assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR)); + + flowFile = successes.get(1); + assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR)); + assertEquals("blob-key-2",flowFile.getAttribute(KEY_ATTR)); + assertEquals("3", flowFile.getAttribute(UPDATE_TIME_ATTR)); + + runner.clearTransferState(); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS); + runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2); + + assertEquals(0, processor.getStateTimestamp()); + assertEquals(0, processor.getStateKeys().size()); + } + @Test public void testOldValues() throws Exception { reset(storage, mockBlobPage);