diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
index 1a2110a538..cd57090f9e 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.processors.hive;
 
+import org.antlr.runtime.tree.CommonTree;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.dbcp.hive.HiveDBCPService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -29,15 +33,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
 import java.nio.charset.Charset;
-import java.sql.SQLDataException;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.sql.Date;
 import java.sql.PreparedStatement;
+import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
-
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -49,6 +55,9 @@ public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProc
     protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
     protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
 
+    static String ATTR_INPUT_TABLES = "query.input.tables";
+    static String ATTR_OUTPUT_TABLES = "query.output.tables";
+
     public static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
             .name("Hive Database Connection Pooling Service")
@@ -216,4 +225,111 @@ public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProc
         }
     }
 
+    protected static class TableName {
+        private final String database;
+        private final String table;
+        private final boolean input;
+
+        TableName(String database, String table, boolean input) {
+            this.database = database;
+            this.table = table;
+            this.input = input;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public String getTable() {
+            return table;
+        }
+
+        public boolean isInput() {
+            return input;
+        }
+
+        @Override
+        public String toString() {
+            return database == null || database.isEmpty() ? table : database + '.' + table;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            TableName tableName = (TableName) o;
+
+            if (input != tableName.input) return false;
+            if (database != null ? !database.equals(tableName.database) : tableName.database != null) return false;
+            return table.equals(tableName.table);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = database != null ? database.hashCode() : 0;
+            result = 31 * result + table.hashCode();
+            result = 31 * result + (input ? 1 : 0);
+            return result;
+        }
+    }
+
+    protected Set<TableName> findTableNames(final String query) throws ParseException {
+        final ASTNode node = new ParseDriver().parse(normalize(query));
+        final HashSet<TableName> tableNames = new HashSet<>();
+        findTableNames(node, tableNames);
+        return tableNames;
+    }
+
+    /**
+     * Normalize query.
+     * Hive resolves prepared statement parameters before executing a query,
+     * see {@link org.apache.hive.jdbc.HivePreparedStatement#updateSql(String, HashMap)} for details.
+     * HiveParser does not expect '?' to be in a query string, and throws an Exception if there is one.
+     * In this normalize method, '?' is replaced with 'x' to avoid that.
+     */
+    private String normalize(String query) {
+        return query.replace('?', 'x');
+    }
+
+    private void findTableNames(final Object obj, final Set<TableName> tableNames) {
+        if (!(obj instanceof CommonTree)) {
+            return;
+        }
+        final CommonTree tree = (CommonTree) obj;
+        final int childCount = tree.getChildCount();
+        if ("TOK_TABNAME".equals(tree.getText())) {
+            final TableName tableName;
+            // If parent is TOK_TABREF, then it is an input table.
+            final boolean isInput = "TOK_TABREF".equals(tree.getParent().getText());
+            switch (childCount) {
+                case 1:
+                    tableName = new TableName(null, tree.getChild(0).getText(), isInput);
+                    break;
+                case 2:
+                    tableName = new TableName(tree.getChild(0).getText(), tree.getChild(1).getText(), isInput);
+                    break;
+                default:
+                    throw new IllegalStateException("TOK_TABNAME does not have expected children, childCount=" + childCount);
+            }
+            tableNames.add(tableName);
+            return;
+        }
+        for (int i = 0; i < childCount; i++) {
+            findTableNames(tree.getChild(i), tableNames);
+        }
+    }
+
+    protected Map<String, String> toQueryTableAttributes(Set<TableName> tableNames) {
+        final Map<String, String> attributes = new HashMap<>();
+        for (TableName tableName : tableNames) {
+            final String attributeName = tableName.isInput() ? ATTR_INPUT_TABLES : ATTR_OUTPUT_TABLES;
+            if (attributes.containsKey(attributeName)) {
+                attributes.put(attributeName, attributes.get(attributeName) + "," + tableName);
+            } else {
+                attributes.put(attributeName, tableName.toString());
+            }
+        }
+        return attributes;
+    }
 }
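Reviewer note (not part of the patch): the two protected helpers above are easiest to follow with a concrete input. Below is a rough, hypothetical sketch of how findTableNames(String) and toQueryTableAttributes(Set<TableName>) combine; the QueryTableAttributeSketch class and its main method are invented for illustration only, while the method names, attribute keys, and expected values come from the patch and its TestHiveParser tests further below.

package org.apache.nifi.processors.hive;

import java.util.Map;
import java.util.Set;

import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical illustration class; it extends the abstract processor only so the
// protected helpers added in this patch can be invoked directly.
public class QueryTableAttributeSketch extends AbstractHiveQLProcessor {

    @Override
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Not needed for this sketch.
    }

    public static void main(String[] args) throws Exception {
        final QueryTableAttributeSketch sketch = new QueryTableAttributeSketch();
        // databaseA.tableA is both read and written, so it should be reported once as
        // an input table and once as an output table.
        final Set<TableName> tables = sketch.findTableNames(
                "insert into databaseA.tableA select key, max(value) from databaseA.tableA where category = 'x'");
        final Map<String, String> attributes = sketch.toQueryTableAttributes(tables);
        // Expected, per the parseSelectInsert test below:
        //   query.input.tables  -> databaseA.tableA
        //   query.output.tables -> databaseA.tableA
        attributes.forEach((name, value) -> System.out.println(name + " -> " + value));
    }
}

Multiple tables on the same side are joined with commas by toQueryTableAttributes, which is what the PutHiveQL change below relies on when a script contains several statements.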
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
index b312327f50..93b07dc7ab 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java
@@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -69,6 +71,12 @@ import java.util.regex.Pattern;
     @ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parametrized HiveQL statements. The value of the Parameters are specified as "
             + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
 })
+@WritesAttributes({
+    @WritesAttribute(attribute = "query.input.tables", description = "This attribute is written on the flow files routed to the 'success' relationship, "
+            + "and contains input table names (if any) in comma delimited 'databaseName.tableName' format."),
+    @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' relationship, "
+            + "and contains the target table names in 'databaseName.tableName' format.")
+})
 public class PutHiveQL extends AbstractHiveQLProcessor {
 
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
@@ -196,13 +204,15 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
 
             String[] hiveQLs = script.split(regex);
 
+            final Set<TableName> tableNames = new HashSet<>();
             exceptionHandler.execute(fc, flowFile, input -> {
                 int loc = 1;
-                for (String hiveQL: hiveQLs) {
-                    getLogger().debug("HiveQL: {}", new Object[]{hiveQL});
+                for (String hiveQLStr: hiveQLs) {
+                    getLogger().debug("HiveQL: {}", new Object[]{hiveQLStr});
 
-                    if (!StringUtils.isEmpty(hiveQL.trim())) {
-                        final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim());
+                    final String hiveQL = hiveQLStr.trim();
+                    if (!StringUtils.isEmpty(hiveQL)) {
+                        final PreparedStatement stmt = conn.prepareStatement(hiveQL);
 
                         // Get ParameterMetadata
                         // Hive JDBC Doesn't support this yet:
@@ -214,6 +224,14 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
                             loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes());
                         }
 
+                        // Parse hiveQL and extract input/output tables
+                        try {
+                            tableNames.addAll(findTableNames(hiveQL));
+                        } catch (Exception e) {
+                            // If failed to parse the query, just log a warning message, but continue.
+                            getLogger().warn("Failed to parse hiveQL: {} due to {}", new Object[]{hiveQL, e}, e);
+                        }
+
                         // Execute the statement
                         stmt.execute();
                         fc.proceed();
@@ -223,7 +241,8 @@ public class PutHiveQL extends AbstractHiveQLProcessor {
 
             // Emit a Provenance SEND event
             final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
-            session.getProvenanceReporter().send(flowFile, fc.connectionUrl, transmissionMillis, true);
+            final FlowFile updatedFlowFile = session.putAllAttributes(flowFile, toQueryTableAttributes(tableNames));
+            session.getProvenanceReporter().send(updatedFlowFile, fc.connectionUrl, transmissionMillis, true);
             result.routeTo(flowFile, REL_SUCCESS);
 
         }, onFlowFileError(context, session, result));
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index afb99fda2f..bc11bf5000 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -71,6 +71,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -97,7 +98,9 @@ import java.util.regex.Pattern;
         + "The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
 @WritesAttributes({
     @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
-            + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")
+            + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."),
+    @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' "
+            + "and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")
 })
 @RequiresInstanceClassLoading
 public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
@@ -473,13 +476,15 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             failedRecordCount.addAndGet(records.size());
         }
 
-        private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) {
+        private void transferFlowFiles(ProcessSession session, RoutingResult result, HiveOptions options) {
             if (successfulRecordCount.get() > 0) {
                 // Transfer the flow file with successful records
-                successFlowFile.set(
-                        session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get()))));
-                session.getProvenanceReporter().send(successFlowFile.get(), transitUri);
+                Map<String, String> updateAttributes = new HashMap<>();
+                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get()));
+                updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                successFlowFile.set(session.putAllAttributes(successFlowFile.get(), updateAttributes));
+                session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI());
                 result.routeTo(successFlowFile.get(), REL_SUCCESS);
             } else {
                 session.remove(successFlowFile.get());
@@ -487,8 +492,10 @@
 
             if (failedRecordCount.get() > 0) {
                 // There were some failed records, so transfer that flow file to failure
-                failureFlowFile.set(
-                        session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get())));
+                Map<String, String> updateAttributes = new HashMap<>();
+                updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get()));
+                updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
+                failureFlowFile.set(session.putAllAttributes(failureFlowFile.get(), updateAttributes));
                 result.routeTo(failureFlowFile.get(), REL_FAILURE);
             } else {
                 session.remove(failureFlowFile.get());
@@ -760,7 +767,7 @@
                 result.routeTo(flowFile, REL_RETRY);
 
             } finally {
-                functionContext.transferFlowFiles(session, result, options.getMetaStoreURI());
+                functionContext.transferFlowFiles(session, result, options);
                 // Restore original class loader, might not be necessary but is good practice since the processor task changed it
                 Thread.currentThread().setContextClassLoader(originalClassloader);
             }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index b9ce1c9455..c15a9e1360 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -24,8 +24,10 @@ import java.sql.Statement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -83,7 +85,8 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
     @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
             + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
This can be " + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " - + "FlowFiles were produced") + + "FlowFiles were produced"), + @WritesAttribute(attribute = "query.input.tables", description = "Contains input table names in comma delimited 'databaseName.tableName' format.") }) public class SelectHiveQL extends AbstractHiveQLProcessor { @@ -375,23 +378,34 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) { + final Map attributes = new HashMap<>(); // Set attribute for how many rows were selected - flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + attributes.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + + try { + // Set input/output table names by parsing the query + attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery))); + } catch (Exception e) { + // If failed to parse the query, just log a warning message, but continue. + getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e); + } // Set MIME type on output document and add extension to filename if (AVRO.equals(outputFormat)) { - flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY); - flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro"); + attributes.put(CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY); + attributes.put(CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro"); } else if (CSV.equals(outputFormat)) { - flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); - flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv"); + attributes.put(CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); + attributes.put(CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv"); } if (maxRowsPerFlowFile > 0) { - flowfile = session.putAttribute(flowfile, "fragment.identifier", fragmentIdentifier); - flowfile = session.putAttribute(flowfile, "fragment.index", String.valueOf(fragmentIndex)); + attributes.put("fragment.identifier", fragmentIdentifier); + attributes.put("fragment.index", String.valueOf(fragmentIndex)); } + flowfile = session.putAllAttributes(flowfile, attributes); + logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{flowfile, nrOfRows.get()}); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java index 2c37380f4e..ddc15673e5 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java @@ -109,6 +109,10 @@ public class HiveOptions implements Serializable { return tableName; } + public String getQualifiedTableName() { + return databaseName + "." 
+    }
+
     public Integer getBatchSize() {
         return batchSize;
     }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestHiveParser.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestHiveParser.java
new file mode 100644
index 0000000000..50ac0d8ad1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestHiveParser.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveParser extends AbstractHiveQLProcessor {
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+    }
+
+    @Test
+    public void parseSelect() throws Exception {
+        String query = "select a.empid, to_something(b.saraly) from " +
+                "company.emp a inner join default.salary b where a.empid = b.empid";
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(2, tableNames.size());
+        assertTrue(tableNames.contains(new TableName("company", "emp", true)));
+        assertTrue(tableNames.contains(new TableName("default", "salary", true)));
+    }
+
+    @Test
+    public void parseSelectPrepared() throws Exception {
+        String query = "select empid from company.emp a where a.firstName = ?";
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(1, tableNames.size());
+        assertTrue(tableNames.contains(new TableName("company", "emp", true)));
+    }
+
+    @Test
+    public void parseLongSelect() throws Exception {
+        String query = "select\n" +
+                "\n" +
+                "    i_item_id,\n" +
+                "\n" +
+                "    i_item_desc,\n" +
+                "\n" +
+                "    s_state,\n" +
+                "\n" +
+                "    count(ss_quantity) as store_sales_quantitycount,\n" +
+                "\n" +
+                "    avg(ss_quantity) as store_sales_quantityave,\n" +
+                "\n" +
+                "    stddev_samp(ss_quantity) as store_sales_quantitystdev,\n" +
+                "\n" +
+                "    stddev_samp(ss_quantity) / avg(ss_quantity) as store_sales_quantitycov,\n" +
+                "\n" +
+                "    count(sr_return_quantity) as store_returns_quantitycount,\n" +
+                "\n" +
+                "    avg(sr_return_quantity) as store_returns_quantityave,\n" +
+                "\n" +
+                "    stddev_samp(sr_return_quantity) as store_returns_quantitystdev,\n" +
+                "\n" +
+                "    stddev_samp(sr_return_quantity) / avg(sr_return_quantity) as store_returns_quantitycov,\n" +
+                "\n" +
+                "    count(cs_quantity) as catalog_sales_quantitycount,\n" +
+                "\n" +
+                "    avg(cs_quantity) as catalog_sales_quantityave,\n" +
+                "\n" +
+                "    stddev_samp(cs_quantity) / avg(cs_quantity) as catalog_sales_quantitystdev,\n" +
+                "\n" +
+                "    stddev_samp(cs_quantity) / avg(cs_quantity) as catalog_sales_quantitycov\n" +
+                "\n" +
+                "from\n" +
+                "\n" +
+                "    store_sales,\n" +
+                "\n" +
+                "    store_returns,\n" +
+                "\n" +
+                "    catalog_sales,\n" +
+                "\n" +
+                "    date_dim d1,\n" +
+                "\n" +
+                "    date_dim d2,\n" +
+                "\n" +
+                "    date_dim d3,\n" +
+                "\n" +
+                "    store,\n" +
+                "\n" +
+                "    item\n" +
+                "\n" +
+                "where\n" +
+                "\n" +
+                "    d1.d_quarter_name = '2000Q1'\n" +
+                "\n" +
+                "        and d1.d_date_sk = ss_sold_date_sk\n" +
+                "\n" +
+                "        and i_item_sk = ss_item_sk\n" +
+                "\n" +
+                "        and s_store_sk = ss_store_sk\n" +
+                "\n" +
+                "        and ss_customer_sk = sr_customer_sk\n" +
+                "\n" +
+                "        and ss_item_sk = sr_item_sk\n" +
+                "\n" +
+                "        and ss_ticket_number = sr_ticket_number\n" +
+                "\n" +
+                "        and sr_returned_date_sk = d2.d_date_sk\n" +
+                "\n" +
+                "        and d2.d_quarter_name in ('2000Q1' , '2000Q2', '2000Q3')\n" +
+                "\n" +
+                "        and sr_customer_sk = cs_bill_customer_sk\n" +
+                "\n" +
+                "        and sr_item_sk = cs_item_sk\n" +
+                "\n" +
+                "        and cs_sold_date_sk = d3.d_date_sk\n" +
+                "\n" +
+                "        and d3.d_quarter_name in ('2000Q1' , '2000Q2', '2000Q3')\n" +
+                "\n" +
+                "group by i_item_id , i_item_desc , s_state\n" +
+                "\n" +
+                "order by i_item_id , i_item_desc , s_state\n" +
+                "\n" +
+                "limit 100";
+
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(6, tableNames.size());
+        AtomicInteger cnt = new AtomicInteger(0);
+        for (TableName tableName : tableNames) {
+            if (tableName.equals(new TableName(null, "store_sales", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "store_returns", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "catalog_sales", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "date_dim", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "store", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "item", true))) {
+                cnt.incrementAndGet();
+            }
+        }
+        assertEquals(6, cnt.get());
+    }
+
+    @Test
+    public void parseSelectInsert() throws Exception {
+        String query = "insert into databaseA.tableA select key, max(value) from databaseA.tableA where category = 'x'";
+
+        // The same database.tableName can appear two times for input and output.
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(2, tableNames.size());
+        AtomicInteger cnt = new AtomicInteger(0);
+        tableNames.forEach(tableName -> {
+            if (tableName.equals(new TableName("databaseA", "tableA", false))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName("databaseA", "tableA", true))) {
+                cnt.incrementAndGet();
+            }
+        });
+        assertEquals(2, cnt.get());
+    }
+
+    @Test
+    public void parseInsert() throws Exception {
+        String query = "insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id";
+
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(3, tableNames.size());
+        AtomicInteger cnt = new AtomicInteger(0);
+        tableNames.forEach(tableName -> {
+            if (tableName.equals(new TableName("databaseB", "tableB1", false))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "tableA1", true))) {
+                cnt.incrementAndGet();
+            } else if (tableName.equals(new TableName(null, "tableA2", true))) {
+                cnt.incrementAndGet();
+            }
+        });
+        assertEquals(3, cnt.get());
+    }
+
+    @Test
+    public void parseUpdate() throws Exception {
+        String query = "update table_a set y = 'updated' where x > 100";
+
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(1, tableNames.size());
+        assertTrue(tableNames.contains(new TableName(null, "table_a", false)));
+    }
+
+    @Test
+    public void parseDelete() throws Exception {
+        String query = "delete from table_a where x > 100";
+
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(1, tableNames.size());
+        assertTrue(tableNames.contains(new TableName(null, "table_a", false)));
+    }
+
+    @Test
+    public void parseDDL() throws Exception {
+        String query = "CREATE TABLE IF NOT EXISTS EMPLOYEES(\n" +
+                "EmployeeID INT,FirstName STRING, Title STRING,\n" +
+                "State STRING, Laptop STRING)\n" +
+                "COMMENT 'Employee Names'\n" +
+                "STORED AS ORC";
+
+        final Set<TableName> tableNames = findTableNames(query);
+        System.out.printf("tableNames=%s\n", tableNames);
+        assertEquals(1, tableNames.size());
+        assertTrue(tableNames.contains(new TableName(null, "EMPLOYEES", false)));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
index 5624f79f90..badc4d9c94 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java
@@ -128,6 +128,8 @@ public class TestPutHiveQL {
 
         runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1);
         runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS)
+                .forEach(f -> f.assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS"));
     }
 
     @Test
@@ -375,6 +377,7 @@ public class TestPutHiveQL {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS).get(0).assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS");
         try (final Connection conn = service.getConnection()) {
             try (final Statement stmt = conn.createStatement()) {
@@ -493,6 +496,8 @@ public class TestPutHiveQL {
 
         // should fail because of the semicolon
         runner.assertAllFlowFilesTransferred(PutHiveQL.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutHiveQL.REL_SUCCESS)
+                .forEach(f -> f.assertAttributeEquals(PutHiveQL.ATTR_OUTPUT_TABLES, "PERSONS"));
 
         // Now we can check that the values were inserted by the multi-statement script.
         try (final Connection conn = service.getConnection()) {
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index e57bb08711..f16cc6517b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.nifi.processors.hive.AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES;
 import static org.apache.nifi.processors.hive.PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR;
 import static org.apache.nifi.processors.hive.PutHiveStreaming.REL_SUCCESS;
 import static org.junit.Assert.assertEquals;
@@ -193,7 +194,9 @@ public class TestPutHiveStreaming {
         runner.run();
 
         runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
-        assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
     }
 
     @Test
@@ -488,7 +491,9 @@ public class TestPutHiveStreaming {
         runner.run();
 
         runner.assertTransferCount(PutHiveStreaming.REL_SUCCESS, 1);
-        assertEquals("1", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_SUCCESS).get(0);
+        assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
         runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 0);
         runner.assertTransferCount(PutHiveStreaming.REL_RETRY, 0);
     }
@@ -713,7 +718,9 @@ public class TestPutHiveStreaming {
         runner.run();
 
         runner.assertTransferCount(PutHiveStreaming.REL_FAILURE, 1);
-        assertEquals("2", runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0).getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHiveStreaming.REL_FAILURE).get(0);
+        assertEquals("2", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR));
+        assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
index 2222dbcb8d..3c3b7f9ef4 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java
@@ -300,6 +300,7 @@ public class TestSelectHiveQL {
         }
         assertEquals(nrOfRows - 10, recordsFromStream);
         assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
+        flowFile.assertAttributeEquals(AbstractHiveQLProcessor.ATTR_INPUT_TABLES, "persons");
     }
 
     @Test
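Reviewer note (not part of the patch): the normalize() javadoc in AbstractHiveQLProcessor explains why '?' placeholders are replaced before parsing. Below is a small, hypothetical standalone sketch of that behavior; the NormalizeDemo class is invented for illustration, while ParseDriver.parse(String) and the '?' to 'x' substitution are exactly what the patch uses.

package org.apache.nifi.processors.hive;

import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

// Hypothetical demo class, not part of the patch.
public class NormalizeDemo {
    public static void main(String[] args) {
        final String prepared = "select empid from company.emp a where a.firstName = ?";
        try {
            new ParseDriver().parse(prepared);
            System.out.println("parsed as-is (unexpected)");
        } catch (ParseException e) {
            // Expected: HiveParser rejects the bare JDBC-style '?' placeholder.
            System.out.println("parse failed on '?': " + e.getMessage());
        }
        try {
            // The same substitution normalize() applies before calling the parser.
            new ParseDriver().parse(prepared.replace('?', 'x'));
            System.out.println("parsed after '?' was replaced with 'x'");
        } catch (ParseException e) {
            System.out.println("still failed: " + e.getMessage());
        }
    }
}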