NIFI-12441 Added No Tracking Strategy to ListS3

This closes #8088

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Juldrixx 2023-12-23 04:38:03 -06:00 committed by exceptionfactory
parent dd45e7edcb
commit 9a91933909
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 157 additions and 12 deletions

View File

@ -143,12 +143,17 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
" However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
" For more information on how the 'Entity Tracking Time Window' property works, see the description.");
public static final AllowableValue NO_TRACKING = new AllowableValue("none", "No Tracking",
"This strategy lists all entities without any tracking. The same entities will be listed each time" +
" this processor is scheduled. It is recommended to change the default run schedule value." +
" Any property that relates to the persisting state will be ignored.");
public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
.name("listing-strategy")
.displayName("Listing Strategy")
.description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
.required(true)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
.allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
@ -337,7 +342,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
try {
listedEntityTracker.clearListedEntities();
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
throw new RuntimeException("Failed to reset previously listed entities", e);
}
}
resetEntityTrackingState = false;
@ -457,11 +462,92 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
listByTrackingTimestamps(context, session);
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
} else if (NO_TRACKING.equals(listingStrategy)) {
listNoTracking(context, session);
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
/**
 * Lists the contents of the configured S3 bucket without persisting any listing state.
 * Every object within the configured Minimum/Maximum Object Age window is emitted on each
 * invocation, so the same entities are listed again on every scheduled run.
 *
 * @param context the process context providing configured property values
 * @param session the process session used to create and transfer listing output
 */
private void listNoTracking(final ProcessContext context, final ProcessSession session) {
    final AmazonS3 client = getClient(context);
    final S3BucketLister bucketLister = getS3BucketLister(context, client);
    final long startNanos = System.nanoTime();

    // Age window bounds: objects newer than MIN_AGE, or older than MAX_AGE (when set), are skipped
    final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
    final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
    final long listingTimestamp = System.currentTimeMillis();

    final String bucket = context.getProperty(BUCKET_WITHOUT_DEFAULT_VALUE).evaluateAttributeExpressions().getValue();
    final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    // Hoisted out of the listing loop: the region is constant for the whole invocation,
    // so resolve the property value once instead of once per listed object
    final String region = context.getProperty(S3_REGION).getValue();

    int listCount = 0;
    int totalListCount = 0;

    getLogger().trace("Start listing, listingTimestamp={}", listingTimestamp);

    // Write either FlowFile attributes (one FlowFile per object) or Records, depending on configuration
    final S3ObjectWriter writer;
    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    if (writerFactory == null) {
        writer = new AttributeObjectWriter(session);
    } else {
        writer = new RecordObjectWriter(session, writerFactory, getLogger(), region);
    }

    try {
        writer.beginListing();

        do {
            final VersionListing versionListing = bucketLister.listVersions();
            for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
                final long lastModified = versionSummary.getLastModified().getTime();
                // Skip objects outside the configured age window
                if ((maxAgeMilliseconds != null && (lastModified < (listingTimestamp - maxAgeMilliseconds)))
                        || lastModified > (listingTimestamp - minAgeMilliseconds)) {
                    continue;
                }

                getLogger().trace("Listed key={}, lastModified={}", versionSummary.getKey(), lastModified);

                final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
                final ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);

                // Write the entity to the listing
                writer.addToListing(versionSummary, taggingResult, objectMetadata, region);

                listCount++;
            }
            bucketLister.setNextMarker();

            totalListCount += listCount;

            // Commit a completed batch mid-listing so downstream processing can start
            // before the full (possibly paginated) listing finishes
            if (listCount >= batchSize && writer.isCheckpoint()) {
                getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
                session.commitAsync();
            }

            listCount = 0;
        } while (bucketLister.isTruncated());

        writer.finishListing();
    } catch (final Exception e) {
        getLogger().error("Failed to list contents of bucket", e);
        writer.finishListingExceptionally(e);
        session.rollback();
        context.yield();
        return;
    }

    final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    getLogger().info("Successfully listed S3 bucket {} in {} millis", bucket, listMillis);

    if (totalListCount == 0) {
        getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", bucket);
        context.yield();
    }
}
private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
try {
restoreState(session);
@ -486,7 +572,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
long latestListedTimestampInThisCycle = listingTimestamp;
final Set<String> listedKeys = new HashSet<>();
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{currentTimestamp, listingTimestamp, currentKeys});
getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", currentTimestamp, listingTimestamp, currentKeys);
final S3ObjectWriter writer;
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@ -509,7 +595,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
continue;
}
getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", versionSummary.getKey(), lastModified, currentKeys);
// Write the entity to the listing
final GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
@ -543,7 +629,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.finishListing();
} catch (final Exception e) {
getLogger().error("Failed to list contents of bucket due to {}", e, e);
getLogger().error("Failed to list contents of bucket", e);
writer.finishListingExceptionally(e);
session.rollback();
context.yield();
@ -564,10 +650,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
});
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
getLogger().info("Successfully listed S3 bucket {} in {} millis", bucket, listMillis);
if (totalListCount == 0) {
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", bucket);
context.yield();
}
}
@ -604,7 +690,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
ProcessContext context,
ProcessSession session,
List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
) throws IOException, SchemaNotFoundException {
) {
publishListing(context, session, updatedEntities);
}
@ -643,7 +729,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
listCount++;
if (listCount >= batchSize && writer.isCheckpoint()) {
getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});
getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
session.commitAsync();
}
@ -653,7 +739,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
writer.finishListing();
} catch (final Exception e) {
getLogger().error("Failed to list contents of bucket due to {}", e, e);
getLogger().error("Failed to list contents of bucket", e);
writer.finishListingExceptionally(e);
session.rollback();
context.yield();
@ -968,7 +1054,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
try {
recordWriter.close();
} catch (IOException e) {
logger.error("Failed to write listing as Records due to {}", e, e);
logger.error("Failed to write listing as Records", e);
}
session.remove(flowFile);
@ -1068,7 +1154,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
@Override
public void finishListing() throws IOException {
public void finishListing() {
}
@Override

View File

@ -568,4 +568,63 @@ public class TestListS3 {
Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
}
@Test
public void testNoTrackingList() {
    runner.setProperty(ListS3.REGION, "eu-west-1");
    runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
    runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);

    // Populate the mocked listing with three objects sharing a single modification time
    final Date lastModified = new Date();
    final ObjectListing listing = new ObjectListing();
    for (final String key : new String[]{"a", "b/c", "d/e"}) {
        final S3ObjectSummary summary = new S3ObjectSummary();
        summary.setBucketName("test-bucket");
        summary.setKey(key);
        summary.setLastModified(lastModified);
        listing.getObjectSummaries().add(summary);
    }
    Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(listing);

    runner.run();

    // A plain object listing (not a version listing) should have been issued exactly once
    final ArgumentCaptor<ListObjectsRequest> requestCaptor = ArgumentCaptor.forClass(ListObjectsRequest.class);
    Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(requestCaptor.capture());
    final ListObjectsRequest request = requestCaptor.getValue();
    assertEquals("test-bucket", request.getBucketName());
    assertFalse(request.isRequesterPays());
    Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());

    runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
    final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
    final MockFlowFile first = flowFiles.get(0);
    first.assertAttributeEquals("filename", "a");
    first.assertAttributeEquals("s3.bucket", "test-bucket");
    first.assertAttributeEquals("s3.region", "eu-west-1");
    first.assertAttributeEquals("s3.lastModified", String.valueOf(lastModified.getTime()));
    flowFiles.get(1).assertAttributeEquals("filename", "b/c");
    flowFiles.get(2).assertAttributeEquals("filename", "d/e");

    final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
            .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
    assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
    assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome());
    assertTrue(results.get(1).getExplanation().contains("finding 3"));

    // Without tracking, a second run must list the same objects again and store no state
    runner.clearTransferState();
    runner.run();

    runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
    runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, null, Scope.CLUSTER);
}
}