NIFI-5484: Fixed PutHive3Streaming to use the Hive Metastore URI property (to include multiple URIs)

NIFI-5484: Incorporated review comments, added unit test for new validator

This closes #2934
This commit is contained in:
Matthew Burgess 2018-08-02 21:39:48 -04:00 committed by Matt Gilman
parent 5e6c43f83e
commit 3d546b8d87
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
4 changed files with 96 additions and 8 deletions

View File

@ -36,6 +36,7 @@ import java.text.ParseException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -390,6 +391,24 @@ public class StandardValidators {
}
};
/**
 * Validates a comma-separated list of URIs.
 * <p>
 * If Expression Language is supported for the property and present in the input, the input is
 * accepted without further inspection. Otherwise the input is split on commas, each entry is
 * trimmed, blank entries are skipped, and every remaining entry is checked with
 * {@link StandardValidators#URI_VALIDATOR}. The result of the first failing entry is returned.
 * Input that is null, blank, or contains no non-blank entry (for example {@code ","}) is invalid.
 */
public static final Validator URI_LIST_VALIDATOR = (subject, input, context) -> {
    if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
        return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
    }

    if (input == null || input.trim().isEmpty()) {
        return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI, value is missing or empty").valid(false).build();
    }

    boolean foundUri = false;
    for (final String entry : input.split(",")) {
        final String uri = entry.trim();
        if (uri.isEmpty()) {
            // Tolerate stray separators such as "a,,b" or a trailing comma
            continue;
        }
        foundUri = true;
        final ValidationResult result = StandardValidators.URI_VALIDATOR.validate(subject, uri, context);
        if (!result.isValid()) {
            return result;
        }
    }

    if (!foundUri) {
        // Only separators and whitespace were supplied, so there is nothing to connect to
        return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI, value is missing or empty").valid(false).build();
    }

    return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URI(s)").valid(true).build();
};
public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {

View File

@ -288,4 +288,24 @@ public class TestStandardValidators {
vr = val.validate("foo", "2016-01-01T01:01:01.000Z", vc);
assertTrue(vr.isValid());
}
/**
 * Exercises {@code StandardValidators.URI_LIST_VALIDATOR} against a mocked validation context
 * with missing, single-entry, malformed and well-formed list inputs.
 */
@Test
public void testURIListValidator() {
    final Validator uriListValidator = StandardValidators.URI_LIST_VALIDATOR;
    final ValidationContext mockContext = mock(ValidationContext.class);

    // Missing and empty values are rejected
    assertFalse(uriListValidator.validate("foo", null, mockContext).isValid());
    assertFalse(uriListValidator.validate("foo", "", mockContext).isValid());

    // A single entry without a scheme is accepted
    assertTrue(uriListValidator.validate("foo", "/no_scheme", mockContext).isValid());

    // A space inside the first entry makes the whole list invalid
    assertFalse(uriListValidator.validate("foo", "http://localhost 8080, https://host2:8080 ", mockContext).isValid());

    // Whitespace around the entries is accepted
    assertTrue(uriListValidator.validate("foo", "http://localhost , https://host2:8080 ", mockContext).isValid());
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.streaming.ConnectionError;
@ -81,7 +82,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@ -115,12 +115,12 @@ public class PutHive3Streaming extends AbstractProcessor {
// Property holding the Hive metastore URI(s). Expression Language is evaluated against flow file attributes.
// The descriptions give 9083 as the default metastore port, matching the thrift://localhost:9083 example.
static final PropertyDescriptor METASTORE_URI = new PropertyDescriptor.Builder()
        .name("hive3-stream-metastore-uri")
        .displayName("Hive Metastore URI")
        .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
                + "Hive metastore is 9083.")
        .required(true)
        .description("The URI location(s) for the Hive metastore. This is a comma-separated list of Hive metastore URIs; note that this is not the location of the Hive Server. "
                + "The default port for the Hive metastore is 9083. If this field is not set, then the 'hive.metastore.uris' property from any provided configuration resources "
                + "will be used, and if none are provided, then the default value from a default hive-site.xml will be used (usually thrift://localhost:9083).")
        .required(false)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .addValidator(StandardValidators.URI_VALIDATOR)
        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
        .addValidator(StandardValidators.URI_LIST_VALIDATOR)
        .build();
static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@ -354,13 +354,26 @@ public class PutHive3Streaming extends AbstractProcessor {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final ComponentLog log = getLogger();
final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
// Resolve the user-supplied metastore URI(s), if any; null means "use whatever the Hive configuration provides"
String metastoreURIs = null;
if (context.getProperty(METASTORE_URI).isSet()) {
    metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
    if (StringUtils.isEmpty(metastoreURIs)) {
        // The property is set but evaluated to null or empty for this flow file: log an error, penalize the
        // flow file, route it to failure, and stop. The flow file has been transferred, so it must not be
        // used any further.
        log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
        session.transfer(session.penalize(flowFile), REL_FAILURE);
        return;
    }
}
final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName)
// Override the Hive Metastore URIs in the config if set by the user
if (metastoreURIs != null) {
hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
}
HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
.withHiveConf(hiveConfig)
.withAutoCreatePartitions(autoCreatePartitions)
.withCallTimeout(callTimeout)

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
@ -288,6 +289,35 @@ public class TestPutHive3Streaming {
assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
}
/**
 * Runs the processor with a comma-separated pair of metastore URIs and expects the single
 * enqueued flow file to reach the success relationship with the record count and output
 * table attributes set.
 */
@Test
public void onTriggerMultipleURIs() throws Exception {
    configure(processor, 1);
    runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://host1:9083,thrift://host2:9083");
    runner.setProperty(PutHive3Streaming.DB_NAME, "default");
    runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");

    runner.enqueue(new byte[0]);
    runner.run();

    // Exactly one flow file must have been routed to success
    runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
    final MockFlowFile succeeded = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
    assertEquals("1", succeeded.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
    assertEquals("default.users", succeeded.getAttribute(ATTR_OUTPUT_TABLES));
}
/**
 * Runs the processor without setting the metastore URI property and expects the single
 * enqueued flow file to reach the success relationship with the record count and output
 * table attributes set.
 */
@Test
public void onTriggerURIFromConfigFile() throws Exception {
    configure(processor, 1);
    // METASTORE_URI is deliberately not set in this test
    runner.setProperty(PutHive3Streaming.DB_NAME, "default");
    runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");

    runner.enqueue(new byte[0]);
    runner.run();

    // Exactly one flow file must have been routed to success
    runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
    final MockFlowFile succeeded = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0);
    assertEquals("1", succeeded.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
    assertEquals("default.users", succeeded.getAttribute(ATTR_OUTPUT_TABLES));
}
@Test
public void onTriggerComplex() throws Exception {
configureComplex(processor, 10, -1, null);
@ -662,6 +692,12 @@ public class TestPutHive3Streaming {
@Override
StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
// Verify that the 'hive.metastore.uris' property in the Hive configuration matches options.getMetaStoreURI() when that value is set
String userDefinedMetastoreURI = options.getMetaStoreURI();
if (null != userDefinedMetastoreURI) {
assertEquals(userDefinedMetastoreURI, options.getHiveConf().get(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName()));
}
if (generateConnectFailure) {
throw new StubConnectionError("Unit Test - Connection Error");
}