NIFI-12936 ListGCSBucket resets its tracking state after configuration change

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8550.
This commit is contained in:
Peter Turcsanyi 2024-03-22 22:32:27 +01:00 committed by Pierre Villard
parent 15d2b49e77
commit a9e246956c
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 299 additions and 214 deletions

View File

@ -278,40 +278,62 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return RELATIONSHIPS;
}
// State tracking
private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = Set.of(
BUCKET,
PREFIX,
LISTING_STRATEGY
);
// used by Tracking Timestamps tracking strategy
public static final String CURRENT_TIMESTAMP = "currentTimestamp";
public static final String CURRENT_KEY_PREFIX = "key-";
private volatile long currentTimestamp = 0L;
private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetEntityTrackingState = false;
// used by Tracking Entities tracking strategy
private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
private volatile boolean justElectedPrimaryNode = false;
private volatile boolean resetTracking = false;
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (isConfigurationRestored() && TRACKING_RESET_PROPERTIES.contains(descriptor)) {
resetTracking = true;
}
}
@OnScheduled
public void initListedEntityTracker(ProcessContext context) {
final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
if (listedEntityTracker != null && (resetEntityTrackingState || !isTrackingEntityStrategy)) {
public void initTrackingStrategy(ProcessContext context) throws IOException {
final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
final boolean isTrackingTimestampsStrategy = BY_TIMESTAMPS.getValue().equals(listingStrategy);
final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(listingStrategy);
if (resetTracking || !isTrackingTimestampsStrategy) {
context.getStateManager().clear(Scope.CLUSTER);
currentTimestamp = 0L;
currentKeys.clear();
}
if (listedEntityTracker != null && (resetTracking || !isTrackingEntityStrategy)) {
try {
listedEntityTracker.clearListedEntities();
listedEntityTracker = null;
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed entities", e);
}
}
resetEntityTrackingState = false;
if (isTrackingEntityStrategy) {
if (listedEntityTracker == null) {
listedEntityTracker = createListedEntityTracker();
}
} else {
listedEntityTracker = null;
if (isTrackingEntityStrategy && listedEntityTracker == null) {
listedEntityTracker = createListedEntityTracker();
}
resetTracking = false;
}
protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
@ -1027,4 +1049,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return count;
}
}
long getCurrentTimestamp() {
return currentTimestamp;
}
ListedEntityTracker<ListableBlob> getListedEntityTracker() {
return listedEntityTracker;
}
boolean isResetTracking() {
return resetTracking;
}
}

View File

@ -17,10 +17,31 @@
package org.apache.nifi.processors.gcp.storage;
import com.google.api.gax.paging.Page;
import com.google.cloud.PageImpl;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
@ -35,21 +56,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
@ -75,11 +81,13 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@ -87,6 +95,7 @@ import static org.mockito.Mockito.when;
*/
@MockitoSettings(strictness = Strictness.LENIENT)
public class ListGCSBucketTest extends AbstractGCSTest {
private static final String PREFIX = "test-prefix";
private static final Boolean USE_GENERATIONS = true;
@ -113,16 +122,29 @@ public class ListGCSBucketTest extends AbstractGCSTest {
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
private final static Long GENERATION = 5L;
private static final long TIMESTAMP = 1234567890;
@Mock
Storage storage;
@Mock
Page<Blob> mockBlobPage;
@Captor
ArgumentCaptor<Storage.BlobListOption> argumentCaptor;
@Override
public ListGCSBucket getProcessor() {
return new ListGCSBucket() {
private TestRunner runner;
private ListGCSBucket processor;
private MockStateManager mockStateManager;
@Mock
private DistributedMapCacheClient mockCache;
@BeforeEach
public void beforeEach() throws Exception {
processor = new ListGCSBucket() {
@Override
protected Storage getCloudService() {
return storage;
@ -133,21 +155,25 @@ public class ListGCSBucketTest extends AbstractGCSTest {
return storage;
}
};
runner = buildNewRunner(processor);
runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
runner.assertValid();
mockStateManager = runner.getStateManager();
}
@Override
public ListGCSBucket getProcessor() {
return processor;
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
}
@Test
public void testRestoreFreshState() throws Exception {
reset(storage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
assertFalse(runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(), "Cluster StateMap should be fresh (version -1L)");
assertTrue(processor.getStateKeys().isEmpty());
@ -157,17 +183,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(0L, processor.getStateTimestamp());
assertTrue(processor.getStateKeys().isEmpty());
}
@Test
public void testRestorePreviousState() throws Exception {
reset(storage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
@ -188,12 +207,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testPersistState() throws Exception {
reset(storage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
assertFalse(
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getStateVersion().isPresent(),
"Cluster StateMap should be fresh"
@ -215,13 +228,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
public void testFailedPersistState() throws Exception {
reset(storage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testFailedPersistState() {
runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);
final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0", "test-key-1"));
@ -241,60 +248,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
// We could do more specific things like check the contents of the LogMessage,
// but that seems too nitpicky.
}
@Mock
Page<Blob> mockBlobPage;
private Blob buildMockBlob(final String bucket, final String key, final long updateTime) {
final Blob blob = mock(Blob.class);
when(blob.getBucket()).thenReturn(bucket);
when(blob.getName()).thenReturn(key);
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
return blob;
}
private Blob buildMockBlobWithoutBucket(final String bucket, final String key, final long updateTime) {
final Blob blob = mock(Blob.class);
when(blob.getName()).thenReturn(key);
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
return blob;
}
private OffsetDateTime offsetDateTime(final long value) {
final Instant instant = Instant.ofEpochMilli(value);
final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
return OffsetDateTime.of(localDateTime, ZoneOffset.UTC);
}
private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) {
final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(3, verificationResults.size());
final ConfigVerificationResult cloudServiceResult = verificationResults.get(0);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome());
final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome());
final ConfigVerificationResult listingResult = verificationResults.get(2);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome());
assertTrue(
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)),
String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()));
}
@Test
public void testSuccessfulList() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testSuccessfulList() {
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
@ -331,11 +288,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
public void testNoTrackingListing() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
public void testNoTrackingListing() {
runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING);
runner.assertValid();
@ -381,12 +334,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testOldValues() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Iterable<Blob> mockList = Collections.singletonList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);
@ -409,16 +356,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("2", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
}
@Test
public void testEmptyList() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Iterable<Blob> mockList = Collections.emptyList();
when(mockBlobPage.getValues()).thenReturn(mockList);
@ -438,12 +377,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndFilesComingInAlphabeticalOrder() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
@ -482,12 +415,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@ -531,12 +458,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndNewFilesComingWithTheSameTimestamp() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@ -586,12 +507,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@ -639,13 +554,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
public void testAttributesSet() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testAttributesSet() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
when(blob.getSize()).thenReturn(SIZE);
when(blob.getCacheControl()).thenReturn(CACHE_CONTROL);
@ -705,13 +614,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
public void testAclOwnerUser() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testAclOwnerUser() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.User mockUser = mock(Acl.User.class);
when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
@ -734,15 +637,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
@Test
public void testAclOwnerGroup() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testAclOwnerGroup() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Group mockGroup = mock(Acl.Group.class);
when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
@ -765,16 +661,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
@Test
public void testAclOwnerDomain() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testAclOwnerDomain() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Domain mockDomain = mock(Acl.Domain.class);
when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
@ -796,16 +684,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
@Test
public void testAclOwnerProject() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testAclOwnerProject() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Project mockProject = mock(Acl.Project.class);
when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
@ -828,15 +708,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
@Test
public void testYieldOnBadStateRestore() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
public void testYieldOnBadStateRestore() {
final Iterable<Blob> mockList = Collections.emptyList();
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@ -848,12 +721,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
public void testListOptionsPrefix() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
public void testListOptionsPrefix() {
runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
runner.assertValid();
@ -869,14 +737,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(Storage.BlobListOption.prefix(PREFIX), argumentCaptor.getValue());
}
@Test
public void testListOptionsVersions() throws Exception {
reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
public void testListOptionsVersions() {
runner.setProperty(ListGCSBucket.USE_GENERATIONS, String.valueOf(USE_GENERATIONS));
runner.assertValid();
@ -892,4 +754,193 @@ public class ListGCSBucketTest extends AbstractGCSTest {
Storage.BlobListOption option = argumentCaptor.getValue();
assertEquals(Storage.BlobListOption.versions(true), option);
}
@Test
void testResetTimestampTrackingWhenBucketModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
assertFalse(processor.isResetTracking());
runner.run();
assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
assertTrue(processor.isResetTracking());
runner.run();
assertEquals(0, processor.getCurrentTimestamp());
mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER);
assertFalse(processor.isResetTracking());
}
@Test
void testResetTimestampTrackingWhenPrefixModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
assertFalse(processor.isResetTracking());
runner.run();
assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
assertTrue(processor.isResetTracking());
runner.run();
assertEquals(0, processor.getCurrentTimestamp());
mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER);
assertFalse(processor.isResetTracking());
}
@Test
void testResetTimestampTrackingWhenStrategyModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
assertFalse(processor.isResetTracking());
runner.run();
assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING);
assertTrue(processor.isResetTracking());
runner.run();
assertEquals(0, processor.getCurrentTimestamp());
mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, Scope.CLUSTER);
assertFalse(processor.isResetTracking());
}
@Test
void testResetEntityTrackingWhenBucketModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
assertFalse(processor.isResetTracking());
runner.run();
assertNotNull(processor.getListedEntityTracker());
runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
assertTrue(processor.isResetTracking());
runner.run();
assertNotNull(processor.getListedEntityTracker());
verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any());
assertFalse(processor.isResetTracking());
}
@Test
void testResetEntityTrackingWhenPrefixModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
assertFalse(processor.isResetTracking());
runner.run();
assertNotNull(processor.getListedEntityTracker());
runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
assertTrue(processor.isResetTracking());
runner.run();
assertNotNull(processor.getListedEntityTracker());
verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any());
assertFalse(processor.isResetTracking());
}
@Test
void testResetEntityTrackingWhenStrategyModified() throws Exception {
setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
assertFalse(processor.isResetTracking());
runner.run();
assertNotNull(processor.getListedEntityTracker());
runner.setProperty(ListGCSBucket.LISTING_STRATEGY, ListGCSBucket.NO_TRACKING);
assertTrue(processor.isResetTracking());
runner.run();
assertNull(processor.getListedEntityTracker());
verify(mockCache).remove(eq("ListedEntities::" + processor.getIdentifier()), any());
assertFalse(processor.isResetTracking());
}
private void setUpResetTrackingTest(AllowableValue listingStrategy) throws Exception {
runner.setProperty(ListGCSBucket.LISTING_STRATEGY, listingStrategy);
runner.setProperty(ListGCSBucket.PREFIX, "prefix1");
if (listingStrategy == ListGCSBucket.BY_TIMESTAMPS) {
mockStateManager.setState(Map.of(ListGCSBucket.CURRENT_TIMESTAMP, Long.toString(TIMESTAMP), ListGCSBucket.CURRENT_KEY_PREFIX + "0", "file"), Scope.CLUSTER);
} else if (listingStrategy == ListGCSBucket.BY_ENTITIES) {
String serviceId = "DistributedMapCacheClient";
when(mockCache.getIdentifier()).thenReturn(serviceId);
runner.addControllerService(serviceId, mockCache);
runner.enableControllerService(mockCache);
runner.setProperty(ListGCSBucket.TRACKING_STATE_CACHE, serviceId);
}
when(storage.list(anyString(), any(Storage.BlobListOption.class))).thenReturn(new PageImpl<>(null, null, null));
}
private Blob buildMockBlob(final String bucket, final String key, final long updateTime) {
final Blob blob = mock(Blob.class);
when(blob.getBucket()).thenReturn(bucket);
when(blob.getName()).thenReturn(key);
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
return blob;
}
private Blob buildMockBlobWithoutBucket(final String bucket, final String key, final long updateTime) {
final Blob blob = mock(Blob.class);
when(blob.getName()).thenReturn(key);
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
when(blob.getCreateTimeOffsetDateTime()).thenReturn(offsetDateTime(updateTime));
return blob;
}
private OffsetDateTime offsetDateTime(final long value) {
final Instant instant = Instant.ofEpochMilli(value);
final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
return OffsetDateTime.of(localDateTime, ZoneOffset.UTC);
}
private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) {
final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(3, verificationResults.size());
final ConfigVerificationResult cloudServiceResult = verificationResults.get(0);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome());
final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome());
final ConfigVerificationResult listingResult = verificationResults.get(2);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome());
assertTrue(
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)),
String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()));
}
}