NIFI-4731: BQ Processors and GCP library update.

Signed-off-by: joewitt <joewitt@apache.org>
Daniel Jimenez 2018-04-29 10:01:34 +02:00 committed by joewitt
parent f6b171d5f7
commit 444caf8a78
15 changed files with 1060 additions and 25 deletions

View File

@@ -62,7 +62,7 @@
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
-<artifactId>google-cloud-storage</artifactId>
+<artifactId>google-cloud-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@@ -70,6 +70,14 @@
</exclusion>
</exclusions>
</dependency>
+<dependency>
+<groupId>com.google.cloud</groupId>
+<artifactId>google-cloud-storage</artifactId>
+</dependency>
+<dependency>
+<groupId>com.google.cloud</groupId>
+<artifactId>google-cloud-bigquery</artifactId>
+</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>

View File

@@ -43,7 +43,8 @@ public abstract class AbstractGCPProcessor<
.Builder().name("gcp-project-id")
.displayName("Project ID")
.description("Google Cloud Project ID")
-.required(true)
+.required(false)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.util.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Base class for creating processors that connect to the Google BigQuery service
*/
public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
static final int BUFFER_SIZE = 65536;
public static final Relationship REL_SUCCESS =
new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
.build();
public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
.description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
.build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
public static final PropertyDescriptor DATASET = new PropertyDescriptor
.Builder().name(BigQueryAttributes.DATASET_ATTR)
.displayName("Dataset")
.description(BigQueryAttributes.DATASET_DESC)
.required(true)
.defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
.Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
.displayName("Table Name")
.description(BigQueryAttributes.TABLE_NAME_DESC)
.required(true)
.defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
.Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
.Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
.defaultValue("5 minutes")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.build();
}
@Override
protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
if (!StringUtils.isBlank(projectId)) {
builder.setProjectId(projectId);
}
return builder
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
.build();
}
}
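For orientation, here is a minimal sketch (not part of this commit) of how the options assembled in getServiceOptions() become a usable client; the credentials source, project name, and retry count are placeholders:

// Sketch only: mirrors how the abstract processor turns the returned options into its cached client.
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); // placeholder credential source
BigQueryOptions options = BigQueryOptions.newBuilder()
        .setCredentials(credentials)
        .setProjectId("my-project") // optional now; the library can fall back to a default project detected from the environment
        .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(6).build())
        .build();
BigQuery bigQuery = options.getService(); // the client used by onTrigger() via getCloudService()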

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
/**
* Attributes associated with the BigQuery processors
*/
public class BigQueryAttributes {
private BigQueryAttributes() {}
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
public static final String DATASET_ATTR = "bq.dataset";
public static final String DATASET_DESC = "BigQuery dataset";
public static final String TABLE_NAME_ATTR = "bq.table.name";
public static final String TABLE_NAME_DESC = "BigQuery table name";
public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
public static final String TABLE_SCHEMA_DESC = "BigQuery table schema";
public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
public static final String JOB_ERROR_MSG_DESC = "Load job error message";
public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
public static final String JOB_READ_TIMEOUT_DESC = "Load job timeout";
// Batch Attributes
public static final String SOURCE_TYPE_ATTR = "bq.load.type";
public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
public static final String JOB_CREATE_TIME_DESC = "Time load job was created";
public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
public static final String JOB_END_TIME_DESC = "Time load job ended";
public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
public static final String JOB_START_TIME_DESC = "Time load job started";
public static final String JOB_LINK_ATTR = "bq.job.link";
public static final String JOB_LINK_DESC = "API Link to load job";
}
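Because these constants double as FlowFile attribute names, downstream components can read the values PutBigQueryBatch writes; a hypothetical sketch (not in this commit), with the logging call standing in for real handling:

// Hypothetical follow-up on a FlowFile routed to the failure relationship.
String errorMessage = flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR); // "bq.error.message"
String errorReason = flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR); // "bq.error.reason"
if (errorMessage != null) {
    getLogger().warn("BigQuery load failed ({}): {}", new Object[]{errorReason, errorMessage});
}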

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Helpers for converting a JSON-described table schema into the BigQuery {@link Schema} model.
*/
public class BqUtils {
private static final Type gsonSchemaType = new TypeToken<List<Map>>() {}.getType();
public static Field mapToField(Map fMap) {
String typeStr = fMap.get("type").toString();
String nameStr = fMap.get("name").toString();
String modeStr = fMap.get("mode").toString();
LegacySQLTypeName type = null;
if (typeStr.equals("BOOLEAN")) {
type = LegacySQLTypeName.BOOLEAN;
} else if (typeStr.equals("STRING")) {
type = LegacySQLTypeName.STRING;
} else if (typeStr.equals("BYTES")) {
type = LegacySQLTypeName.BYTES;
} else if (typeStr.equals("INTEGER")) {
type = LegacySQLTypeName.INTEGER;
} else if (typeStr.equals("FLOAT")) {
type = LegacySQLTypeName.FLOAT;
} else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
|| typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
type = LegacySQLTypeName.TIMESTAMP;
} else if (typeStr.equals("RECORD")) {
type = LegacySQLTypeName.RECORD;
}
return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
}
public static List<Field> listToFields(List<Map> m_fields) {
List<Field> fields = new ArrayList<>(m_fields.size());
for (Map m : m_fields) {
fields.add(mapToField(m));
}
return fields;
}
public static Schema schemaFromString(String schemaStr) {
if (schemaStr == null) {
return null;
} else {
Gson gson = new Gson();
List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
return Schema.of(BqUtils.listToFields(fields));
}
}
}
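As a usage sketch, schemaFromString() expects a JSON array of field objects carrying "name", "type", and "mode" keys; note that mapToField() calls toString() on all three, so each key must be present. The field names below are illustrative, not taken from this commit:

// Illustrative input for BqUtils.schemaFromString().
String schemaJson = "[{\"name\":\"id\",\"type\":\"INTEGER\",\"mode\":\"REQUIRED\"},"
        + "{\"name\":\"payload\",\"type\":\"STRING\",\"mode\":\"NULLABLE\"}]";
Schema schema = BqUtils.schemaFromString(schemaJson);
// schema now holds two fields: id (INTEGER, REQUIRED) and payload (STRING, NULLABLE).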

View File

@@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A processor for batch loading data into a Google BigQuery table
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "bq", "bigquery"})
@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
@WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
@WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
@WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
@WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
})
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
.displayName("Load file type")
.description(BigQueryAttributes.SOURCE_TYPE_DESC)
.required(true)
.allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
.defaultValue(FormatOptions.avro().getType())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
.displayName("Ignore Unknown Values")
.description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
.displayName("Create Disposition")
.description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
.required(true)
.allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
.defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
.displayName("Write Disposition")
.description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
.required(true)
.allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
.defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
.displayName("Max Bad Records")
.description(BigQueryAttributes.MAX_BADRECORDS_DESC)
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
private Schema schemaCache = null;
public PutBigQueryBatch() {
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(DATASET)
.add(TABLE_NAME)
.add(TABLE_SCHEMA)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
.add(IGNORE_UNKNOWN)
.build();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Map<String, String> attributes = new HashMap<>();
final BigQuery bq = getCloudService();
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final TableId tableId;
if (StringUtils.isEmpty(projectId)) {
tableId = TableId.of(dataset, tableName);
} else {
tableId = TableId.of(projectId, dataset, tableName);
}
final String fileType = context.getProperty(SOURCE_TYPE).getValue();
String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
Schema schema = BqUtils.schemaFromString(schemaString);
WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
.setSchema(schema)
.setFormatOptions(FormatOptions.of(fileType))
.build();
TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
try {
session.read(flowFile, rawIn -> {
ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
while (readableByteChannel.read(byteBuffer) >= 0) {
byteBuffer.flip();
writer.write(byteBuffer);
byteBuffer.clear();
}
});
writer.close();
Job job = writer.getJob();
PropertyValue property = context.getProperty(READ_TIMEOUT);
Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
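// waitFor polls the job until it completes (bounded by the total timeout) and returns null if the job no longer exists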
job = job.waitFor(RetryOption.totalTimeout(duration));
if (job != null) {
attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
boolean jobError = (job.getStatus().getError() != null);
if (jobError) {
attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
} else {
// clear any stale error attributes in case the FlowFile was looped back from a previous failure
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
}
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
if (jobError) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
}
}
} catch (Exception ex) {
getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@@ -16,12 +16,15 @@
*/
package org.apache.nifi.processors.gcp.storage;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
import java.net.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -31,14 +34,12 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import java.net.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;
/**
* Base class for creating processors which connect to Google Cloud Storage.
@@ -86,12 +87,11 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
@Override
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
-final String projectId = context.getProperty(PROJECT_ID).getValue();
+final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
.setCredentials(credentials)
-.setProjectId(projectId)
.setRetrySettings(RetrySettings.newBuilder()
.setMaxAttempts(retryCount)
.build());
@@ -113,6 +113,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
return ProxyConfiguration.DIRECT_CONFIGURATION;
});
+if (!projectId.isEmpty()) {
+storageOptionsBuilder.setProjectId(projectId);
+}
final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());

View File

@@ -150,8 +150,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.displayName("Bucket")
.description(BUCKET_DESC)
.required(true)
-.expressionLanguageSupported(ExpressionLanguageScope.NONE)
-.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
@@ -159,7 +159,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
.displayName("Prefix")
.description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
.required(false)
-.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor USE_GENERATIONS = new PropertyDescriptor.Builder()
@@ -242,9 +243,9 @@ public class ListGCSBucket extends AbstractGCSProcessor {
final long startNanos = System.nanoTime();
-final String bucket = context.getProperty(BUCKET).getValue();
+final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
-final String prefix = context.getProperty(PREFIX).getValue();
+final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();

View File

@@ -17,4 +17,5 @@ org.apache.nifi.processors.gcp.storage.FetchGCSObject
org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
-org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch

View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
/**
* Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials.
*/
public abstract class AbstractBQTest {
private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
private static final Integer RETRIES = 9;
static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}
public static TestRunner buildNewRunner(Processor processor) throws Exception {
final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.addControllerService("gcpCredentialsControllerService", credentialsService);
runner.enableControllerService(credentialsService);
runner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcpCredentialsControllerService");
runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
runner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES));
runner.assertValid(credentialsService);
return runner;
}
public abstract AbstractBigQueryProcessor getProcessor();
protected abstract void addRequiredPropertiesToRunner(TestRunner runner);
@Mock
protected BigQuery bq;
@Test
public void testBigQueryOptionsConfiguration() throws Exception {
reset(bq);
final TestRunner runner = buildNewRunner(getProcessor());
final AbstractBigQueryProcessor processor = getProcessor();
final GoogleCredentials mockCredentials = mock(GoogleCredentials.class);
final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(),
mockCredentials);
assertEquals("Project IDs should match",
PROJECT_ID, options.getProjectId());
assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
assertSame("Credentials should be configured correctly",
mockCredentials, options.getCredentials());
}
}

View File

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.gcp.GCPIntegrationTests;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNull;
@Category(GCPIntegrationTests.class)
public abstract class AbstractBigQueryIT {
static final String CONTROLLER_SERVICE = "GCPCredentialsService";
protected static BigQuery bigquery;
protected static Dataset dataset;
protected static TestRunner runner;
@BeforeClass
public static void beforeClass() {
dataset = null;
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
.build();
bigquery = bigQueryOptions.getService();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
dataset = bigquery.create(datasetInfo);
}
@AfterClass
public static void afterClass() {
bigquery.delete(dataset.getDatasetId(), BigQuery.DatasetDeleteOption.deleteContents());
}
protected static void validateNoServiceExceptionAttribute(FlowFile flowFile) {
assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR));
assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR));
assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR));
}
protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
final Map<String, String> propertiesMap = new HashMap<>();
final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
runner.enableControllerService(credentialsControllerService);
runner.assertValid(credentialsControllerService);
return runner;
}
}

View File

@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.FormatOptions;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
public class PutBigQueryBatchIT extends AbstractBigQueryIT {
private static final String TABLE_SCHEMA_STRING = "[\n" +
" {\n" +
" \"description\": \"field 1\",\n" +
" \"mode\": \"REQUIRED\",\n" +
" \"name\": \"field_1\",\n" +
" \"type\": \"STRING\"\n" +
" },\n" +
" {\n" +
" \"description\": \"field 2\",\n" +
" \"mode\": \"REQUIRED\",\n" +
" \"name\": \"field_2\",\n" +
" \"type\": \"STRING\"\n" +
" },\n" +
" {\n" +
" \"description\": \"field 3\",\n" +
" \"mode\": \"NULLABLE\",\n" +
" \"name\": \"field_3\",\n" +
" \"type\": \"STRING\"\n" +
" }\n" +
"]";
@Before
public void setup() {
runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
}
@Test
public void PutBigQueryBatchSmallPayloadTest() throws Exception {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run(1);
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
validateNoServiceExceptionAttribute(flowFile);
}
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
}
@Test
public void PutBigQueryBatchBadRecordTest() throws Exception {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
String str = "{\"field_1\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run(1);
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
}
@Test
public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
// Allow one bad record to deal with the extra line break.
runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1));
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
" The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
" for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
" them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
" Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
Path tempFile = Files.createTempFile(methodName, "");
try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
for (int i = 0; i < 2; i++) {
for (int ii = 0; ii < 1_000_000; ii++) {
writer.write(str);
}
writer.flush();
}
writer.flush();
}
runner.enqueue(tempFile);
runner.run(1);
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
validateNoServiceExceptionAttribute(flowFile);
}
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
}
}

View File

@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.nifi.util.TestRunner;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link PutBigQueryBatch}.
*/
public class PutBigQueryBatchTest extends AbstractBQTest {
private static final String TABLENAME = "test_table";
private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
private static final String SOURCE_TYPE = FormatOptions.json().getType();
private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
private static final String MAXBAD_RECORDS = "0";
private static final String IGNORE_UNKNOWN = "true";
private static final String READ_TIMEOUT = "5 minutes";
@Mock
BigQuery bq;
@Mock
Table table;
@Mock
Job job;
@Mock
JobStatus jobStatus;
@Mock
JobStatistics stats;
@Mock
TableDataWriteChannel tableDataWriteChannel;
@Before
public void setup() throws Exception {
super.setup();
reset(bq);
reset(table);
reset(job);
reset(jobStatus);
reset(stats);
}
@Override
public AbstractBigQueryProcessor getProcessor() {
return new PutBigQueryBatch() {
@Override
protected BigQuery getCloudService() {
return bq;
}
};
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME);
runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS);
runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
}
@Test
public void testSuccessfulLoad() throws Exception {
when(table.exists()).thenReturn(Boolean.TRUE);
when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
when(job.getStatus()).thenReturn(jobStatus);
when(job.getStatistics()).thenReturn(stats);
when(stats.getCreationTime()).thenReturn(0L);
when(stats.getStartTime()).thenReturn(1L);
when(stats.getEndTime()).thenReturn(2L);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
runner.enqueue("{ \"data\": \"datavalue\" }");
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
}
@Test
public void testFailedLoad() throws Exception {
when(table.exists()).thenReturn(Boolean.TRUE);
when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
when(job.getStatus()).thenReturn(jobStatus);
when(job.getStatistics()).thenReturn(stats);
when(stats.getCreationTime()).thenReturn(0L);
when(stats.getStartTime()).thenReturn(1L);
when(stats.getEndTime()).thenReturn(2L);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
runner.enqueue("{ \"data\": \"datavalue\" }");
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE);
}
}

View File

@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.gcp.storage;
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;

View File

@@ -33,6 +33,7 @@
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
+<version>0.9.0</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>