mirror of https://github.com/apache/nifi.git

NIFI-9638 Refactored Google Guava usage in extensions

Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #5742
parent e584d3cf04
commit a2f6420f43
@@ -17,13 +17,16 @@
 */
package org.apache.nifi.accumulo.processors;

import com.google.common.collect.ImmutableList;
import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Base Accumulo class that provides connector services, table name, and thread
 * properties

@@ -79,7 +82,5 @@ public abstract class BaseAccumuloProcessor extends AbstractProcessor {
 * so that implementations must constructor their own lists knowingly
 */

protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS,ACCUMULO_TIMEOUT);

protected static final List<PropertyDescriptor> baseProperties = Collections.unmodifiableList(Arrays.asList(ACCUMULO_CONNECTOR_SERVICE, TABLE_NAME, CREATE_TABLE, THREADS, ACCUMULO_TIMEOUT));
}
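The hunk above shows the general pattern this commit applies to Guava's ImmutableList: build the list with plain JDK collections and wrap it as an unmodifiable view. A minimal sketch of the before/after, using hypothetical string constants instead of the NiFi PropertyDescriptor fields:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class GuavaListMigration {
    static final String PROP_A = "prop-a";   // hypothetical stand-ins for PropertyDescriptor constants
    static final String PROP_B = "prop-b";

    // Before (Guava): ImmutableList.of(PROP_A, PROP_B)
    // After (JDK only): an unmodifiable wrapper over a fixed-size list
    static final List<String> PROPERTIES =
            Collections.unmodifiableList(Arrays.asList(PROP_A, PROP_B));
}

Unlike ImmutableList, unmodifiableList is only a read-only view, but because the backing list is never exposed here the result is effectively immutable to callers.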
@@ -18,13 +18,14 @@ package org.apache.nifi.processors.aws.s3;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.SdkClientException;
import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;

@@ -245,7 +246,7 @@ public class TestFetchS3Object {
@Test
public void testGetObjectExceptionGoesToFailure() throws IOException {
public void testGetObjectExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();

@@ -267,7 +268,12 @@ public class TestFetchS3Object {
runner.enqueue(new byte[0], attrs);

final AmazonS3Exception exception = new AmazonS3Exception("The specified bucket does not exist");
exception.setAdditionalDetails(ImmutableMap.of("BucketName", "us-east-1", "Error", "ABC123"));

final Map<String, String> details = new LinkedHashMap<>();
details.put("BucketName", "us-east-1");
details.put("Error", "ABC123");

exception.setAdditionalDetails(details);
exception.setErrorCode("NoSuchBucket");
exception.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());

@@ -291,7 +297,7 @@ public class TestFetchS3Object {
runner.enqueue(new byte[0], attrs);

final AmazonS3Exception exception = new AmazonS3Exception("signature");
exception.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB CC DD EE FF"));
exception.setAdditionalDetails(Collections.singletonMap("CanonicalRequestBytes", "AA BB CC DD EE FF"));
exception.setErrorCode("SignatureDoesNotMatch");
exception.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());

@@ -325,7 +331,7 @@ public class TestFetchS3Object {
}

@Test
public void testGetObjectReturnsNull() throws IOException {
public void testGetObjectReturnsNull() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();

@@ -339,7 +345,7 @@ public class TestFetchS3Object {
}

@Test
public void testFlowFileAccessExceptionGoesToFailure() throws IOException {
public void testFlowFileAccessExceptionGoesToFailure() {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
final Map<String, String> attrs = new HashMap<>();

@@ -355,7 +361,7 @@ public class TestFetchS3Object {
}

@Test
public void testGetPropertyDescriptors() throws Exception {
public void testGetPropertyDescriptors() {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 21, pd.size());
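The S3 test changes above replace Guava's ImmutableMap.of with JDK equivalents: Collections.singletonMap when there is a single entry, and a LinkedHashMap when several entries are needed and insertion order should be preserved. A small illustrative sketch, not taken verbatim from the NiFi tests:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class GuavaMapMigration {
    // One entry: ImmutableMap.of("k", "v") becomes a singleton map.
    static final Map<String, String> SINGLE = Collections.singletonMap("k", "v");

    // Several entries: ImmutableMap.of(...) preserved insertion order,
    // so a LinkedHashMap keeps equivalent behaviour.
    static Map<String, String> several() {
        final Map<String, String> map = new LinkedHashMap<>();
        map.put("BucketName", "us-east-1");
        map.put("Error", "ABC123");
        return Collections.unmodifiableMap(map);
    }
}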
@@ -44,6 +44,13 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-graph-authorizer</artifactId>
<version>1.16.0-SNAPSHOT</version>
<exclusions>
<!-- Exclude Guava provided in nifi-azure-services-api-nar -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

@@ -170,11 +170,6 @@
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>

@@ -30,7 +30,6 @@ import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.OperationContext;
import org.apache.commons.codec.DecoderException;
import org.apache.nifi.annotation.behavior.InputRequirement;

@@ -194,7 +193,6 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
}

@VisibleForTesting
void uploadBlob(CloudBlob blob, OperationContext operationContext, BlobRequestOptions blobRequestOptions, InputStream in) throws StorageException, IOException {
blob.upload(in, -1, null, blobRequestOptions, operationContext);
}

@@ -21,7 +21,6 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;

@@ -173,7 +172,6 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
}
}

@VisibleForTesting
static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
long chunkStart = 0;
long chunkSize;

@@ -31,6 +31,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

@@ -56,7 +58,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;

@@ -155,7 +156,7 @@ public class PutAzureEventHubTest {
testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY_ATTRIBUTE_NAME);

MockFlowFile flowFile = new MockFlowFile(1234);
flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
flowFile.putAttributes(Collections.singletonMap(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
testRunner.enqueue(flowFile);
testRunner.run(1, true);

@@ -178,7 +179,7 @@ public class PutAzureEventHubTest {
setUpStandardTestConfig();

MockFlowFile flowFile = new MockFlowFile(1234);
flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
flowFile.putAttributes(Collections.singletonMap(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));

// Key not specified
testRunner.enqueue(flowFile);

@@ -210,7 +211,11 @@ public class PutAzureEventHubTest {
setUpStandardTestConfig();

MockFlowFile flowFile = new MockFlowFile(1234);
ImmutableMap<String, String> demoAttributes = ImmutableMap.of("A", "a", "B", "b", "D", "d", "C", "c");
final Map<String, String> demoAttributes = new LinkedHashMap<>();
demoAttributes.put("A", "a");
demoAttributes.put("B", "b");
demoAttributes.put("D", "d");
demoAttributes.put("C", "c");
flowFile.putAttributes(demoAttributes);

testRunner.enqueue(flowFile);

@@ -18,7 +18,6 @@ package org.apache.nifi.processors.azure.storage;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.specialized.BlobClientBase;
import com.google.common.collect.Sets;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;

@@ -174,7 +173,7 @@ public class ITDeleteAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
}

private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.REMOTE_INVOCATION);
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.REMOTE_INVOCATION);

Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
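The same approach is used for Guava's Sets helper in the Azure integration tests: a one-element set becomes Collections.singleton, and a multi-element set becomes a LinkedHashSet built over Arrays.asList. A hedged sketch of the pattern with placeholder values (Collections.singleton is immutable where Sets.newHashSet was mutable, which is fine here because the tests only compare the sets):

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class GuavaSetMigration {
    // Sets.newHashSet(SEND) -> Collections.singleton(SEND)
    static final Set<String> ONE = Collections.singleton("SEND");

    // Sets.newHashSet(CONTENT_MODIFIED, FETCH) -> LinkedHashSet keeps iteration order stable
    static final Set<String> MANY =
            new LinkedHashSet<>(Arrays.asList("CONTENT_MODIFIED", "FETCH"));
}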
@@ -16,7 +16,6 @@
 */
package org.apache.nifi.processors.azure.storage;

import com.google.common.collect.Sets;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;

@@ -187,7 +186,7 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
}

private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.FETCH);
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.FETCH);

Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)

@@ -17,7 +17,6 @@
package org.apache.nifi.processors.azure.storage;

import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.google.common.collect.Sets;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;

@@ -25,8 +24,10 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

@@ -447,7 +448,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
private void testSuccessfulFetch(String fileSystem, String directory, String filename, String rangeStart, String rangeLength,
Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
// GIVEN
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH);
Set<ProvenanceEventType> expectedEventTypes = new LinkedHashSet<>(Arrays.asList(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH));

setRunnerProperties(fileSystem, directory, filename, rangeStart, rangeLength);
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.azure.storage;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.google.common.collect.Sets;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;

@@ -175,7 +174,7 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
}

private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);

Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)

@@ -18,8 +18,6 @@ package org.apache.nifi.processors.azure.storage;

import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRecord;

@@ -40,7 +38,6 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;

@@ -299,14 +296,6 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, directory);
flowFile.assertAttributeEquals(ATTR_NAME_FILENAME, fileName);

String urlEscapedDirectory = UrlEscapers.urlPathSegmentEscaper().escape(directory);
String urlEscapedFileName = UrlEscapers.urlPathSegmentEscaper().escape(fileName);
String urlEscapedPathSeparator = UrlEscapers.urlPathSegmentEscaper().escape("/");
String primaryUri = StringUtils.isNotEmpty(directory)
? String.format("https://%s.dfs.core.windows.net/%s/%s%s%s", getAccountName(), fileSystemName, urlEscapedDirectory, urlEscapedPathSeparator, urlEscapedFileName)
: String.format("https://%s.dfs.core.windows.net/%s/%s", getAccountName(), fileSystemName, urlEscapedFileName);
flowFile.assertAttributeEquals(ATTR_NAME_PRIMARY_URI, primaryUri);

flowFile.assertAttributeEquals(ATTR_NAME_LENGTH, Integer.toString(fileData.length));
}

@@ -336,7 +325,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}

private void assertProvenanceEvents() {
Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(ProvenanceEventType.SEND);

Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
.map(ProvenanceEventRecord::getEventType)
@@ -32,6 +32,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

@@ -55,8 +59,8 @@
<artifactId>azure-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -71,11 +75,6 @@
<artifactId>jackson-dataformat-xml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>

@@ -98,6 +98,11 @@
<artifactId>commons-text</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
@@ -29,6 +29,17 @@
<source.skip>true</source.skip>
</properties>

<dependencyManagement>
<dependencies>
<!-- Provided in nifi-cassandra-services-api-nar -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

@@ -43,6 +43,7 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>

@@ -24,6 +24,17 @@
<artifactId>nifi-cassandra-services-nar</artifactId>
<packaging>nar</packaging>

<dependencyManagement>
<dependencies>
<!-- Provided in nifi-cassandra-services-api-nar -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

@@ -48,6 +48,7 @@
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.sdk.version}</version>
<scope>provided</scope>
</dependency>

<dependency>

@@ -24,6 +24,7 @@

<properties>
<cassandra.sdk.version>3.10.2</cassandra.sdk.version>
<cassandra.guava.version>19.0</cassandra.guava.version>
</properties>

<artifactId>nifi-cassandra-bundle</artifactId>

@@ -63,6 +64,11 @@
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${cassandra.guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -77,11 +77,6 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -18,9 +18,6 @@ package org.apache.nifi.reporting.datadog;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;

@@ -41,6 +38,7 @@ import org.coursera.metrics.datadog.DynamicTagsCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;

@@ -99,7 +97,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
private String metricsPrefix;
private String environment;
private String statusId;
private ConcurrentHashMap<String, AtomicDouble> metricsMap;
private ConcurrentHashMap<String, Double> metricsMap;
private Map<String, String> defaultTags;
private volatile JmxJvmMetrics virtualMachineMetrics;
private Logger logger = LoggerFactory.getLogger(getClass().getName());

@@ -133,7 +131,10 @@ public class DataDogReportingTask extends AbstractReportingTask {
metricsPrefix = context.getProperty(METRICS_PREFIX).evaluateAttributeExpressions().getValue();
environment = context.getProperty(ENVIRONMENT).evaluateAttributeExpressions().getValue();
statusId = status.getId();
defaultTags = ImmutableMap.of("env", environment, "dataflow_id", statusId);
final Map<String, String> tags = new HashMap<>();
tags.put("env", environment);
tags.put("dataflow_id", statusId);
defaultTags = Collections.unmodifiableMap(tags);
try {
updateDataDogTransport(context);
} catch (IOException e) {

@@ -149,11 +150,11 @@ public class DataDogReportingTask extends AbstractReportingTask {
logger.debug(metricName + ": " + entry.getValue());
//if metric is not registered yet - register it
if (!metricsMap.containsKey(metricName)) {
metricsMap.put(metricName, new AtomicDouble(entry.getValue()));
metricsMap.put(metricName, entry.getValue());
metricRegistry.register(metricName, new MetricGauge(metricName, tags));
}
//set real time value to metrics map
metricsMap.get(metricName).set(entry.getValue());
metricsMap.put(metricName, entry.getValue());
}
}

@@ -199,12 +200,12 @@ public class DataDogReportingTask extends AbstractReportingTask {

@Override
public Object getValue() {
return metricsMap.get(metricName).get();
return metricsMap.get(metricName);
}

@Override
public List<String> getTags() {
List<String> tagsList = Lists.newArrayList();
List<String> tagsList = new ArrayList<>();
for (Map.Entry<String, String> entry : tags.entrySet()) {
tagsList.add(entry.getKey() + ":" + entry.getValue());
}

@@ -266,7 +267,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
return new MetricRegistry();
}

protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
protected ConcurrentHashMap<String, Double> getMetricsMap() {
return new ConcurrentHashMap<>();
}
}
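Dropping Guava's AtomicDouble changes how the value behind each registered Gauge is updated: instead of mutating an AtomicDouble in place, the task now replaces the boxed Double in the ConcurrentHashMap on every report, and the gauge reads whatever value is current. A rough sketch of that pattern, with illustrative class and metric names rather than the NiFi ones:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class GaugeBackedByMap {
    private final ConcurrentMap<String, Double> metrics = new ConcurrentHashMap<>();

    void report(String name, double value) {
        // put() swaps in the new boxed value atomically for that key,
        // so a gauge reading metrics.get(name) always sees a complete value.
        metrics.put(name, value);
    }

    Double current(String name) {
        return metrics.get(name);
    }
}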
@@ -18,8 +18,6 @@ package org.apache.nifi.reporting.datadog;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;

@@ -37,6 +35,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

@@ -51,7 +50,7 @@ public class TestDataDogReportingTask {

private ProcessGroupStatus status;
private ProcessorStatus procStatus;
private ConcurrentHashMap<String, AtomicDouble> metricsMap;
private ConcurrentHashMap<String, Double> metricsMap;
private MetricRegistry metricRegistry;
private MetricsService metricsService;
private String env = "dev";

@@ -114,7 +113,7 @@ public class TestDataDogReportingTask {
public void testUpdateMetricsProcessor() throws InitializationException, IOException {
MetricsService ms = new MetricsService();
Map<String, Double> processorMetrics = ms.getProcessorMetrics(procStatus);
Map<String, String> tagsMap = ImmutableMap.of("env", "test");
Map<String, String> tagsMap = Collections.singletonMap("env", "test");
DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);

@@ -132,7 +131,7 @@ public class TestDataDogReportingTask {
public void testUpdateMetricsJVM() throws InitializationException, IOException {
MetricsService ms = new MetricsService();
Map<String, Double> processorMetrics = ms.getJVMMetrics(virtualMachineMetrics);
Map<String, String> tagsMap = ImmutableMap.of("env", "test");
Map<String, String> tagsMap = Collections.singletonMap("env", "test");

DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);

@@ -205,7 +204,7 @@ public class TestDataDogReportingTask {
}

@Override
protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
protected ConcurrentHashMap<String, Double> getMetricsMap() {
return metricsMap;
}
@@ -74,11 +74,6 @@
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>

@@ -17,10 +17,6 @@

package org.apache.nifi.processors.enrich;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;

@@ -33,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

@@ -187,16 +184,16 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
* @param queryParser The parsing mechanism being used to parse the data into groups
* @param queryRegex The regex to be used to split the query results into groups. The regex MUST implement at least on named capture group "KEY" to be used to populate the table rows
* @param lookupKey The regular expression number or the column of a split to be used for matching
* @return Table with attribute names and values where each Table row uses the value of the KEY named capture group specified in @param queryRegex
* @return Row Map with attribute names and values where each Map row uses the value of the KEY named capture group specified in @param queryRegex
*/
protected Table<String, String, String> parseBatchResponse(String rawResult, String queryParser, String queryRegex, int lookupKey, String schema) {
protected Map<String, Map<String, String>> parseBatchResponse(String rawResult, String queryParser, String queryRegex, int lookupKey, String schema) {
// Note the hardcoded record0.
// Since iteration is done within the parser and Multimap is used, the record number here will always be 0.
// Consequentially, 0 is hardcoded so that batched and non batched attributes follow the same naming
// conventions
final String recordPosition = ".record0";

final Table<String, String, String> results = HashBasedTable.create();
final Map<String, Map<String, String>> results = new LinkedHashMap<>();

switch (queryParser) {
case "Split":

@@ -207,7 +204,15 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
String[] splitResult = line.split(queryRegex);

for (int r = 0; r < splitResult.length; r++) {
results.put(splitResult[ lookupKey - 1 ], "enrich." + schema + recordPosition + ".group" + String.valueOf(r), splitResult[r]);
final String key = splitResult[ lookupKey - 1 ];
final String group = "enrich." + schema + recordPosition + ".group" + r;
final String result = splitResult[r];
Map<String, String> row = results.get(key);
if (row == null) {
row = new LinkedHashMap<>();
}
row.put(group, result);
results.put(key, row);
}
}
break;

@@ -223,7 +228,15 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
// Note that RegEx matches capture group 0 is usually broad but starting with it anyway
// for the sake of purity
for (int r = 0; r <= matcher.groupCount(); r++) {
results.put(matcher.group(lookupKey), "enrich." + schema + recordPosition + ".group" + String.valueOf(r), matcher.group(r));
final String key = matcher.group(lookupKey);
final String group = "enrich." + schema + recordPosition + ".group" + String.valueOf(r);
final String result = matcher.group(r);
Map<String, String> row = results.get(key);
if (row == null) {
row = new LinkedHashMap<>();
}
row.put(group, result);
results.put(key, row);
}
} catch (IndexOutOfBoundsException e) {
getLogger().warn("Could not find capture group {} while processing result. You may want to review your " +

@@ -275,7 +275,7 @@ public class QueryWhois extends AbstractEnrichProcessor {
}
} else {
// Otherwise call the multiline parser and get the row map;
final Map<String, Map<String, String>> rowMap = parseBatchResponse(result, queryParser, queryRegex, keyLookup, "whois").rowMap();
final Map<String, Map<String, String>> rowMap = parseBatchResponse(result, queryParser, queryRegex, keyLookup, "whois");

// Identify the flowfile Lookupvalue and search against the rowMap
String ffLookupValue = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
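Guava's Table<R, C, V> is replaced here with a Map keyed by row whose values are per-row maps of column to value; QueryWhois then no longer needs the rowMap() call because the parser already returns that shape. A compact sketch of the same row/column insertion using computeIfAbsent, which behaves like the get/null-check/put sequence in the diff (names are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;

class NestedMapTable {
    private final Map<String, Map<String, String>> rows = new LinkedHashMap<>();

    // Equivalent of Guava's table.put(rowKey, columnKey, value)
    void put(String rowKey, String columnKey, String value) {
        rows.computeIfAbsent(rowKey, k -> new LinkedHashMap<>()).put(columnKey, value);
    }

    // Equivalent of table.rowMap()
    Map<String, Map<String, String>> rowMap() {
        return rows;
    }
}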
@@ -45,7 +45,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>

@@ -21,7 +21,6 @@ import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;

@@ -35,6 +34,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;

import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -125,7 +125,7 @@ public abstract class AbstractGCPProcessor<

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(
return Collections.unmodifiableList(Arrays.asList(
PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
RETRY_COUNT,

@@ -133,7 +133,7 @@ public abstract class AbstractGCPProcessor<
PROXY_PORT,
HTTP_PROXY_USERNAME,
HTTP_PROXY_PASSWORD,
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS))
);
}

@@ -23,7 +23,6 @@ import com.google.cloud.BaseServiceException;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;

@@ -104,12 +103,12 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(DATASET)
.add(TABLE_NAME)
.add(IGNORE_UNKNOWN)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(DATASET);
descriptors.add(TABLE_NAME);
descriptors.add(IGNORE_UNKNOWN);
return Collections.unmodifiableList(descriptors);
}

@Override
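For processors that append their own descriptors to the parent's list, the ImmutableList.Builder chain becomes an ArrayList seeded from the superclass and wrapped unmodifiable on return, which keeps the returned list read-only without Guava. A small sketch of the shape under assumed names (the constants and the parent call are stand-ins, not the NiFi API):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DescriptorListMigration {
    static List<String> parentDescriptors() {
        return Collections.singletonList("parent-prop");   // stand-in for super.getSupportedPropertyDescriptors()
    }

    static List<String> childDescriptors() {
        final List<String> descriptors = new ArrayList<>(parentDescriptors());
        descriptors.add("dataset");        // hypothetical additions mirroring DATASET, TABLE_NAME, ...
        descriptors.add("table-name");
        return Collections.unmodifiableList(descriptors);
    }
}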
@@ -26,7 +26,6 @@ import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;

@@ -54,7 +53,9 @@ import org.threeten.bp.temporal.ChronoUnit;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -225,22 +226,21 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(TABLE_SCHEMA)
.add(READ_TIMEOUT)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
.add(CSV_ALLOW_JAGGED_ROWS)
.add(CSV_ALLOW_QUOTED_NEW_LINES)
.add(CSV_CHARSET)
.add(CSV_FIELD_DELIMITER)
.add(CSV_QUOTE)
.add(CSV_SKIP_LEADING_ROWS)
.add(AVRO_USE_LOGICAL_TYPES)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(TABLE_SCHEMA);
descriptors.add(READ_TIMEOUT);
descriptors.add(SOURCE_TYPE);
descriptors.add(CREATE_DISPOSITION);
descriptors.add(WRITE_DISPOSITION);
descriptors.add(MAXBAD_RECORDS);
descriptors.add(CSV_ALLOW_JAGGED_ROWS);
descriptors.add(CSV_ALLOW_QUOTED_NEW_LINES);
descriptors.add(CSV_CHARSET);
descriptors.add(CSV_FIELD_DELIMITER);
descriptors.add(CSV_QUOTE);
descriptors.add(CSV_SKIP_LEADING_ROWS);
descriptors.add(AVRO_USE_LOGICAL_TYPES);
return Collections.unmodifiableList(descriptors);
}

@Override

@@ -21,7 +21,6 @@ import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;

@@ -53,6 +52,7 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -103,11 +103,10 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(RECORD_READER)
.add(SKIP_INVALID_ROWS)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(RECORD_READER);
descriptors.add(SKIP_INVALID_ROWS);
return Collections.unmodifiableList(descriptors);
}

@Override

@@ -132,7 +131,7 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {

try (final InputStream in = session.read(flowFile)) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());) {
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record currentRecord;
while ((currentRecord = reader.nextRecord()) != null) {
request.addRow(convertMapRecord(currentRecord.toMap()));

@@ -22,7 +22,6 @@ import com.google.api.pathtemplate.ValidationException;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;

@@ -51,6 +50,7 @@ import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;

@@ -191,10 +191,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(PROJECT_ID,
return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
SUBSCRIPTION,
BATCH_SIZE);
BATCH_SIZE));
}

@Override

@@ -24,7 +24,6 @@ import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString;

@@ -109,10 +108,10 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(PROJECT_ID,
return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
GCP_CREDENTIALS_PROVIDER_SERVICE,
TOPIC_NAME,
BATCH_SIZE);
BATCH_SIZE));
}

@Override

@@ -24,7 +24,6 @@ import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;

@@ -53,6 +52,7 @@ import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;

@@ -171,10 +171,10 @@ public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements Ve

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(SUBSCRIPTION,
return Collections.unmodifiableList(Arrays.asList(SUBSCRIPTION,
GCP_CREDENTIALS_PROVIDER_SERVICE,
BYTES_OUTSTANDING,
MESSAGES_OUTSTANDING);
MESSAGES_OUTSTANDING));
}

@Override

@@ -24,7 +24,6 @@ import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;

@@ -128,11 +127,11 @@ public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements Ve

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(TOPIC_NAME,
return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME,
GCP_CREDENTIALS_PROVIDER_SERVICE,
ORDERING_KEY,
BATCH_SIZE,
BATCH_BYTES);
BATCH_BYTES));
}

@Override

@@ -21,7 +21,6 @@ import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;

@@ -68,9 +67,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.build();
return Collections.unmodifiableList(super.getSupportedPropertyDescriptors());
}

@Override
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.gcp.storage;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;

@@ -32,6 +31,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

@@ -79,12 +79,12 @@ public class DeleteGCSObject extends AbstractGCSProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(BUCKET)
.add(KEY)
.add(GENERATION)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.add(GENERATION);
return Collections.unmodifiableList(descriptors);
}

@Override

@@ -22,7 +22,6 @@ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.CountingOutputStream;

@@ -192,15 +191,15 @@ public class FetchGCSObject extends AbstractGCSProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.add(BUCKET)
.add(KEY)
.addAll(super.getSupportedPropertyDescriptors())
.add(GENERATION)
.add(ENCRYPTION_KEY)
.add(RANGE_START)
.add(RANGE_LENGTH)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(GENERATION);
descriptors.add(ENCRYPTION_KEY);
descriptors.add(RANGE_START);
descriptors.add(RANGE_LENGTH);
return Collections.unmodifiableList(descriptors);
}

@Override

@@ -21,7 +21,6 @@ import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;

@@ -247,17 +246,17 @@ public class ListGCSBucket extends AbstractGCSProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.add(LISTING_STRATEGY)
.add(TRACKING_STATE_CACHE)
.add(INITIAL_LISTING_TARGET)
.add(TRACKING_TIME_WINDOW)
.add(BUCKET)
.add(RECORD_WRITER)
.addAll(super.getSupportedPropertyDescriptors())
.add(PREFIX)
.add(USE_GENERATIONS)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(LISTING_STRATEGY);
descriptors.add(TRACKING_STATE_CACHE);
descriptors.add(INITIAL_LISTING_TARGET);
descriptors.add(TRACKING_TIME_WINDOW);
descriptors.add(BUCKET);
descriptors.add(RECORD_WRITER);
descriptors.addAll(super.getSupportedPropertyDescriptors());
descriptors.add(PREFIX);
descriptors.add(USE_GENERATIONS);
return Collections.unmodifiableList(descriptors);
}

private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);

@@ -22,7 +22,6 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;

@@ -290,18 +289,17 @@ public class PutGCSObject extends AbstractGCSProcessor {

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(BUCKET)
.add(KEY)
.add(CONTENT_TYPE)
.add(MD5)
.add(CRC32C)
.add(ACL)
.add(ENCRYPTION_KEY)
.add(OVERWRITE)
.add(CONTENT_DISPOSITION_TYPE)
.build();
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(BUCKET);
descriptors.add(KEY);
descriptors.add(CONTENT_TYPE);
descriptors.add(MD5);
descriptors.add(CRC32C);
descriptors.add(ACL);
descriptors.add(ENCRYPTION_KEY);
descriptors.add(OVERWRITE);
descriptors.add(CONTENT_DISPOSITION_TYPE);
return Collections.unmodifiableList(descriptors);
}

@Override

@@ -406,7 +404,7 @@ public class PutGCSObject extends AbstractGCSProcessor {
}

try {
final Blob blob = storage.create(blobInfoBuilder.build(),
final Blob blob = storage.createFrom(blobInfoBuilder.build(),
in,
blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()])
);
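The last hunk above is not a Guava change: it moves from the InputStream overload of storage.create, which newer google-cloud-storage releases deprecate, to storage.createFrom, which streams the upload and declares IOException. A hedged sketch of the call shape, with placeholder bucket and object names:

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class CreateFromSketch {
    static void upload(Storage storage, byte[] content) throws IOException {
        final BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("example-bucket", "example-key")).build();
        try (InputStream in = new ByteArrayInputStream(content)) {
            // createFrom reads the InputStream in chunks; BlobWriteOptions may be appended as varargs.
            storage.createFrom(blobInfo, in);
        }
    }
}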
@@ -26,30 +26,26 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

/**
 * Base class for GCS Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials.
 */
@ExtendWith(MockitoExtension.class)
public abstract class AbstractGCSTest {
private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
private static final Integer RETRIES = 9;

static final String BUCKET = RemoteStorageHelper.generateBucketName();

@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}

public static TestRunner buildNewRunner(Processor processor) throws Exception {
final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();

@@ -84,13 +80,10 @@ public abstract class AbstractGCSTest {
final StorageOptions options = processor.getServiceOptions(runner.getProcessContext(),
mockCredentials);

assertEquals("Project IDs should match",
PROJECT_ID, options.getProjectId());
assertEquals(PROJECT_ID, options.getProjectId(), "Project IDs should match");

assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
assertEquals(RETRIES.intValue(), options.getRetrySettings().getMaxAttempts(), "Retry counts should match");

assertSame("Credentials should be configured correctly",
mockCredentials, options.getCredentials());
assertSame(mockCredentials, options.getCredentials(), "Credentials should be configured correctly");
}
}
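Alongside the Guava cleanup, this base test class moves to JUnit 5 and the Mockito extension: assertion messages move from the first argument to the last, and @ExtendWith(MockitoExtension.class) replaces the @Before method that called MockitoAnnotations.initMocks. A minimal sketch of the migrated shape, with an illustrative class and mocked field rather than the NiFi ones:

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(MockitoExtension.class)   // replaces MockitoAnnotations.initMocks(this) in an @Before method
class MigratedTest {
    @Mock
    Runnable collaborator;             // hypothetical mocked dependency

    @Test
    void messageIsNowTheLastArgument() {
        // JUnit 4: assertEquals("values should match", expected, actual)
        // JUnit 5: the optional message comes last
        assertEquals(4, 2 + 2, "values should match");
    }
}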
@ -16,11 +16,12 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.gcp.storage;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -39,7 +40,7 @@ public class DeleteGCSObjectIT extends AbstractGCSIT {
|
|||
runner.setProperty(DeleteGCSObject.BUCKET, BUCKET);
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("testdata", ImmutableMap.of(
|
||||
runner.enqueue("testdata", Collections.singletonMap(
|
||||
CoreAttributes.FILENAME.key(), KEY
|
||||
));
|
||||
|
||||
|
@ -61,7 +62,7 @@ public class DeleteGCSObjectIT extends AbstractGCSIT {
|
|||
runner.setProperty(DeleteGCSObject.KEY, KEY);
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("testdata", ImmutableMap.of(
|
||||
runner.enqueue("testdata", Collections.singletonMap(
|
||||
"filename", "different-filename"
|
||||
));
|
||||
|
||||
|
@ -79,7 +80,7 @@ public class DeleteGCSObjectIT extends AbstractGCSIT {
|
|||
runner.setProperty(DeleteGCSObject.BUCKET, BUCKET);
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("testdata", ImmutableMap.of(
|
||||
runner.enqueue("testdata", Collections.singletonMap(
|
||||
"filename", "nonexistant-file"
|
||||
));
|
||||
|
||||
|
|
|
@ -19,12 +19,14 @@ package org.apache.nifi.processors.gcp.storage;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
import org.apache.nifi.util.TestRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.LinkedHashMap;
import java.util.Map;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;

@ -35,6 +37,7 @@ import static org.mockito.Mockito.when;
/**
* Unit tests for {@link DeleteGCSObject}. No connections to the Google Cloud service are made.
*/
@ExtendWith(MockitoExtension.class)
public class DeleteGCSObjectTest extends AbstractGCSTest {
public static final Long GENERATION = 42L;
static final String KEY = "somefile";

@ -47,11 +50,6 @@ public class DeleteGCSObjectTest extends AbstractGCSTest {
@Mock
Storage storage;

@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}

@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(DeleteGCSObject.BUCKET, BUCKET);

@ -107,17 +105,19 @@ public class DeleteGCSObjectTest extends AbstractGCSTest {
final Long generation1 = GENERATION + 1L;
final Long generation2 = GENERATION + 2L;

runner.enqueue("testdata1", ImmutableMap.of(
BUCKET_ATTR, bucket1,
KEY_ATTR, key1,
GENERATION_ATTR, String.valueOf(generation1)
));
final Map<String, String> firstAttributes = new LinkedHashMap<>();
firstAttributes.put(BUCKET_ATTR, bucket1);
firstAttributes.put(KEY_ATTR, key1);
firstAttributes.put(GENERATION_ATTR, String.valueOf(generation1));

runner.enqueue("testdata2", ImmutableMap.of(
BUCKET_ATTR, bucket2,
KEY_ATTR, key2,
GENERATION_ATTR, String.valueOf(generation2)
));
runner.enqueue("testdata1", firstAttributes);

final Map<String, String> secondAttributes = new LinkedHashMap<>();
secondAttributes.put(BUCKET_ATTR, bucket2);
secondAttributes.put(KEY_ATTR, key2);
secondAttributes.put(GENERATION_ATTR, String.valueOf(generation2));

runner.enqueue("testdata2", secondAttributes);

runner.run(2);

@ -16,12 +16,12 @@
*/
package org.apache.nifi.processors.gcp.storage;

import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@ -44,7 +44,7 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
final TestRunner runner = buildNewRunner(new FetchGCSObject());
runner.setProperty(FetchGCSObject.BUCKET, BUCKET);

runner.enqueue(new byte[0], ImmutableMap.of(
runner.enqueue(new byte[0], Collections.singletonMap(
"filename", KEY
));

@ -74,7 +74,7 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
runner.setProperty(FetchGCSObject.BUCKET, BUCKET);
runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_KEY);

runner.enqueue(new byte[0], ImmutableMap.of(
runner.enqueue(new byte[0], Collections.singletonMap(
"filename", KEY
));

@ -101,7 +101,7 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
public void testFetchNonexistantFile() throws Exception {
final TestRunner runner = buildNewRunner(new FetchGCSObject());
runner.setProperty(FetchGCSObject.BUCKET, BUCKET);
runner.enqueue(new byte[0], ImmutableMap.of(
runner.enqueue(new byte[0], Collections.singletonMap(
"filename", "non-existent"
));

@ -24,20 +24,21 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;

@ -61,9 +62,9 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATT
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

@ -73,6 +74,7 @@ import static org.mockito.Mockito.when;
/**
* Unit tests for {@link FetchGCSObject}.
*/
@ExtendWith(MockitoExtension.class)
public class FetchGCSObjectTest extends AbstractGCSTest {
private final static String KEY = "test-key";
private final static Long GENERATION = 5L;

@ -101,15 +103,9 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;

@Mock
Storage storage;

@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}

@Override
public AbstractGCSProcessor getProcessor() {
return new FetchGCSObject() {

@ -522,10 +518,10 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));

runner.enqueue("", ImmutableMap.of(
BUCKET_ATTR, BUCKET,
CoreAttributes.FILENAME.key(), KEY
));
final Map<String, String> attributes = new HashMap<>();
attributes.put(BUCKET_ATTR, BUCKET);
attributes.put(CoreAttributes.FILENAME.key(), KEY);
runner.enqueue("", attributes);

runner.run();

@ -552,7 +548,7 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
);

final Set<Storage.BlobSourceOption> blobSourceOptions = ImmutableSet.copyOf(blobSourceOptionArgumentCaptor.getAllValues());
final Set<Storage.BlobSourceOption> blobSourceOptions = new LinkedHashSet<>(blobSourceOptionArgumentCaptor.getAllValues());
assertTrue(blobSourceOptions.contains(Storage.BlobSourceOption.generationMatch()));
assertEquals(
1,

@ -600,7 +596,7 @@ public class FetchGCSObjectTest extends AbstractGCSTest {

assertNull(capturedBlobId.getGeneration());

final Set<Storage.BlobSourceOption> blobSourceOptions = ImmutableSet.copyOf(blobSourceOptionArgumentCaptor.getAllValues());
final Set<Storage.BlobSourceOption> blobSourceOptions = new LinkedHashSet<>(blobSourceOptionArgumentCaptor.getAllValues());

assertTrue(blobSourceOptions.contains(Storage.BlobSourceOption.decryptionKey(ENCRYPTION_SHA256)));
assertEquals(

@ -617,7 +613,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
runner.assertValid();

when(storage.get(any(BlobId.class))).thenThrow(new StorageException(400, "test-exception"));
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));

runner.enqueue("");

@ -21,9 +21,6 @@ import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;

@ -33,12 +30,17 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

@ -64,10 +66,10 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_ATT
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYPE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;

@ -139,7 +141,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

assertEquals("Cluster StateMap should be fresh (version -1L)", -1L, runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion());
assertEquals(-1L, runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(), "Cluster StateMap should be fresh (version -1L)");
assertTrue(processor.getStateKeys().isEmpty());

processor.restoreState(runner.getProcessSessionFactory().createSession());

@ -159,11 +161,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Map<String, String> state = ImmutableMap.of(
ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L),
ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0",
ListGCSBucket.CURRENT_KEY_PREFIX + "1", "test-key-1"
);
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "1", "test-key-1");

runner.getStateManager().setState(state, Scope.CLUSTER);

@ -186,26 +187,24 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

assertEquals("Cluster StateMap should be fresh (version -1L)",
-1L,
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion()
assertEquals(-1L,
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(),
"Cluster StateMap should be fresh (version -1L)"
);

final Set<String> keys = ImmutableSet.of("test-key-0", "test-key-1");
final Set<String> keys = new LinkedHashSet<>(Arrays.asList("test-key-0", "test-key-1"));
final ProcessSession session = runner.getProcessSessionFactory().createSession();
processor.persistState(session, 4L, keys);

final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
assertEquals("Cluster StateMap should have been written to", 1L, stateMap.getVersion());
assertEquals(1L, stateMap.getVersion(), "Cluster StateMap should have been written to");

assertEquals(
ImmutableMap.of(
ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L),
ListGCSBucket.CURRENT_KEY_PREFIX+"0", "test-key-0",
ListGCSBucket.CURRENT_KEY_PREFIX+"1", "test-key-1"
),
stateMap.toMap()
);
final Map<String, String> state = new HashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "1", "test-key-1");

assertEquals(state, stateMap.toMap());
}

@Test

@ -218,7 +217,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {

runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);

final Set<String> keys = ImmutableSet.of("test-key-0", "test-key-1");
final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0", "test-key-1"));

assertTrue(runner.getLogger().getErrorMessages().isEmpty());

@ -249,6 +248,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
return blob;
}

private Blob buildMockBlobWithoutBucket(String bucket, String key, long updateTime) {
final Blob blob = mock(Blob.class);
when(blob.getName()).thenReturn(key);
when(blob.getUpdateTime()).thenReturn(updateTime);
return blob;
}

private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) {
final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(3, verificationResults.size());

@ -261,8 +267,9 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final ConfigVerificationResult listingResult = verificationResults.get(2);
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome());

assertTrue(String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()),
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)));
assertTrue(
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)),
String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()));
}

@Test

@ -273,7 +280,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of(
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
);

@ -305,7 +312,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {

assertEquals(3L, processor.getStateTimestamp());

assertEquals(ImmutableSet.of("blob-key-2"), processor.getStateKeys());
assertEquals(Collections.singleton("blob-key-2"), processor.getStateKeys());
}

@Test

@ -316,7 +323,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of(
final Iterable<Blob> mockList = Collections.singletonList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);

@ -348,7 +355,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of();
final Iterable<Blob> mockList = Collections.emptyList();

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -362,7 +369,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0);
verifyConfigVerification(runner, processor, 0);

assertEquals("No state should be persisted on an empty return", -1L, runner.getStateManager().getState(Scope.CLUSTER).getVersion());
assertEquals(-1L, runner.getStateManager().getState(Scope.CLUSTER).getVersion(), "No state should be persisted on an empty return");
}

@Test

@ -373,12 +380,14 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Map<String, String> state = ImmutableMap.of(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L), ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");

runner.getStateManager().setState(state, Scope.CLUSTER);

final Iterable<Blob> mockList = ImmutableList.of(
buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlobWithoutBucket("blob-bucket-1", "blob-key-1", 1L),
buildMockBlob("blob-bucket-2", "blob-key-2", 2L)
);

@ -404,7 +413,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("blob-key-2", flowFile.getAttribute(KEY_ATTR));
assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
assertEquals(2L, processor.getStateTimestamp());
assertEquals(ImmutableSet.of("blob-key-2"), processor.getStateKeys());
assertEquals(Collections.singleton("blob-key-2"), processor.getStateKeys());
}

@Test

@ -415,16 +424,15 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Map<String, String> state = ImmutableMap.of(
ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
);
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");

runner.getStateManager().setState(state, Scope.CLUSTER);

final Iterable<Blob> mockList = ImmutableList.of(
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 1L)
buildMockBlobWithoutBucket("blob-bucket-2", "blob-key-2", 1L)
);

when(mockBlobPage.getValues())

@ -454,7 +462,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
assertEquals(2L, processor.getStateTimestamp());

assertEquals(ImmutableSet.of("blob-key-1"), processor.getStateKeys());
assertEquals(Collections.singleton("blob-key-1"), processor.getStateKeys());
}

@Test

@ -465,16 +473,15 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Map<String, String> state = ImmutableMap.of(
ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
);
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");

runner.getStateManager().setState(state, Scope.CLUSTER);

final Iterable<Blob> mockList = ImmutableList.of(
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
buildMockBlobWithoutBucket("blob-bucket-2", "blob-key-2", 1L),
buildMockBlob("blob-bucket-3", "blob-key-3", 2L)
);

@ -509,7 +516,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("blob-key-3",flowFile.getAttribute(KEY_ATTR));
assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
assertEquals(2L, processor.getStateTimestamp());
assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), processor.getStateKeys());

assertEquals(new HashSet<>(Arrays.asList("blob-key-1", "blob-key-3")), processor.getStateKeys());
}

@Test

@ -520,16 +528,15 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Map<String, String> state = ImmutableMap.of(
ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
);
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");

runner.getStateManager().setState(state, Scope.CLUSTER);

final Iterable<Blob> mockList = ImmutableList.of(
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
buildMockBlobWithoutBucket("blob-bucket-2", "blob-key-2", 1L),
buildMockBlob("blob-bucket-3", "blob-key-3", 1L)
);

@ -564,7 +571,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("blob-key-3", flowFile.getAttribute(KEY_ATTR));
assertEquals("1", flowFile.getAttribute(UPDATE_TIME_ATTR));
assertEquals(1L, processor.getStateTimestamp());
assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), processor.getStateKeys());
assertEquals(new HashSet<>(Arrays.asList("blob-key-1", "blob-key-3")), processor.getStateKeys());
}

@Test

@ -600,7 +607,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(blob.getCreateTime()).thenReturn(CREATE_TIME);
when(blob.getUpdateTime()).thenReturn(UPDATE_TIME);

final Iterable<Blob> mockList = ImmutableList.of(blob);
final Iterable<Blob> mockList = Collections.singletonList(blob);

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -646,7 +653,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
when(blob.getOwner()).thenReturn(mockUser);

final Iterable<Blob> mockList = ImmutableList.of(blob);
final Iterable<Blob> mockList = Collections.singletonList(blob);

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -677,7 +684,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
when(blob.getOwner()).thenReturn(mockGroup);

final Iterable<Blob> mockList = ImmutableList.of(blob);
final Iterable<Blob> mockList = Collections.singletonList(blob);

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -709,7 +716,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
when(blob.getOwner()).thenReturn(mockDomain);

final Iterable<Blob> mockList = ImmutableList.of(blob);
final Iterable<Blob> mockList = Collections.singletonList(blob);
when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);

@ -740,7 +747,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
when(blob.getOwner()).thenReturn(mockProject);

final Iterable<Blob> mockList = ImmutableList.of(blob);
final Iterable<Blob> mockList = Collections.singletonList(blob);

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -766,11 +773,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of();

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
final Iterable<Blob> mockList = Collections.emptyList();

runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
runner.enqueue("test");

@ -790,7 +793,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of();
final Iterable<Blob> mockList = Collections.emptyList();

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

@ -813,7 +816,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.setProperty(ListGCSBucket.USE_GENERATIONS, String.valueOf(USE_GENERATIONS));
runner.assertValid();

final Iterable<Blob> mockList = ImmutableList.of();
final Iterable<Blob> mockList = Collections.emptyList();

when(mockBlobPage.getValues()).thenReturn(mockList);
when(mockBlobPage.getNextPage()).thenReturn(null);

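The ListGCSBucketTest hunks above swap Guava's immutable collection factories for plain JDK collections. A compact sketch of the equivalents used throughout this commit, with placeholder element values:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    class CollectionMigrationSketch {
        // Guava: ImmutableList.of(a, b) / ImmutableSet.of(a, b) / ImmutableList.of() / ImmutableSet.of(x)
        List<String> keys = Arrays.asList("blob-key-1", "blob-key-3");                            // fixed-size list view
        Set<String> keySet = new LinkedHashSet<>(Arrays.asList("blob-key-1", "blob-key-3"));      // insertion-ordered set
        List<String> empty = Collections.emptyList();                                             // shared empty instance
        Set<String> single = Collections.singleton("blob-key-2");                                 // single-element set
    }
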
@ -39,10 +39,10 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.OWNER_TYP
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.SIZE_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.UPDATE_TIME_ATTR;
import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

@ -53,16 +53,16 @@ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;

@ -70,7 +70,6 @@ import org.mockito.Mock;
/**
* Unit tests for {@link PutGCSObject} which do not use Google Cloud resources.
*/
@SuppressWarnings("deprecation")
public class PutGCSObjectTest extends AbstractGCSTest {
private static final String FILENAME = "test-filename";
private static final String KEY = "test-key";

@ -143,7 +142,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {

runner.assertValid();

when(storage.create(blobInfoArgumentCaptor.capture(),
when(storage.createFrom(blobInfoArgumentCaptor.capture(),
inputStreamArgumentCaptor.capture(),
blobWriteOptionArgumentCaptor.capture())).thenReturn(blob);

@ -153,24 +152,11 @@ public class PutGCSObjectTest extends AbstractGCSTest {
runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);

/** Can't do this any more due to the switch to Java InputStreams which close after an operation **/
/*
String text;
try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) {
text = CharStreams.toString(reader);
}

assertEquals(
"FlowFile content should be equal to the Blob content",
"test",
text
);
*/

final List<Storage.BlobWriteOption> blobWriteOptions = blobWriteOptionArgumentCaptor.getAllValues();
assertEquals("No BlobWriteOptions should be set",
assertEquals(
0,
blobWriteOptions.size());
blobWriteOptions.size(),
"No BlobWriteOptions should be set");

final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue();
assertNull(blobInfo.getMd5());

@ -196,31 +182,16 @@ public class PutGCSObjectTest extends AbstractGCSTest {

runner.assertValid();

when(storage.create(blobInfoArgumentCaptor.capture(),
when(storage.createFrom(blobInfoArgumentCaptor.capture(),
inputStreamArgumentCaptor.capture(),
blobWriteOptionArgumentCaptor.capture())).thenReturn(blob);

runner.enqueue("test", ImmutableMap.of(CoreAttributes.FILENAME.key(), FILENAME));
runner.enqueue("test", Collections.singletonMap(CoreAttributes.FILENAME.key(), FILENAME));
runner.run();

runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);

/*
String text;
try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) {
text = CharStreams.toString(reader);
}

assertEquals(
"FlowFile content should be equal to the Blob content",
"test",
text
);
*/

final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue();
assertEquals(
BUCKET,

@ -255,24 +226,19 @@ public class PutGCSObjectTest extends AbstractGCSTest {
assertNull(blobInfo.getMetadata());

final List<Storage.BlobWriteOption> blobWriteOptions = blobWriteOptionArgumentCaptor.getAllValues();
final Set<Storage.BlobWriteOption> blobWriteOptionSet = ImmutableSet.copyOf(blobWriteOptions);
final Set<Storage.BlobWriteOption> blobWriteOptionSet = new HashSet<>(blobWriteOptions);

assertEquals(
"Each of the BlobWriteOptions should be unique",
blobWriteOptions.size(),
blobWriteOptionSet.size()
blobWriteOptionSet.size(),
"Each of the BlobWriteOptions should be unique"
);

assertTrue("The doesNotExist BlobWriteOption should be set if OVERWRITE is false",
blobWriteOptionSet.contains(Storage.BlobWriteOption.doesNotExist()));
assertTrue("The md5Match BlobWriteOption should be set if MD5 is non-null",
blobWriteOptionSet.contains(Storage.BlobWriteOption.md5Match()));
assertTrue("The crc32cMatch BlobWriteOption should be set if CRC32C is non-null",
blobWriteOptionSet.contains(Storage.BlobWriteOption.crc32cMatch()));
assertTrue("The predefinedAcl BlobWriteOption should be set if ACL is non-null",
blobWriteOptionSet.contains(Storage.BlobWriteOption.predefinedAcl(ACL)));
assertTrue("The encryptionKey BlobWriteOption should be set if ENCRYPTION_KEY is non-null",
blobWriteOptionSet.contains(Storage.BlobWriteOption.encryptionKey(ENCRYPTION_KEY)));
assertTrue(blobWriteOptionSet.contains(Storage.BlobWriteOption.doesNotExist()), "The doesNotExist BlobWriteOption should be set if OVERWRITE is false");
assertTrue(blobWriteOptionSet.contains(Storage.BlobWriteOption.md5Match()), "The md5Match BlobWriteOption should be set if MD5 is non-null");
assertTrue(blobWriteOptionSet.contains(Storage.BlobWriteOption.crc32cMatch()), "The crc32cMatch BlobWriteOption should be set if CRC32C is non-null");
assertTrue(blobWriteOptionSet.contains(Storage.BlobWriteOption.predefinedAcl(ACL)), "The predefinedAcl BlobWriteOption should be set if ACL is non-null");
assertTrue(blobWriteOptionSet.contains(Storage.BlobWriteOption.encryptionKey(ENCRYPTION_KEY)), "The encryptionKey BlobWriteOption should be set if ENCRYPTION_KEY is non-null");

}

@ -292,7 +258,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {

runner.assertValid();

when(storage.create(blobInfoArgumentCaptor.capture(),
when(storage.createFrom(blobInfoArgumentCaptor.capture(),
inputStreamArgumentCaptor.capture(),
blobWriteOptionArgumentCaptor.capture())).thenReturn(blob);

@ -302,21 +268,6 @@ public class PutGCSObjectTest extends AbstractGCSTest {
runner.assertAllFlowFilesTransferred(PutGCSObject.REL_SUCCESS);
runner.assertTransferCount(PutGCSObject.REL_SUCCESS, 1);

/*
String text;
try (final Reader reader = new InputStreamReader(inputStreamArgumentCaptor.getValue())) {
text = CharStreams.toString(reader);
}

assertEquals(
"FlowFile content should be equal to the Blob content",
"test",
text
);
*/

final BlobInfo blobInfo = blobInfoArgumentCaptor.getValue();
final Map<String, String> metadata = blobInfo.getMetadata();

@ -346,7 +297,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenReturn(blob);

when(blob.getBucket()).thenReturn(BUCKET);

@ -414,7 +365,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenReturn(blob);

final Acl.User mockUser = mock(Acl.User.class);

@ -440,7 +391,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenReturn(blob);

final Acl.Group mockGroup = mock(Acl.Group.class);

@ -467,7 +418,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenReturn(blob);

final Acl.Domain mockDomain = mock(Acl.Domain.class);

@ -494,7 +445,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenReturn(blob);

final Acl.Project mockProject = mock(Acl.Project.class);

@ -520,7 +471,7 @@ public class PutGCSObjectTest extends AbstractGCSTest {
addRequiredPropertiesToRunner(runner);
runner.assertValid();

when(storage.create(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
when(storage.createFrom(any(BlobInfo.class), any(InputStream.class), any(Storage.BlobWriteOption.class)))
.thenThrow(new StorageException(404, "test exception"));

runner.enqueue("test");

@ -25,6 +25,17 @@
<artifactId>nifi-gcp-services-api</artifactId>
<packaging>jar</packaging>

<dependencyManagement>
<dependencies>
<!-- Override Guava version from google-auth-library-oauth2-http -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>

@ -39,10 +50,6 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>

@ -53,11 +60,6 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>

@ -56,6 +56,11 @@
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>

@ -60,12 +60,6 @@
<version>1.16.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-graph-client-service-api</artifactId>

@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;

import io.grpc.stub.StreamObserver;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;

/**
* Simple gRPC service that handles receipt of FlowFileRequest messages from external-to-NiFi gRPC

@ -69,9 +69,9 @@ public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceIm
public FlowFileIngestService(final ComponentLog logger,
final AtomicReference<ProcessSessionFactory> sessionFactoryReference,
final ProcessContext context) {
this.context = checkNotNull(context);
this.sessionFactoryReference = checkNotNull(sessionFactoryReference);
this.logger = checkNotNull(logger);
this.context = requireNonNull(context);
this.sessionFactoryReference = requireNonNull(sessionFactoryReference);
this.logger = requireNonNull(logger);
}

/**

@ -36,7 +36,7 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;

/**
* Simple gRPC service call interceptor that enforces various controls

@ -59,7 +59,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
* @param logger the {@link ComponentLog} for the ListenGRPC processor
*/
public FlowFileIngestServiceInterceptor(final ComponentLog logger) {
this.logger = checkNotNull(logger);
this.logger = requireNonNull(logger);
}

/**

@ -70,7 +70,7 @@ public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
* @return this
*/
public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) {
this.authorizedDNpattern = checkNotNull(authorizedDNPattern);
this.authorizedDNpattern = requireNonNull(authorizedDNPattern);
return this;
}

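Both gRPC classes above replace the static Guava Preconditions.checkNotNull import with java.util.Objects.requireNonNull, which performs the same fail-fast null check and throws NullPointerException with the supplied message. A minimal sketch, with a hypothetical field name:

    import java.util.Objects;

    class NullCheckSketch {
        private final String name;

        NullCheckSketch(final String name) {
            // Guava: this.name = Preconditions.checkNotNull(name, "name can not be null");
            // JDK equivalent, no external dependency:
            this.name = Objects.requireNonNull(name, "name can not be null");
        }
    }
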
@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.hadoop;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;

@ -57,6 +56,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@ -334,7 +334,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {

protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
final ProcessSession session, FlowFile parentFlowFile) {
Preconditions.checkState(parentFlowFile != null, "No parent flowfile for this batch was provided");
Objects.requireNonNull(parentFlowFile, "No parent flowfile for this batch was provided");

// process the batch of files
final Configuration conf = getConfiguration();

|
|||
*/
|
||||
package org.apache.hadoop.hive.ql.io.orc;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.management.ManagementFactory;
|
||||
|
@ -27,6 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.TimeZone;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Function;
|
||||
|
@ -2566,11 +2565,12 @@ public class OrcFlowFileWriter implements Writer, MemoryManager.Callback {
|
|||
public void appendStripe(byte[] stripe, int offset, int length,
|
||||
StripeInformation stripeInfo,
|
||||
OrcProto.StripeStatistics stripeStatistics) throws IOException {
|
||||
checkArgument(stripe != null, "Stripe must not be null");
|
||||
checkArgument(length <= stripe.length,
|
||||
"Specified length must not be greater specified array length");
|
||||
checkArgument(stripeInfo != null, "Stripe information must not be null");
|
||||
checkArgument(stripeStatistics != null,
|
||||
Objects.requireNonNull(stripe, "Stripe must not be null");
|
||||
if (length > stripe.length) {
|
||||
throw new IllegalArgumentException("Specified length must not be greater specified array length");
|
||||
}
|
||||
Objects.requireNonNull(stripeInfo, "Stripe information must not be null");
|
||||
Objects.requireNonNull(stripeStatistics,
|
||||
"Stripe statistics must not be null");
|
||||
|
||||
getStream();
|
||||
|
|
|
@ -56,11 +56,5 @@
|
|||
<artifactId>gson</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>28.0-jre</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.kudu.client.OperationResponse;
|
|||
import org.apache.kudu.client.PartialRow;
|
||||
import org.apache.kudu.client.RowError;
|
||||
import org.apache.kudu.client.SessionConfiguration;
|
||||
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyDescriptor.Builder;
|
||||
|
@ -378,7 +377,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, boolean ignoreNull, boolean lowercaseFields) {
|
||||
for (String recordFieldName : fieldNames) {
|
||||
String colName = recordFieldName;
|
||||
|
|
|
@ -74,12 +74,6 @@
|
|||
<version>1.16.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>28.0-jre</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-json-utils</artifactId>
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.nifi.processors.mongodb;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.MongoClientURI;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
|
@ -41,6 +40,7 @@ import org.junit.jupiter.api.BeforeEach;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -60,10 +60,10 @@ public class GetMongoIT {
|
|||
|
||||
static {
|
||||
CAL = Calendar.getInstance();
|
||||
DOCUMENTS = Lists.newArrayList(
|
||||
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
|
||||
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()),
|
||||
new Document("_id", "doc_3").append("a", 1).append("b", 3)
|
||||
DOCUMENTS = Arrays.asList(
|
||||
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
|
||||
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()),
|
||||
new Document("_id", "doc_3").append("a", 1).append("b", 3)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -319,13 +319,13 @@ class TestQueryNiFiReportingTask {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(QueryMetricsUtil.QUERY, "select * from BULLETINS, PROVENANCE where " +
"bulletinTimestamp > $bulletinStartTime and bulletinTimestamp <= $bulletinEndTime " +
"and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime");
"and timestampMillis > $provenanceStartTime and timestampMillis <= $provenanceEndTime LIMIT 10");
reportingTask = initTask(properties);
currentTime.set(Instant.now().toEpochMilli());
reportingTask.onTrigger(context);

List<Map<String, Object>> rows = mockRecordSinkService.getRows();
assertEquals(3003, rows.size());
assertEquals(10, rows.size());

final Bulletin bulletin = BulletinFactory.createBulletin(ComponentType.INPUT_PORT.name().toLowerCase(), "ERROR", "test bulletin 3", "testFlowFileUuid");
mockBulletinRepository.addBulletin(bulletin);

@ -341,7 +341,7 @@ class TestQueryNiFiReportingTask {

mockProvenanceRepository.registerEvent(prov1002);

currentTime.set(bulletin.getTimestamp().getTime());
currentTime.set(bulletin.getTimestamp().toInstant().plusSeconds(1).toEpochMilli());
reportingTask.onTrigger(context);

List<Map<String, Object>> sameRows = mockRecordSinkService.getRows();

|
|||
<artifactId>sshd-sftp</artifactId>
|
||||
<version>2.8.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>31.0.1-jre</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
</project>
|
||||
|
|
|
@ -16,10 +16,8 @@ package com.hortonworks.registries.schemaregistry.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.hortonworks.registries.auth.KerberosLogin;
import com.hortonworks.registries.auth.Login;
import com.hortonworks.registries.auth.NOOPLogin;

@ -101,11 +99,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_CONNECTION_TIMEOUT;

@ -159,8 +158,8 @@ public class SchemaRegistryClient implements ISchemaRegistryClient {
private static final String FILES_PATH = SCHEMA_REGISTRY_PATH + "/files/";
private static final String SERIALIZERS_PATH = SCHEMA_REGISTRY_PATH + "/serdes/";
private static final String REGISTY_CLIENT_JAAS_SECTION = "RegistryClient";
private static final Set<Class<?>> DESERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotDeserializer.class, PullDeserializer.class, PushDeserializer.class);
private static final Set<Class<?>> SERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotSerializer.class, PullSerializer.class);
private static final Set<Class<?>> DESERIALIZER_INTERFACE_CLASSES = new LinkedHashSet<>(Arrays.asList(SnapshotDeserializer.class, PullDeserializer.class, PushDeserializer.class));
private static final Set<Class<?>> SERIALIZER_INTERFACE_CLASSES = new LinkedHashSet<>(Arrays.asList(SnapshotSerializer.class, PullSerializer.class));
private static final String SEARCH_FIELDS = SCHEMA_REGISTRY_PATH + "/search/schemas/fields";
private static final long KERBEROS_SYNCHRONIZATION_TIMEOUT_MS = 180000;

@ -253,7 +252,7 @@ public class SchemaRegistryClient implements ISchemaRegistryClient {
.name())).longValue(),
schemaMetadataFetcher);

schemaTextCache = CacheBuilder.newBuilder()
schemaTextCache = Caffeine.newBuilder()
.maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
.name())).longValue())
.expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS

@ -592,22 +591,24 @@ public class SchemaRegistryClient implements ISchemaRegistryClient {

try {
return schemaTextCache.get(buildSchemaTextEntry(schemaVersion, schemaName),
() -> doAddSchemaVersion(schemaBranchName, schemaName, schemaVersion, disableCanonicalCheck));
} catch (ExecutionException e) {
Throwable cause = e.getCause();
LOG.error("Encountered error while adding new version [{}] of schema [{}] and error [{}]", schemaVersion, schemaName, e);
if (cause != null) {
if (cause instanceof InvalidSchemaException)
throw (InvalidSchemaException) cause;
else if (cause instanceof IncompatibleSchemaException) {
throw (IncompatibleSchemaException) cause;
} else if (cause instanceof SchemaNotFoundException) {
throw (SchemaNotFoundException) cause;
} else {
throw new RuntimeException(cause.getMessage(), cause);
}
key -> {
try {
return doAddSchemaVersion(schemaBranchName, schemaName, schemaVersion, disableCanonicalCheck);
} catch (final Exception e) {
LOG.error("Encountered error while adding new version [{}] of schema [{}] and error [{}]", schemaVersion, schemaName, e);
throw new RuntimeException(e);
}
});
} catch (final RuntimeException e) {
final Throwable cause = e.getCause();
if (cause instanceof InvalidSchemaException)
throw (InvalidSchemaException) cause;
else if (cause instanceof IncompatibleSchemaException) {
throw (IncompatibleSchemaException) cause;
} else if (cause instanceof SchemaNotFoundException) {
throw (SchemaNotFoundException) cause;
} else {
throw new RuntimeException(e.getMessage(), e);
throw e;
}
}
}

@ -1480,8 +1481,8 @@ public class SchemaRegistryClient implements ISchemaRegistryClient {
private final byte[] schemaDigest;

SchemaDigestEntry(String name, byte[] schemaDigest) {
Preconditions.checkNotNull(name, "name can not be null");
Preconditions.checkNotNull(schemaDigest, "schema digest can not be null");
Objects.requireNonNull(name, "name can not be null");
Objects.requireNonNull(schemaDigest, "schema digest can not be null");

this.name = name;
this.schemaDigest = schemaDigest;

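The SchemaRegistryClient hunks above move the schema text cache from Guava's CacheBuilder to Caffeine. Caffeine's Cache.get(key, mappingFunction) computes the value on a miss and propagates loader failures as unchecked exceptions, which is why the catch block in the diff changes from ExecutionException to RuntimeException. A hedged sketch of the pattern, with placeholder key and value types and placeholder sizing rather than the project's actual configuration:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.util.concurrent.TimeUnit;

    class SchemaTextCacheSketch {
        private final Cache<String, Integer> schemaTextCache = Caffeine.newBuilder()
                .maximumSize(1_000)                          // placeholder size
                .expireAfterAccess(300, TimeUnit.SECONDS)    // placeholder expiry
                .build();

        Integer addOrGet(final String schemaText) {
            // Computed once on a cache miss; loader exceptions surface unchecked to the caller.
            return schemaTextCache.get(schemaText, key -> doAddSchemaVersion(key));
        }

        private Integer doAddSchemaVersion(final String schemaText) {
            return schemaText.length(); // stand-in for the real registry call
        }
    }
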
@ -34,6 +34,12 @@
<artifactId>commons-compress</artifactId>
<version>1.21</version>
</dependency>
<!-- Override Guava from schema-registry-client -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>