NIFI-4545: Improve Hive processors provenance transit URL

Incorporated review comments:

- Added 'input' to the equals() method so that the same table name can appear as both an input and an output table.
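A minimal sketch of what this enables (illustrative only, not part of the commit; it uses the nested TableName class added to AbstractHiveQLProcessor and java.util.HashSet): because equals() and hashCode() now also compare the 'input' flag, a Set keeps both the input and the output entry for the same databaseName.tableName, which is what the new parseSelectInsert test exercises.

    Set<TableName> tables = new HashSet<>();
    tables.add(new TableName("databaseA", "tableA", true));   // read by the SELECT -> input
    tables.add(new TableName("databaseA", "tableA", false));  // target of the INSERT -> output
    // Both entries are retained; without 'input' in equals() the second add() would be a no-op.
    assert tables.size() == 2;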

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2239
Koji Kawamura 2017-10-30 12:00:11 +09:00 committed by Matthew Burgess
parent d0502285f2
commit 37ef2839e0
9 changed files with 442 additions and 28 deletions

File: AbstractHiveQLProcessor.java

@@ -16,6 +16,10 @@
 */
package org.apache.nifi.processors.hive;
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
@@ -29,15 +33,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
-import java.sql.SQLDataException;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.sql.Date;
import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.sql.Types;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -49,6 +55,9 @@ public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProc
protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+static String ATTR_INPUT_TABLES = "query.input.tables";
+static String ATTR_OUTPUT_TABLES = "query.output.tables";
public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Hive Database Connection Pooling Service")
@@ -216,4 +225,111 @@ public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProc
}
}
protected static class TableName {
private final String database;
private final String table;
private final boolean input;
TableName(String database, String table, boolean input) {
this.database = database;
this.table = table;
this.input = input;
}
public String getDatabase() {
return database;
}
public String getTable() {
return table;
}
public boolean isInput() {
return input;
}
@Override
public String toString() {
return database == null || database.isEmpty() ? table : database + '.' + table;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TableName tableName = (TableName) o;
if (input != tableName.input) return false;
if (database != null ? !database.equals(tableName.database) : tableName.database != null) return false;
return table.equals(tableName.table);
}
@Override
public int hashCode() {
int result = database != null ? database.hashCode() : 0;
result = 31 * result + table.hashCode();
result = 31 * result + (input ? 1 : 0);
return result;
}
}
protected Set<TableName> findTableNames(final String query) throws ParseException {
final ASTNode node = new ParseDriver().parse(normalize(query));
final HashSet<TableName> tableNames = new HashSet<>();
findTableNames(node, tableNames);
return tableNames;
}
/**
* Normalize query.
* Hive resolves prepared statement parameters before executing a query,
* see {@link org.apache.hive.jdbc.HivePreparedStatement#updateSql(String, HashMap)} for detail.
* HiveParser does not expect '?' to be in a query string, and throws an Exception if there is one.
* In this normalize method, '?' is replaced to 'x' to avoid that.
*/
private String normalize(String query) {
return query.replace('?', 'x');
}
private void findTableNames(final Object obj, final Set<TableName> tableNames) {
if (!(obj instanceof CommonTree)) {
return;
}
final CommonTree tree = (CommonTree) obj;
final int childCount = tree.getChildCount();
if ("TOK_TABNAME".equals(tree.getText())) {
final TableName tableName;
final boolean isInput = "TOK_TABREF".equals(tree.getParent().getText());
switch (childCount) {
case 1 :
tableName = new TableName(null, tree.getChild(0).getText(), isInput);
break;
case 2:
tableName = new TableName(tree.getChild(0).getText(), tree.getChild(1).getText(), isInput);
break;
default:
throw new IllegalStateException("TOK_TABNAME does not have expected children, childCount=" + childCount);
}
// If parent is TOK_TABREF, then it is an input table.
tableNames.add(tableName);
return;
}
for (int i = 0; i < childCount; i++) {
findTableNames(tree.getChild(i), tableNames);
}
}
protected Map<String, String> toQueryTableAttributes(Set<TableName> tableNames) {
final Map<String, String> attributes = new HashMap<>();
for (TableName tableName : tableNames) {
final String attributeName = tableName.isInput() ? ATTR_INPUT_TABLES : ATTR_OUTPUT_TABLES;
if (attributes.containsKey(attributeName)) {
attributes.put(attributeName, attributes.get(attributeName) + "," + tableName);
} else {
attributes.put(attributeName, tableName.toString());
}
}
return attributes;
}
}
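A brief usage sketch of the helpers above (illustrative, not part of the diff; the query string is hypothetical): findTableNames() parses the statement with Hive's ParseDriver after normalize() has replaced '?' with 'x', and toQueryTableAttributes() turns the resulting set into the query.input.tables / query.output.tables attribute values.

    // Sketch only; assumes it runs inside a subclass of AbstractHiveQLProcessor.
    final String hiveQL = "insert into databaseA.tableA select key, max(value) from databaseB.tableB where category = ?";
    try {
        final Set<TableName> tables = findTableNames(hiveQL);             // '?' is tolerated thanks to normalize()
        final Map<String, String> attrs = toQueryTableAttributes(tables);
        // attrs: query.output.tables=databaseA.tableA, query.input.tables=databaseB.tableB
    } catch (ParseException e) {
        // the processors below only log a warning and keep going when a statement cannot be parsed
    }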

File: PutHiveQL.java

@@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -69,6 +71,12 @@ import java.util.regex.Pattern;
@ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as "
    + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
})
+@WritesAttributes({
+    @WritesAttribute(attribute = "query.input.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+        + "and contains input table names (if any) in comma delimited 'databaseName.tableName' format."),
+    @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' relationships, "
+        + "and contains the target table names in 'databaseName.tableName' format.")
+})
public class PutHiveQL extends AbstractHiveQLProcessor {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
@@ -196,13 +204,15 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
String[] hiveQLs = script.split(regex);
+final Set<TableName> tableNames = new HashSet<>();
exceptionHandler.execute(fc, flowFile, input -> {
int loc = 1;
-for (String hiveQL: hiveQLs) {
-getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
-if (!StringUtils.isEmpty(hiveQL.trim())) {
-final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim());
+for (String hiveQLStr: hiveQLs) {
+getLogger().debug("HiveQL: {}", new Object[]{hiveQLStr});
+final String hiveQL = hiveQLStr.trim();
+if (!StringUtils.isEmpty(hiveQL)) {
+final PreparedStatement stmt = conn.prepareStatement(hiveQL);
// Get ParameterMetadata
// Hive JDBC Doesn't support this yet:
@@ -214,6 +224,14 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
}
+// Parse hiveQL and extract input/output tables
+try {
+tableNames.addAll(findTableNames(hiveQL));
+} catch (Exception e) {
+// If failed to parse the query, just log a warning message, but continue.
+getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+}
// Execute the statement
stmt.execute();
fc.proceed();
@@ -223,7 +241,8 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
// Emit a Provenance SEND event
final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
-session.getProvenanceReporter().send(flowFile, fc.connectionUrl, transmissionMillis, true);
+final FlowFile updatedFlowFile = session.putAllAttributes(flowFile, toQueryTableAttributes(tableNames));
+session.getProvenanceReporter().send(updatedFlowFile, fc.connectionUrl, transmissionMillis, true);
result.routeTo(flowFile, REL_SUCCESS);
}, onFlowFileError(context, session, result));
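Because the table names from every statement of a multi-statement script are collected into a single set before the attributes are written, the attribute values become comma-delimited lists. A rough sketch (hypothetical statements modeled on the parseInsert test; ordering within each value is not guaranteed because a HashSet backs the collection, and exception handling is omitted):

    final Set<TableName> tableNames = new HashSet<>();
    tableNames.addAll(findTableNames("insert into databaseB.tableB1 select something from tableA1"));
    tableNames.addAll(findTableNames("insert into databaseB.tableB2 select something from tableA2"));
    final Map<String, String> attrs = toQueryTableAttributes(tableNames);
    // query.input.tables  -> "tableA1,tableA2"                      (order may vary)
    // query.output.tables -> "databaseB.tableB1,databaseB.tableB2"  (order may vary)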

File: PutHiveStreaming.java

@@ -71,6 +71,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -97,7 +98,9 @@ import java.util.regex.Pattern;
        + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
@WritesAttributes({
    @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
-        + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")
+        + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."),
+    @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
+        + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
})
@RequiresInstanceClassLoading
public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
@@ -473,13 +476,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
failedRecordCount.addAndGet(records.size());
}
-private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
+private void transferFlowFiles(ProcessSession session, RoutingResult result, HiveOptions options) {
if (successfulRecordCount.get() > 0) {
// Transfer the flow file with successful records
-successFlowFile.set(
-session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get())));
-session.getProvenanceReporter().send(successFlowFile.get(), transitUri);
+Map<String, String> updateAttributes = new HashMap<>();
+updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get()));
+updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+successFlowFile.set(session.putAllAttributes(successFlowFile.get(), updateAttributes));
+session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI());
result.routeTo(successFlowFile.get(), REL_SUCCESS);
} else {
session.remove(successFlowFile.get());
@@ -487,8 +492,10 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
if (failedRecordCount.get() > 0) {
// There were some failed records, so transfer that flow file to failure
-failureFlowFile.set(
-session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get())));
+Map<String, String> updateAttributes = new HashMap<>();
+updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get()));
+updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+failureFlowFile.set(session.putAllAttributes(failureFlowFile.get(), updateAttributes));
result.routeTo(failureFlowFile.get(), REL_FAILURE);
} else {
session.remove(failureFlowFile.get());
@@ -760,7 +767,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
result.routeTo(flowFile, REL_RETRY);
} finally {
-functionContext.transferFlowFiles(session, result, options.getMetaStoreURI());
+functionContext.transferFlowFiles(session, result, options);
// Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
}
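A rough sketch of where the new attribute value comes from in this processor (hypothetical values; the HiveOptions constructor arguments shown are an assumption, not taken from this diff):

    // Assumed shape: metastore URI, database name, table name.
    final HiveOptions options = new HiveOptions("thrift://metastore-host:9083", "default", "users");
    options.getQualifiedTableName();   // "default.users" -> written to the query.output.tables attribute
    options.getMetaStoreURI();         // used as the provenance transit URI for the SEND event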

File: SelectHiveQL.java

@@ -24,8 +24,10 @@ import java.sql.Statement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -83,7 +85,8 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
    @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
        + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
        + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
-        + "FlowFiles were produced")
+        + "FlowFiles were produced"),
+    @WritesAttribute(attribute = "query.input.tables", description = "Contains input table names in comma delimited 'databaseName.tableName' format.")
})
public class SelectHiveQL extends AbstractHiveQLProcessor {
@@ -375,23 +378,34 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
}
if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
+final Map<String, String> attributes = new HashMap<>();
// Set attribute for how many rows were selected
-flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+attributes.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+try {
+// Set input/output table names by parsing the query
+attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery)));
+} catch (Exception e) {
+// If failed to parse the query, just log a warning message, but continue.
+getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e);
+}
// Set MIME type on output document and add extension to filename
if (AVRO.equals(outputFormat)) {
-flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY);
-flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro");
+attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY);
+attributes.put(CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro");
} else if (CSV.equals(outputFormat)) {
-flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
-flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv");
+attributes.put(CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
+attributes.put(CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv");
}
if (maxRowsPerFlowFile > 0) {
-flowfile = session.putAttribute(flowfile, "fragment.identifier", fragmentIdentifier);
-flowfile = session.putAttribute(flowfile, "fragment.index", String.valueOf(fragmentIndex));
+attributes.put("fragment.identifier", fragmentIdentifier);
+attributes.put("fragment.index", String.valueOf(fragmentIndex));
}
+flowfile = session.putAllAttributes(flowfile, attributes);
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{flowfile, nrOfRows.get()});

File: HiveOptions.java

@@ -109,6 +109,10 @@ public class HiveOptions implements Serializable {
return tableName;
}
+public String getQualifiedTableName() {
+return databaseName + "." + tableName;
+}
public Integer getBatchSize() {
return batchSize;
}

File: TestHiveParser.java (new file)

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.junit.Test;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestHiveParser extends AbstractHiveQLProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
}
@Test
public void parseSelect() throws Exception {
String query = "select a.empid, to_something(b.saraly) from " +
"company.emp a inner join default.salary b where a.empid = b.empid";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(2, tableNames.size());
assertTrue(tableNames.contains(new TableName("company", "emp", true)));
assertTrue(tableNames.contains(new TableName("default", "salary", true)));
}
@Test
public void parseSelectPrepared() throws Exception {
String query = "select empid from company.emp a where a.firstName = ?";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(1, tableNames.size());
assertTrue(tableNames.contains(new TableName("company", "emp", true)));
}
@Test
public void parseLongSelect() throws Exception {
String query = "select\n" +
"\n" +
" i_item_id,\n" +
"\n" +
" i_item_desc,\n" +
"\n" +
" s_state,\n" +
"\n" +
" count(ss_quantity) as store_sales_quantitycount,\n" +
"\n" +
" avg(ss_quantity) as store_sales_quantityave,\n" +
"\n" +
" stddev_samp(ss_quantity) as store_sales_quantitystdev,\n" +
"\n" +
" stddev_samp(ss_quantity) / avg(ss_quantity) as store_sales_quantitycov,\n" +
"\n" +
" count(sr_return_quantity) as store_returns_quantitycount,\n" +
"\n" +
" avg(sr_return_quantity) as store_returns_quantityave,\n" +
"\n" +
" stddev_samp(sr_return_quantity) as store_returns_quantitystdev,\n" +
"\n" +
" stddev_samp(sr_return_quantity) / avg(sr_return_quantity) as store_returns_quantitycov,\n" +
"\n" +
" count(cs_quantity) as catalog_sales_quantitycount,\n" +
"\n" +
" avg(cs_quantity) as catalog_sales_quantityave,\n" +
"\n" +
" stddev_samp(cs_quantity) / avg(cs_quantity) as catalog_sales_quantitystdev,\n" +
"\n" +
" stddev_samp(cs_quantity) / avg(cs_quantity) as catalog_sales_quantitycov\n" +
"\n" +
"from\n" +
"\n" +
" store_sales,\n" +
"\n" +
" store_returns,\n" +
"\n" +
" catalog_sales,\n" +
"\n" +
" date_dim d1,\n" +
"\n" +
" date_dim d2,\n" +
"\n" +
" date_dim d3,\n" +
"\n" +
" store,\n" +
"\n" +
" item\n" +
"\n" +
"where\n" +
"\n" +
" d1.d_quarter_name = '2000Q1'\n" +
"\n" +
" and d1.d_date_sk = ss_sold_date_sk\n" +
"\n" +
" and i_item_sk = ss_item_sk\n" +
"\n" +
" and s_store_sk = ss_store_sk\n" +
"\n" +
" and ss_customer_sk = sr_customer_sk\n" +
"\n" +
" and ss_item_sk = sr_item_sk\n" +
"\n" +
" and ss_ticket_number = sr_ticket_number\n" +
"\n" +
" and sr_returned_date_sk = d2.d_date_sk\n" +
"\n" +
" and d2.d_quarter_name in ('2000Q1' , '2000Q2', '2000Q3')\n" +
"\n" +
" and sr_customer_sk = cs_bill_customer_sk\n" +
"\n" +
" and sr_item_sk = cs_item_sk\n" +
"\n" +
" and cs_sold_date_sk = d3.d_date_sk\n" +
"\n" +
" and d3.d_quarter_name in ('2000Q1' , '2000Q2', '2000Q3')\n" +
"\n" +
"group by i_item_id , i_item_desc , s_state\n" +
"\n" +
"order by i_item_id , i_item_desc , s_state\n" +
"\n" +
"limit 100";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(6, tableNames.size());
AtomicInteger cnt = new AtomicInteger(0);
for (TableName tableName : tableNames) {
if (tableName.equals(new TableName(null, "store_sales", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "store_returns", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "catalog_sales", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "date_dim", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "store", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "item", true))) {
cnt.incrementAndGet();
}
}
assertEquals(6, cnt.get());
}
@Test
public void parseSelectInsert() throws Exception {
String query = "insert into databaseA.tableA select key, max(value) from databaseA.tableA where category = 'x'";
// The same database.tableName can appear two times for input and output.
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(2, tableNames.size());
AtomicInteger cnt = new AtomicInteger(0);
tableNames.forEach(tableName -> {
if (tableName.equals(new TableName("databaseA", "tableA", false))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName("databaseA", "tableA", true))) {
cnt.incrementAndGet();
}
});
assertEquals(2, cnt.get());
}
@Test
public void parseInsert() throws Exception {
String query = "insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(3, tableNames.size());
AtomicInteger cnt = new AtomicInteger(0);
tableNames.forEach(tableName -> {
if (tableName.equals(new TableName("databaseB", "tableB1", false))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "tableA1", true))) {
cnt.incrementAndGet();
} else if (tableName.equals(new TableName(null, "tableA2", true))) {
cnt.incrementAndGet();
}
});
assertEquals(3, cnt.get());
}
@Test
public void parseUpdate() throws Exception {
String query = "update table_a set y = 'updated' where x > 100";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(1, tableNames.size());
assertTrue(tableNames.contains(new TableName(null, "table_a", false)));
}
@Test
public void parseDelete() throws Exception {
String query = "delete from table_a where x > 100";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(1, tableNames.size());
assertTrue(tableNames.contains(new TableName(null, "table_a", false)));
}
@Test
public void parseDDL() throws Exception {
String query = "CREATE TABLE IF NOT EXISTS EMPLOYEES(\n" +
"EmployeeID INT,FirstName STRING, Title STRING,\n" +
"State STRING, Laptop STRING)\n" +
"COMMENT 'Employee Names'\n" +
"STORED AS ORC";
final Set<TableName> tableNames = findTableNames(query);
System.out.printf("tableNames=%s\n", tableNames);
assertEquals(1, tableNames.size());
assertTrue(tableNames.contains(new TableName(null, "EMPLOYEES", false)));
}
}

File: TestPutHiveQL.java

@@ -128,6 +128,8 @@ public class TestPutHiveQL {
runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS)
+.forEach(f -> f.assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS"));
}
@Test
@@ -375,6 +377,7 @@ public class TestPutHiveQL {
runner.run();
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS");
try (final Connection conn = service.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
@@ -493,6 +496,8 @@ public class TestPutHiveQL {
// should fail because of the semicolon
runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS)
+.forEach(f -> f.assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS"));
// Now we can check that the values were inserted by the multi-statement script.
try (final Connection conn = service.getConnection()) {

File: TestPutHiveStreaming.java

@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import static org.apache.nifi.processors.hive.AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES;
import static org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
import static org.apache.nifi.processors.hive.PutHiveStreaming.REL_SUCCESS;
import static org.junit.Assert.assertEquals;
@@ -193,7 +194,9 @@ public class TestPutHiveStreaming {
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
-assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
}
@Test
@@ -488,7 +491,9 @@ public class TestPutHiveStreaming {
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
-assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
}
@@ -713,7 +718,9 @@ public class TestPutHiveStreaming {
runner.run();
runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
-assertEquals("2", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0);
+assertEquals("2", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
}
@Test

File: TestSelectHiveQL.java

@@ -300,6 +300,7 @@ public class TestSelectHiveQL {
}
assertEquals(nrOfRows - 10, recordsFromStream);
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
+flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, "persons");
}
@Test