NIFI-6636: Fixed ListGCSBucket file duplication error

ListGCSBucket listed files twice if they did not arrive in alphabetical order.
The set storing the names of the latest blobs (those loaded with the highest
timestamp during the previous run of the processor) was cleared too early.

Also changed the state persisting logic: the state is now saved only once, at
the end of onTrigger() (similar to ListS3). Before this change, an inconsistent
state (only blob names without the timestamp) could also be saved.

This closes #3702.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Peter Turcsanyi 2019-09-06 13:49:39 +02:00 committed by Mark Payne
parent 8a8b9c1d08
commit 21a27c8bb0
2 changed files with 361 additions and 66 deletions

View File

@ -259,13 +259,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
final Storage storage = getCloudService();
int listCount = 0;
long maxTimestamp = 0L;
Set<String> maxKeys = new HashSet<>();
Page<Blob> blobPages = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
do {
for (Blob blob : blobPages.getValues()) {
int listCount = 0;
for (Blob blob : blobPage.getValues()) {
long lastModified = blob.getUpdateTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
@ -381,40 +384,36 @@ public class ListGCSBucket extends AbstractGCSProcessor {
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
currentKeys.clear();
maxKeys.clear();
}
if (lastModified == maxTimestamp) {
currentKeys.add(blob.getName());
maxKeys.add(blob.getName());
}
listCount++;
}
blobPages = blobPages.getNextPage();
commit(context, session, listCount);
listCount = 0;
} while (blobPages != null);
currentTimestamp = maxTimestamp;
blobPage = blobPage.getNextPage();
} while (blobPage != null);
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
if (!commit(context, session, listCount)) {
if (currentTimestamp > 0) {
persistState(context);
}
if (maxTimestamp != 0) {
currentTimestamp = maxTimestamp;
currentKeys = maxKeys;
persistState(context);
} else {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
context.yield();
}
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
}
private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
boolean willCommit = listCount > 0;
if (willCommit) {
private void commit(final ProcessContext context, final ProcessSession session, int listCount) {
if (listCount > 0) {
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
session.commit();
persistState(context);
}
return willCommit;
}
}

View File

@ -256,7 +256,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Mock
Page<Blob> mockBlobPages;
Page<Blob> mockBlobPage;
private Blob buildMockBlob(String bucket, String key, long updateTime) {
final Blob blob = mock(Blob.class);
@ -268,7 +268,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testSuccessfulList() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -279,13 +279,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -343,7 +343,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testOldValues() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -353,13 +353,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.enqueue("test2");
@ -384,7 +384,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testEmptyList() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -392,13 +392,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -412,9 +412,305 @@ public class ListGCSBucketTest extends AbstractGCSTest {
);
}
/**
 * Seeds the cluster state with timestamp 1 and key "blob-key-1", then lists two blobs
 * ("blob-key-1" at time 1, "blob-key-2" at time 2). Expects only "blob-key-2" to be
 * transferred and the processor state to move to timestamp 2 with keys {"blob-key-2"}.
 */
@Test
public void testListWithStateAndFilesComingInAlphabeticalOrder() throws Exception {
    reset(storage, mockBlobPage);

    final ListGCSBucket listProcessor = getProcessor();
    final TestRunner testRunner = buildNewRunner(listProcessor);
    addRequiredPropertiesToRunner(testRunner);
    testRunner.assertValid();

    // State as if a previous run had already listed "blob-key-1" at timestamp 1.
    final Map<String, String> previousState = ImmutableMap.of(
            ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
            ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
    testRunner.getStateManager().setState(previousState, Scope.CLUSTER);

    // The blob mocks are built before the page is stubbed.
    final Iterable<Blob> listedBlobs = ImmutableList.of(
            buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
            buildMockBlob("blob-bucket-2", "blob-key-2", 2L));

    when(mockBlobPage.getValues()).thenReturn(listedBlobs);
    when(mockBlobPage.getNextPage()).thenReturn(null);
    when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);

    testRunner.enqueue("test");
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
    testRunner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);

    final List<MockFlowFile> transferred = testRunner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
    final MockFlowFile onlyListed = transferred.get(0);
    assertEquals("blob-bucket-2", onlyListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-2", onlyListed.getAttribute(KEY_ATTR));
    assertEquals("2", onlyListed.getAttribute(UPDATE_TIME_ATTR));

    assertEquals(2L, listProcessor.currentTimestamp);
    assertEquals(ImmutableSet.of("blob-key-2"), listProcessor.currentKeys);
}
/**
 * Seeds the cluster state with timestamp 1 and key "blob-key-2", then lists
 * "blob-key-1" (time 2) followed by "blob-key-2" (time 1), i.e. the already-known
 * blob comes after a newer one. Expects only "blob-key-1" to be transferred and the
 * processor state to move to timestamp 2 with keys {"blob-key-1"}.
 */
@Test
public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws Exception {
    reset(storage, mockBlobPage);

    final ListGCSBucket listProcessor = getProcessor();
    final TestRunner testRunner = buildNewRunner(listProcessor);
    addRequiredPropertiesToRunner(testRunner);
    testRunner.assertValid();

    // State as if a previous run had already listed "blob-key-2" at timestamp 1.
    final Map<String, String> previousState = ImmutableMap.of(
            ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
            ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
    testRunner.getStateManager().setState(previousState, Scope.CLUSTER);

    // The blob mocks are built before the page is stubbed.
    final Iterable<Blob> listedBlobs = ImmutableList.of(
            buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
            buildMockBlob("blob-bucket-2", "blob-key-2", 1L));

    when(mockBlobPage.getValues()).thenReturn(listedBlobs);
    when(mockBlobPage.getNextPage()).thenReturn(null);
    when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);

    testRunner.enqueue("test");
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
    testRunner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);

    final List<MockFlowFile> transferred = testRunner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
    final MockFlowFile onlyListed = transferred.get(0);
    assertEquals("blob-bucket-1", onlyListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-1", onlyListed.getAttribute(KEY_ATTR));
    assertEquals("2", onlyListed.getAttribute(UPDATE_TIME_ATTR));

    assertEquals(2L, listProcessor.currentTimestamp);
    assertEquals(ImmutableSet.of("blob-key-1"), listProcessor.currentKeys);
}
/**
 * Seeds the cluster state with timestamp 1 and key "blob-key-2", then lists three
 * blobs where "blob-key-1" and "blob-key-3" share the new timestamp 2 and
 * "blob-key-2" is still at time 1. Expects the two newer blobs to be transferred in
 * listing order and the processor state to move to timestamp 2 with keys
 * {"blob-key-1", "blob-key-3"}.
 */
@Test
public void testListWithStateAndNewFilesComingWithTheSameTimestamp() throws Exception {
    reset(storage, mockBlobPage);

    final ListGCSBucket listProcessor = getProcessor();
    final TestRunner testRunner = buildNewRunner(listProcessor);
    addRequiredPropertiesToRunner(testRunner);
    testRunner.assertValid();

    // State as if a previous run had already listed "blob-key-2" at timestamp 1.
    final Map<String, String> previousState = ImmutableMap.of(
            ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
            ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
    testRunner.getStateManager().setState(previousState, Scope.CLUSTER);

    // The blob mocks are built before the page is stubbed.
    final Iterable<Blob> listedBlobs = ImmutableList.of(
            buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
            buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
            buildMockBlob("blob-bucket-3", "blob-key-3", 2L));

    when(mockBlobPage.getValues()).thenReturn(listedBlobs);
    when(mockBlobPage.getNextPage()).thenReturn(null);
    when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);

    testRunner.enqueue("test");
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
    testRunner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);

    final List<MockFlowFile> transferred = testRunner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);

    final MockFlowFile firstListed = transferred.get(0);
    assertEquals("blob-bucket-1", firstListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-1", firstListed.getAttribute(KEY_ATTR));
    assertEquals("2", firstListed.getAttribute(UPDATE_TIME_ATTR));

    final MockFlowFile secondListed = transferred.get(1);
    assertEquals("blob-bucket-3", secondListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-3", secondListed.getAttribute(KEY_ATTR));
    assertEquals("2", secondListed.getAttribute(UPDATE_TIME_ATTR));

    assertEquals(2L, listProcessor.currentTimestamp);
    assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), listProcessor.currentKeys);
}
/**
 * Seeds the cluster state with timestamp 1 and key "blob-key-2", then lists three
 * blobs that all carry that same timestamp 1. Expects "blob-key-1" and "blob-key-3"
 * (the keys not in the stored state) to be transferred in listing order, the
 * timestamp to stay at 1, and the processor keys to be {"blob-key-1", "blob-key-3"}.
 */
@Test
public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() throws Exception {
    reset(storage, mockBlobPage);

    final ListGCSBucket listProcessor = getProcessor();
    final TestRunner testRunner = buildNewRunner(listProcessor);
    addRequiredPropertiesToRunner(testRunner);
    testRunner.assertValid();

    // State as if a previous run had already listed "blob-key-2" at timestamp 1.
    final Map<String, String> previousState = ImmutableMap.of(
            ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
            ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
    testRunner.getStateManager().setState(previousState, Scope.CLUSTER);

    // The blob mocks are built before the page is stubbed.
    final Iterable<Blob> listedBlobs = ImmutableList.of(
            buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
            buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
            buildMockBlob("blob-bucket-3", "blob-key-3", 1L));

    when(mockBlobPage.getValues()).thenReturn(listedBlobs);
    when(mockBlobPage.getNextPage()).thenReturn(null);
    when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);

    testRunner.enqueue("test");
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
    testRunner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);

    final List<MockFlowFile> transferred = testRunner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);

    final MockFlowFile firstListed = transferred.get(0);
    assertEquals("blob-bucket-1", firstListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-1", firstListed.getAttribute(KEY_ATTR));
    assertEquals("1", firstListed.getAttribute(UPDATE_TIME_ATTR));

    final MockFlowFile secondListed = transferred.get(1);
    assertEquals("blob-bucket-3", secondListed.getAttribute(BUCKET_ATTR));
    assertEquals("blob-key-3", secondListed.getAttribute(KEY_ATTR));
    assertEquals("1", secondListed.getAttribute(UPDATE_TIME_ATTR));

    assertEquals(1L, listProcessor.currentTimestamp);
    assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), listProcessor.currentKeys);
}
@Test
public void testAttributesSet() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -447,13 +743,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -555,7 +851,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerUser() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -568,13 +864,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -598,7 +894,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerGroup() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -611,13 +907,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -642,7 +938,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerDomain() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -655,13 +951,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -686,7 +982,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerProject() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -699,13 +995,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -729,7 +1025,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testYieldOnBadStateRestore() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -737,13 +1033,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
runner.enqueue("test");
@ -758,7 +1054,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsPrefix() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -772,13 +1068,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@ -793,7 +1089,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsVersions() throws Exception {
reset(storage, mockBlobPages);
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@ -806,13 +1102,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
when(mockBlobPages.getValues())
when(mockBlobPage.getValues())
.thenReturn(mockList);
when(mockBlobPages.getNextPage()).thenReturn(null);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
.thenReturn(mockBlobPages);
.thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();