NIFI-4933 BigQuery PR Review

Signed-off-by: joewitt <joewitt@apache.org>

parent 444caf8a78
commit 03ef646547
AbstractGCPProcessor.java:
@@ -19,16 +19,20 @@ package org.apache.nifi.processors.gcp;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.Service;
 import com.google.cloud.ServiceOptions;
+import com.google.cloud.TransportOptions;
+import com.google.cloud.http.HttpTransportOptions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.proxy.ProxyConfiguration;
 
+import java.net.Proxy;
 import java.util.List;
 
 /**
@@ -65,7 +69,7 @@ public abstract class AbstractGCPProcessor<
                     "-Djdk.http.auth.tunneling.disabledSchemes=\n" +
                     "-Djdk.http.auth.proxying.disabledSchemes=")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -74,14 +78,14 @@ public abstract class AbstractGCPProcessor<
             .displayName("Proxy port")
             .description("Proxy port number")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor
             .Builder().name("gcp-proxy-user-name")
-            .displayName("Http Proxy Username")
-            .description("Http Proxy Username")
+            .displayName("HTTP Proxy Username")
+            .description("HTTP Proxy Username")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -89,8 +93,8 @@ public abstract class AbstractGCPProcessor<
 
     public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor
             .Builder().name("gcp-proxy-user-password")
-            .displayName("Http Proxy Password")
-            .description("Http Proxy Password")
+            .displayName("HTTP Proxy Password")
+            .description("HTTP Proxy Password")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -160,4 +164,31 @@ public abstract class AbstractGCPProcessor<
      * @see <a href="http://googlecloudplatform.github.io/google-cloud-java/0.8.0/apidocs/com/google/cloud/ServiceOptions.html">ServiceOptions</a>
      */
     protected abstract CloudServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials);
+
+    /**
+     * Builds the Transport Options containing the proxy configuration
+     * @param context Context to get properties
+     * @return Transport options object with proxy configuration
+     */
+    protected TransportOptions getTransportOptions(ProcessContext context) {
+        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
+            final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+            final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
+                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+                final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUser);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
+        return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build();
+    }
 }
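The ProxyAwareTransportFactory referenced above is NiFi's HttpTransportFactory implementation. Stripped to its essence, a factory of this kind looks roughly like the sketch below; the class name, constructor, and fields are illustrative assumptions, not NiFi's actual code, which also handles proxy authentication and the DIRECT (no proxy) case.

import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auth.http.HttpTransportFactory;

import java.net.InetSocketAddress;
import java.net.Proxy;

// Illustrative sketch only, not the NiFi implementation.
public class ExampleProxyTransportFactory implements HttpTransportFactory {

    private final String proxyHost;
    private final int proxyPort;

    public ExampleProxyTransportFactory(final String proxyHost, final int proxyPort) {
        this.proxyHost = proxyHost;
        this.proxyPort = proxyPort;
    }

    @Override
    public HttpTransport create() {
        // Route the Google client's HTTP traffic through the configured proxy.
        final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
        return new NetHttpTransport.Builder().setProxy(proxy).build();
    }
}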
AbstractBigQueryProcessor.java:
@@ -22,15 +22,21 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import org.apache.nifi.util.StringUtils;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -40,7 +46,9 @@ import java.util.Set;
  * Base class for creating processors that connect to the GCP BigQuery service
  */
 public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+
+    static final int BUFFER_SIZE = 65536;
 
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder().name("success")
                     .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
@@ -53,8 +61,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
-    public static final PropertyDescriptor DATASET = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.DATASET_ATTR)
             .displayName("Dataset")
             .description(BigQueryAttributes.DATASET_DESC)
             .required(true)
@@ -63,8 +71,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_NAME_ATTR)
             .displayName("Table Name")
             .description(BigQueryAttributes.TABLE_NAME_DESC)
             .required(true)
@@ -73,22 +81,22 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
             .displayName("Table Schema")
             .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
             .displayName("Read Timeout")
             .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
             .required(true)
             .defaultValue("5 minutes")
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
@@ -101,6 +109,10 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(READ_TIMEOUT)
                 .build();
     }
 
@@ -109,14 +121,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
 
-        BigQueryOptions.Builder builder = BigQueryOptions.newBuilder().setCredentials(credentials);
+        final BigQueryOptions.Builder builder = BigQueryOptions.newBuilder();
 
         if (!StringUtils.isBlank(projectId)) {
             builder.setProjectId(projectId);
         }
 
-        return builder
+        return builder.setCredentials(credentials)
                 .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .setTransportOptions(getTransportOptions(context))
                 .build();
     }
 
+    @Override
+    protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
+        ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
+
+        final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this processor.")
+                    .build());
+        }
+
+        customValidate(validationContext, results);
+        return results;
+    }
+
+    /**
+     * If subclasses need to implement any custom validation, they should override this method
+     * and add their validation results to the supplied collection.
+     */
+    protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
+    }
 }
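To make the createService() change above concrete, this is roughly how the resulting options produce a usable client. A standalone sketch under the assumption of application-default credentials; in the processor the credentials come from the GCPCredentialsService controller service, and the retry count of 3 here is arbitrary.

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;

import java.io.IOException;

class BigQueryClientSketch {
    static BigQuery buildClient() throws IOException {
        final GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();

        return BigQueryOptions.newBuilder()
                .setCredentials(credentials)
                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(3).build())
                .build()
                .getService(); // the BigQuery client the processor works with
    }
}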
BigQueryAttributes.java:
@@ -17,9 +17,12 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
 
+import com.google.cloud.bigquery.JobInfo;
+
 /**
  * Attributes associated with the BigQuery processors
  */
@@ -28,44 +31,75 @@ public class BigQueryAttributes {
 
     public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
 
+    // Properties
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
+            + "NEWLINE_DELIMITED_JSON, CSV.";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
+            + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
+            + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
+            + "unknown values are not allowed.";
+
+    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
+            + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
+            + "no bad record is ignored.";
+
     public static final String DATASET_ATTR = "bq.dataset";
-    public static final String DATASET_DESC = "BigQuery dataset";
+    public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)";
 
     public static final String TABLE_NAME_ATTR = "bq.table.name";
     public static final String TABLE_NAME_DESC = "BigQuery table name";
 
     public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
-    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
 
     public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
-    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
-
-    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
-    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
-
-    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
-    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
-
-    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
-    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+    public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
 
     public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
     public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
 
+    public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
+    public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing "
+            + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
+            + "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
+            + "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
+            + "considered bad records.";
+
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
+            + "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
+
+    public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
+    public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
+
+    public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
+    public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
+            + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
+            + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
+            + "value is a comma (',').";
+
+    public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
+    public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
+            + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
+            + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
+            + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
+            + "also set the Allow Quoted New Lines property to true.";
+
+    public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
+    public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
+            + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
+            + "file that should be skipped.";
+
+
     // Batch Attributes
-    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
-    public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded";
-
-    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
-    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema";
-
-    public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
-    public static final String WRITE_DISPOSITION_DESC = "Options for writing to table";
-
-    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
-    public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error";
-
     public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
     public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
 
@@ -77,4 +111,31 @@ public class BigQueryAttributes {
 
     public static final String JOB_LINK_ATTR = "bq.job.link";
     public static final String JOB_LINK_DESC = "API Link to load job";
 
+    public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
+    public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
+
+
+    // Allowable values
+    public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
+            JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
+    public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
+            JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
+
+    public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
+            JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
+    public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
+            JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
+    public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
+            JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists.");
+
 }
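As a usage note, a property backed by these AllowableValue constants stores the enum's name() as its value, so a processor can resolve it straight back to the BigQuery enum, which is exactly what PutBigQueryBatch does further down in this commit. A minimal sketch:

import com.google.cloud.bigquery.JobInfo;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch;

class DispositionSketch {
    // Resolves the stored enum name (e.g. "CREATE_IF_NEEDED") back to the BigQuery enum.
    static JobInfo.CreateDisposition createDisposition(final ProcessContext context) {
        final String value = context.getProperty(PutBigQueryBatch.CREATE_DISPOSITION).getValue();
        return JobInfo.CreateDisposition.valueOf(value);
    }
}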
BqUtils.java → BigQueryUtils.java:
@@ -17,24 +17,24 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.LegacySQLTypeName;
 import com.google.cloud.bigquery.Schema;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
 /**
- *
  * Util class for schema manipulation
  */
-public class BqUtils {
-    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
-    }.getType();
+public class BigQueryUtils {
+
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
 
     public static Field mapToField(Map fMap) {
         String typeStr = fMap.get("type").toString();
@@ -77,7 +77,7 @@ public class BqUtils {
         } else {
             Gson gson = new Gson();
             List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
-            return Schema.of(BqUtils.listToFields(fields));
+            return Schema.of(BigQueryUtils.listToFields(fields));
         }
     }
 
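The "BigQuery schema in JSON format" that schemaFromString parses is a JSON array of field objects, which gsonSchemaType above deserializes into a List<Map>. A hedged example, with made-up field names; each object needs at least the "name" and "type" keys that mapToField reads:

import com.google.cloud.bigquery.Schema;
import org.apache.nifi.processors.gcp.bigquery.BigQueryUtils;

class SchemaSketch {
    static Schema example() {
        final String schemaStr = "[{\"name\": \"id\", \"type\": \"INTEGER\"},"
                + " {\"name\": \"full_name\", \"type\": \"STRING\"}]";
        return BigQueryUtils.schemaFromString(schemaStr);
    }
}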
PutBigQueryBatch.java:
@@ -18,15 +18,16 @@
 package org.apache.nifi.processors.gcp.bigquery;
 
 import com.google.cloud.RetryOption;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -35,7 +36,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.ProcessContext;
@@ -51,6 +55,7 @@ import org.threeten.bp.temporal.ChronoUnit;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,40 +64,58 @@ import java.util.concurrent.TimeUnit;
 /**
  * A processor for batch loading data into a Google BigQuery table
  */
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"google", "google cloud", "bq", "bigquery"})
-@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
 @SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
 
 @WritesAttributes({
-    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
 })
 
 public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
+    private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
+
+    private static final Validator FORMAT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            builder.subject(subject).input(input);
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return builder.valid(true).explanation("Contains Expression Language").build();
+            }
+
+            if(TYPES.contains(input.toUpperCase())) {
+                builder.valid(true);
+            } else {
+                builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
+            }
+
+            return builder.build();
+        }
+    };
+
     public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
             .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
             .displayName("Load file type")
             .description(BigQueryAttributes.SOURCE_TYPE_DESC)
             .required(true)
-            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
             .defaultValue(FormatOptions.avro().getType())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(FORMAT_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
@@ -102,7 +125,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .allowableValues("true", "false")
-            .defaultValue("true")
+            .defaultValue("false")
             .build();
 
     public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
@@ -110,8 +133,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Create Disposition")
             .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
             .required(true)
-            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
-            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
+            .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -120,8 +143,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Write Disposition")
             .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
             .required(true)
-            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
-            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
+            .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -134,34 +157,78 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
-    private Schema schemaCache = null;
+    public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
+            .displayName("CSV Input - Allow Jagged Rows")
+            .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
-    public PutBigQueryBatch() {
+    public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
+            .displayName("CSV Input - Allow Quoted New Lines")
+            .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
-    }
+    public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_CHARSET_ATTR)
+            .displayName("CSV Input - Character Set")
+            .description(BigQueryAttributes.CSV_CHARSET_DESC)
+            .required(true)
+            .allowableValues("UTF-8", "ISO-8859-1")
+            .defaultValue("UTF-8")
+            .build();
+
+    public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
+            .displayName("CSV Input - Field Delimiter")
+            .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_QUOTE_ATTR)
+            .displayName("CSV Input - Quote")
+            .description(BigQueryAttributes.CSV_QUOTE_DESC)
+            .required(true)
+            .defaultValue("\"")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
+            .displayName("CSV Input - Skip Leading Rows")
+            .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
                 .add(DATASET)
                 .add(TABLE_NAME)
                 .add(TABLE_SCHEMA)
                 .add(SOURCE_TYPE)
                 .add(CREATE_DISPOSITION)
                 .add(WRITE_DISPOSITION)
                 .add(MAXBAD_RECORDS)
                 .add(IGNORE_UNKNOWN)
-                .build();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
+                .add(CSV_ALLOW_JAGGED_ROWS)
+                .add(CSV_ALLOW_QUOTED_NEW_LINES)
+                .add(CSV_CHARSET)
+                .add(CSV_FIELD_DELIMITER)
+                .add(CSV_QUOTE)
+                .add(CSV_SKIP_LEADING_ROWS)
                 .build();
     }
 
@@ -178,13 +245,10 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             return;
         }
 
-        final Map<String, String> attributes = new HashMap<>();
-
-        final BigQuery bq = getCloudService();
-
         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
 
         final TableId tableId;
         if (StringUtils.isEmpty(projectId)) {
@@ -193,70 +257,93 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             tableId = TableId.of(projectId, dataset, tableName);
         }
 
-        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
-
-        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
-        Schema schema = BqUtils.schemaFromString(schemaString);
-
-        WriteChannelConfiguration writeChannelConfiguration =
-                WriteChannelConfiguration.newBuilder(tableId)
-                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
-                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
-                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
-                        .setSchema(schema)
-                        .setFormatOptions(FormatOptions.of(fileType))
-                        .build();
-
-        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
-
-        try {
-            session.read(flowFile, rawIn -> {
-                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
-                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
-                while (readableByteChannel.read(byteBuffer) >= 0) {
-                    byteBuffer.flip();
-                    writer.write(byteBuffer);
-                    byteBuffer.clear();
-                }
-            });
-
-            writer.close();
-
-            Job job = writer.getJob();
-            PropertyValue property = context.getProperty(READ_TIMEOUT);
-            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
-            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
-            job = job.waitFor(RetryOption.totalTimeout(duration));
-
-            if (job != null) {
-                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
-                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
-                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
-                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
-
-                boolean jobError = (job.getStatus().getError() != null);
-
-                if (jobError) {
-                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
-                } else {
-                    // in case it got looped back from error
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
-                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
-                }
-
-                if (!attributes.isEmpty()) {
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                }
-
-                if (jobError) {
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
-                    session.transfer(flowFile, REL_SUCCESS);
+        FormatOptions formatOption;
+
+        if(type.equals(FormatOptions.csv().getType())) {
+            formatOption = FormatOptions.csv().toBuilder()
+                    .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
+                    .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
+                    .setEncoding(context.getProperty(CSV_CHARSET).getValue())
+                    .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
+                    .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
+                    .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
+                    .build();
+        } else {
+            formatOption = FormatOptions.of(type);
+        }
+
+        final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
+        final WriteChannelConfiguration writeChannelConfiguration =
+                WriteChannelConfiguration.newBuilder(tableId)
+                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                        .setSchema(schema)
+                        .setFormatOptions(formatOption)
+                        .build();
+
+        try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
+
+            session.read(flowFile, rawIn -> {
+                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
+                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+                while (readableByteChannel.read(byteBuffer) >= 0) {
+                    byteBuffer.flip();
+                    writer.write(byteBuffer);
+                    byteBuffer.clear();
+                }
+            });
+
+            // writer must be closed to get the job
+            writer.close();
+
+            Job job = writer.getJob();
+            Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
+            Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
+            job = job.waitFor(RetryOption.totalTimeout(waitFor));
+
+            if (job != null) {
+                final Map<String, String> attributes = new HashMap<>();
+
+                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
+                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
+                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
+                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
+
+                boolean jobError = (job.getStatus().getError() != null);
+
+                if (jobError) {
+                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
+                } else {
+                    // in case it got looped back from error
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+
+                    // add the number of records successfully added
+                    if(job.getStatistics() instanceof LoadStatistics) {
+                        final LoadStatistics stats = (LoadStatistics) job.getStatistics();
+                        attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
+                    }
+                }
+
+                if (!attributes.isEmpty()) {
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                }
+
+                if (jobError) {
+                    getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
             }
         }
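The streaming loop in the hunk above follows the standard NIO buffer cycle: fill the buffer from the source channel, flip it for draining, write it out, then clear and repeat. In isolation, assuming any InputStream source and any open WritableByteChannel sink:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

class ChannelCopySketch {
    static void copy(final InputStream rawIn, final WritableByteChannel writer) throws IOException {
        final ReadableByteChannel in = Channels.newChannel(rawIn);
        final ByteBuffer buffer = ByteBuffer.allocateDirect(65536);
        while (in.read(buffer) >= 0) {   // -1 signals end of stream
            buffer.flip();               // switch the buffer from filling to draining
            writer.write(buffer);        // push the bytes to the sink channel
            buffer.clear();              // reset position/limit for the next read
        }
    }
}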
@@ -266,4 +353,5 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
-    }
+        }
+
 }
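For reference, the waitFor call in the hunk above blocks until the load job completes or the overall timeout elapses. In isolation, with an illustrative timeout parameter:

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.Job;
import org.threeten.bp.Duration;

class JobWaitSketch {
    // Returns null if the job no longer exists; otherwise the completed Job,
    // whose getStatus().getError() is non-null on failure.
    static Job await(final Job job, final long timeoutSeconds) throws InterruptedException {
        return job.waitFor(RetryOption.totalTimeout(Duration.ofSeconds(timeoutSeconds)));
    }
}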
AbstractGCPubSubProcessor.java:
@@ -18,13 +18,17 @@ package org.apache.nifi.processors.gcp.pubsub;
 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.ServiceOptions;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -62,4 +66,21 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
     protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
         return null;
     }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
+
+        final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this processor.")
+                    .build());
+        }
+
+        return results;
+    }
+
 }
AbstractGCSProcessor.java:
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import java.net.Proxy;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +34,6 @@ import org.apache.nifi.proxy.ProxyConfiguration;
 
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.collect.ImmutableList;
@@ -73,7 +70,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
 
     @Override
     protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final Collection<ValidationResult> results = new ArrayList<>();
+        final Collection<ValidationResult> results = super.customValidate(validationContext);
         ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
         customValidate(validationContext, results);
         return results;
@@ -96,30 +93,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
                 .setMaxAttempts(retryCount)
                 .build());
 
-        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
-            final String proxyHost = context.getProperty(PROXY_HOST).getValue();
-            final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
-            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
-                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
-                final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
-                final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
-                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
-                componentProxyConfig.setProxyServerHost(proxyHost);
-                componentProxyConfig.setProxyServerPort(proxyPort);
-                componentProxyConfig.setProxyUserName(proxyUser);
-                componentProxyConfig.setProxyUserPassword(proxyPassword);
-                return componentProxyConfig;
-            }
-            return ProxyConfiguration.DIRECT_CONFIGURATION;
-        });
-
         if (!projectId.isEmpty()) {
             storageOptionsBuilder.setProjectId(projectId);
         }
 
-        final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
-        storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
-
-        return storageOptionsBuilder.build();
+        return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
     }
 }
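The net effect of this refactor is that the duplicated proxy wiring moves into AbstractGCPProcessor.getTransportOptions(), so every GCP processor builds its client the same way. Roughly, as a sketch with an arbitrary retry count:

import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.TransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

class StorageClientSketch {
    // transportOptions would come from AbstractGCPProcessor.getTransportOptions(context).
    static Storage buildClient(final GoogleCredentials credentials, final TransportOptions transportOptions) {
        return StorageOptions.newBuilder()
                .setCredentials(credentials)
                .setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(3).build())
                .setTransportOptions(transportOptions)
                .build()
                .getService();
    }
}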
pom.xml:
@@ -33,7 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
-            <version>0.9.0</version>
+            <version>0.12.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
nifi-gcp-bundle/pom.xml:
@@ -27,14 +27,15 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.47.0-alpha</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.71.0-alpha</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
                 <groupId>com.google.cloud</groupId>
-                <artifactId>google-cloud</artifactId>
+                <artifactId>google-cloud-bom</artifactId>
                 <version>${google.cloud.sdk.version}</version>
                 <type>pom</type>
                 <scope>import</scope>