NIFI-5156: Updated GCP SDK to latest version

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2680.
This commit is contained in:
zenfenan 2018-05-05 14:45:40 +05:30 committed by Pierre Villard
parent 7a4990e7fe
commit f742a3a6ac
11 changed files with 40 additions and 54 deletions

View File

@ -63,21 +63,12 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.google.cloud</groupId> <groupId>com.google.cloud</groupId>
<artifactId>google-cloud</artifactId> <artifactId>google-cloud-storage</artifactId>
<version>0.8.0</version>
<exclusions> <exclusions>
<exclusion> <!-- NIFI-3089 -->
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion> <exclusion>
<groupId>com.google.code.findbugs</groupId> <groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId> <artifactId>jsr305</artifactId>
</exclusion> </exclusion>
<exclusion> <!-- NIFI-5124 -->
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>
@ -85,11 +76,6 @@
<artifactId>json</artifactId> <artifactId>json</artifactId>
<version>1.8</version> <version>1.8</version>
</dependency> </dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.3</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -17,8 +17,8 @@
package org.apache.nifi.processors.gcp; package org.apache.nifi.processors.gcp;
import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.HttpServiceOptions;
import com.google.cloud.Service; import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
@ -36,8 +36,7 @@ import java.util.List;
*/ */
public abstract class AbstractGCPProcessor< public abstract class AbstractGCPProcessor<
CloudService extends Service<CloudServiceOptions>, CloudService extends Service<CloudServiceOptions>,
CloudServiceRpc, CloudServiceOptions extends ServiceOptions<CloudService, CloudServiceOptions>> extends AbstractProcessor {
CloudServiceOptions extends HttpServiceOptions<CloudService, CloudServiceRpc, CloudServiceOptions>> extends AbstractProcessor {
public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor
.Builder().name("gcp-project-id") .Builder().name("gcp-project-id")

View File

@ -18,12 +18,12 @@ package org.apache.nifi.processors.gcp.storage;
import com.google.api.client.http.HttpTransport; import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.http.HttpTransportFactory; import com.google.auth.http.HttpTransportFactory;
import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.RetryParams; import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.spi.StorageRpc;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -44,7 +44,7 @@ import java.util.Set;
* *
* Every GCS processor operation requires a bucket, whether it's reading or writing from said bucket. * Every GCS processor operation requires a bucket, whether it's reading or writing from said bucket.
*/ */
public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageRpc, StorageOptions> { public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageOptions> {
public static final Relationship REL_SUCCESS = public static final Relationship REL_SUCCESS =
new Relationship.Builder().name("success") new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.") .description("FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.")
@ -80,25 +80,19 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder() StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
.setCredentials(credentials) .setCredentials(credentials)
.setProjectId(projectId) .setProjectId(projectId)
.setRetryParams(RetryParams.newBuilder() .setRetrySettings(RetrySettings.newBuilder()
.setRetryMaxAttempts(retryCount) .setMaxAttempts(retryCount)
.setRetryMinAttempts(retryCount)
.build()); .build());
if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) { if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) {
storageOptionsBuilder.setHttpTransportFactory(new HttpTransportFactory() { storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(new HttpTransportFactory() {
@Override @Override
public HttpTransport create() { public HttpTransport create() {
final HttpTransport transport = new NetHttpTransport.Builder() return new NetHttpTransport.Builder()
.setProxy( .setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
new Proxy(
Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)
)
)
.build(); .build();
return transport;
} }
}); }).build());
} }
return storageOptionsBuilder.build(); return storageOptionsBuilder.build();
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.nifi.processors.gcp.storage; package org.apache.nifi.processors.gcp.storage;
import com.google.cloud.Page; import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl; import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob; import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BlobInfo;

View File

@ -17,8 +17,8 @@
package org.apache.nifi.processors.gcp.credentials.factory; package org.apache.nifi.processors.gcp.credentials.factory;
import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.HttpServiceOptions;
import com.google.cloud.Service; import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -53,8 +53,8 @@ public class MockCredentialsFactoryProcessor extends AbstractGCPProcessor {
} }
@Override @Override
protected HttpServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) { protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
HttpServiceOptions mockOptions = mock(HttpServiceOptions.class); ServiceOptions mockOptions = mock(ServiceOptions.class);
Service mockService = mock(Service.class); Service mockService = mock(Service.class);
when(mockOptions.getService()).thenReturn(mockService); when(mockOptions.getService()).thenReturn(mockService);

View File

@ -17,8 +17,8 @@
package org.apache.nifi.processors.gcp.credentials.service; package org.apache.nifi.processors.gcp.credentials.service;
import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.HttpServiceOptions;
import com.google.cloud.Service; import com.google.cloud.Service;
import com.google.cloud.ServiceOptions;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -43,8 +43,8 @@ public class MockCredentialsServiceProcessor extends AbstractGCPProcessor {
} }
@Override @Override
protected HttpServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) { protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
HttpServiceOptions mockOptions = mock(HttpServiceOptions.class); ServiceOptions mockOptions = mock(ServiceOptions.class);
Service mockService = mock(Service.class); Service mockService = mock(Service.class);
when(mockOptions.getService()).thenReturn(mockService); when(mockOptions.getService()).thenReturn(mockService);

View File

@ -33,7 +33,6 @@ import org.junit.BeforeClass;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -78,10 +77,9 @@ public abstract class AbstractGCSIT {
public static void tearDown() { public static void tearDown() {
try { try {
// Empty the bucket before deleting it. // Empty the bucket before deleting it.
Iterator<Blob> blobIterator = storage.list(BUCKET, Storage.BlobListOption.versions(true)).iterateAll(); Iterable<Blob> blobIterable = storage.list(BUCKET, Storage.BlobListOption.versions(true)).iterateAll();
while(blobIterator.hasNext()) { for (final Blob blob : blobIterable) {
Blob blob = blobIterator.next();
storage.delete(blob.getBlobId()); storage.delete(blob.getBlobId());
} }

View File

@ -88,10 +88,7 @@ public abstract class AbstractGCSTest {
PROJECT_ID, options.getProjectId()); PROJECT_ID, options.getProjectId());
assertEquals("Retry counts should match", assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetryParams().getRetryMinAttempts()); RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetryParams().getRetryMaxAttempts());
assertSame("Credentials should be configured correctly", assertSame("Credentials should be configured correctly",
mockCredentials, options.getCredentials()); mockCredentials, options.getCredentials());

View File

@ -141,11 +141,6 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
} }
@Override
public void chunkSize(int i) {
}
@Override @Override
public void setChunkSize(int i) { public void setChunkSize(int i) {

View File

@ -16,7 +16,8 @@
*/ */
package org.apache.nifi.processors.gcp.storage; package org.apache.nifi.processors.gcp.storage;
import com.google.cloud.Page;
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl; import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob; import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BlobInfo;

View File

@ -26,6 +26,22 @@
<version>1.7.0-SNAPSHOT</version> <version>1.7.0-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<properties>
<google.cloud.sdk.version>0.47.0-alpha</google.cloud.sdk.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud</artifactId>
<version>${google.cloud.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<modules> <modules>
<module>nifi-gcp-services-api</module> <module>nifi-gcp-services-api</module>
<module>nifi-gcp-services-api-nar</module> <module>nifi-gcp-services-api-nar</module>