NIFI-2765: Fixed Kerberos support for PutHiveStreaming

This closes #1012.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Matt Burgess 2016-09-13 10:20:27 -04:00 committed by Bryan Bende
parent b48355e75a
commit 6c9291ad53
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 20 additions and 25 deletions

View File

@ -238,8 +238,8 @@ public class PutHiveStreaming extends AbstractProcessor {
+ "can be used to provide a retry capability since full rollback is not possible.")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
private static final long TICKET_RENEWAL_PERIOD = 60000;
@ -258,21 +258,24 @@ public class PutHiveStreaming extends AbstractProcessor {
protected Map<HiveEndPoint, HiveWriter> allWriters;
/*
* Will ensure that the list of property descriptors is build only once.
* Will also create a Set of relationships
*/
static {
propertyDescriptors = new ArrayList<>();
propertyDescriptors.add(METASTORE_URI);
propertyDescriptors.add(HIVE_CONFIGURATION_RESOURCES);
propertyDescriptors.add(DB_NAME);
propertyDescriptors.add(TABLE_NAME);
propertyDescriptors.add(PARTITION_COLUMNS);
propertyDescriptors.add(AUTOCREATE_PARTITIONS);
propertyDescriptors.add(MAX_OPEN_CONNECTIONS);
propertyDescriptors.add(HEARTBEAT_INTERVAL);
propertyDescriptors.add(TXNS_PER_BATCH);
@Override
protected void init(ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(METASTORE_URI);
props.add(HIVE_CONFIGURATION_RESOURCES);
props.add(DB_NAME);
props.add(TABLE_NAME);
props.add(PARTITION_COLUMNS);
props.add(AUTOCREATE_PARTITIONS);
props.add(MAX_OPEN_CONNECTIONS);
props.add(HEARTBEAT_INTERVAL);
props.add(TXNS_PER_BATCH);
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = new KerberosProperties(kerberosConfigFile);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
@ -281,14 +284,6 @@ public class PutHiveStreaming extends AbstractProcessor {
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected void init(ProcessorInitializationContext context) {
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = new KerberosProperties(kerberosConfigFile);
propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;