diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index e61fa9fd31..fb059147d8 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.processors.hive; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.charset.Charset; import java.sql.Connection; import java.sql.PreparedStatement; @@ -30,6 +27,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -53,14 +51,18 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.pattern.PartialFunctions; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.hive.CsvOutputOptions; import org.apache.nifi.util.hive.HiveJdbcCommon; +import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE; +import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY; +import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO; + @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"hive", "sql", "select", "jdbc", "query", "database"}) @@ -72,19 +74,21 @@ import org.apache.nifi.util.hive.HiveJdbcCommon; @WritesAttributes({ @WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."), @WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."), - @WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query.") + @WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") }) public class SelectHiveQL extends AbstractHiveQLProcessor { public static final String RESULT_ROW_COUNT = "selecthiveql.row.count"; - protected static final String AVRO = "Avro"; - protected static final String CSV = "CSV"; - - public static final String AVRO_MIME_TYPE = "application/avro-binary"; - public static final String CSV_MIME_TYPE = "text/csv"; - - // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -99,12 +103,45 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder() .name("hive-query") .displayName("HiveQL Select Query") - .description("HiveQL SELECT query to execute") + .description("HiveQL SELECT query to execute. If this is not set, the query is assumed to be in the content of an incoming FlowFile.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("hive-fetch-size") + .displayName("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be " + + "honored and/or exact. If the value specified is zero, then the hint is ignored.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("hive-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. " + + "This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() + .name("hive-max-frags") + .displayName("Maximum Number of Fragments") + .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " + + "This prevents OutOfMemoryError when this processor ingests huge table.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor HIVEQL_CSV_HEADER = new PropertyDescriptor.Builder() .name("csv-header") .displayName("CSV Header") @@ -174,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.add(HIVE_DBCP_SERVICE); _propertyDescriptors.add(HIVEQL_SELECT_QUERY); + _propertyDescriptors.add(FETCH_SIZE); + _propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE); + _propertyDescriptors.add(MAX_FRAGMENTS); _propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT); + _propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO); _propertyDescriptors.add(HIVEQL_CSV_HEADER); _propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER); _propertyDescriptors.add(HIVEQL_CSV_DELIMITER); @@ -216,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null); + final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null); FlowFile flowfile = null; // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. @@ -243,95 +284,152 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. final StringBuilder queryContents = new StringBuilder(); - session.read(fileToProcess, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - queryContents.append(IOUtils.toString(in)); - } - }); + session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset))); selectQuery = queryContents.toString(); } + final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger(); + final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() + ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions(fileToProcess).asInteger() + : 0; final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue(); + final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); final StopWatch stopWatch = new StopWatch(true); final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean(); final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue(); final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue(); final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean(); final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean(); + final String fragmentIdentifier = UUID.randomUUID().toString(); try (final Connection con = dbcpService.getConnection(); - final Statement st = ( flowbased ? con.prepareStatement(selectQuery): con.createStatement()) + final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement()) ) { - final AtomicLong nrOfRows = new AtomicLong(0L); - if (fileToProcess == null) { - flowfile = session.create(); - } else { - flowfile = fileToProcess; + if (fetchSize != null && fetchSize > 0) { + try { + st.setFetchSize(fetchSize); + } catch (SQLException se) { + // Not all drivers support this, just log the error (at debug level) and move on + logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); + } } - flowfile = session.write(flowfile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - try { - logger.debug("Executing query {}", new Object[]{selectQuery}); - if (flowbased) { - // Hive JDBC Doesn't Support this yet: - // ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData(); - // int paramCount = pmd.getParameterCount(); + final List resultSetFlowFiles = new ArrayList<>(); + try { + logger.debug("Executing query {}", new Object[]{selectQuery}); + if (flowbased) { + // Hive JDBC Doesn't Support this yet: + // ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData(); + // int paramCount = pmd.getParameterCount(); - // Alternate way to determine number of params in SQL. - int paramCount = StringUtils.countMatches(selectQuery, "?"); + // Alternate way to determine number of params in SQL. + int paramCount = StringUtils.countMatches(selectQuery, "?"); - if (paramCount > 0) { - setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes()); - } - } - - final ResultSet resultSet = (flowbased ? ((PreparedStatement)st).executeQuery(): st.executeQuery(selectQuery)); - - if (AVRO.equals(outputFormat)) { - nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out)); - } else if (CSV.equals(outputFormat)) { - CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape); - nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out,options)); - } else { - nrOfRows.set(0L); - throw new ProcessException("Unsupported output format: " + outputFormat); - } - } catch (final SQLException e) { - throw new ProcessException(e); + if (paramCount > 0) { + setParameters(1, (PreparedStatement) st, paramCount, flowfile.getAttributes()); } } - }); - // Set attribute for how many rows were selected - flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + final ResultSet resultSet; - // Set MIME type on output document and add extension to filename - if (AVRO.equals(outputFormat)) { - flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); - flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".avro"); - } else if (CSV.equals(outputFormat)) { - flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); - flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), flowfile.getAttribute(CoreAttributes.FILENAME.key()) + ".csv"); + try { + resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery)); + } catch (SQLException se) { + // If an error occurs during the query, a flowfile is expected to be routed to failure, so create one here (the original will be removed) + flowfile = session.create(fileToProcess); + throw se; + } + + int fragmentIndex = 0; + String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null; + while (true) { + final AtomicLong nrOfRows = new AtomicLong(0L); + flowfile = (flowfile == null) ? session.create() : session.create(flowfile); + if (baseFilename == null) { + baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key()); + } + try { + flowfile = session.write(flowfile, out -> { + try { + if (AVRO.equals(outputFormat)) { + nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro)); + } else if (CSV.equals(outputFormat)) { + CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile); + nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options)); + } else { + nrOfRows.set(0L); + throw new ProcessException("Unsupported output format: " + outputFormat); + } + } catch (final SQLException | RuntimeException e) { + throw new ProcessException("Error during database query or conversion of records.", e); + } + }); + } catch (ProcessException e) { + // Add flowfile to results before rethrowing so it will be removed from session in outer catch + resultSetFlowFiles.add(flowfile); + throw e; + } + + if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) { + // Set attribute for how many rows were selected + flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + + // Set MIME type on output document and add extension to filename + if (AVRO.equals(outputFormat)) { + flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY); + flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro"); + } else if (CSV.equals(outputFormat)) { + flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE); + flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv"); + } + + if (maxRowsPerFlowFile > 0) { + flowfile = session.putAttribute(flowfile, "fragment.identifier", fragmentIdentifier); + flowfile = session.putAttribute(flowfile, "fragment.index", String.valueOf(fragmentIndex)); + } + + logger.info("{} contains {} Avro records; transferring to 'success'", + new Object[]{flowfile, nrOfRows.get()}); + + if (context.hasIncomingConnection()) { + // If the flow file came from an incoming connection, issue a Modify Content provenance event + + session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows", + stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } else { + // If we created a flow file from rows received from Hive, issue a Receive provenance event + session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + } + resultSetFlowFiles.add(flowfile); + } else { + // If there were no rows returned (and the first flow file has been sent, we're done processing, so remove the flowfile and carry on + session.remove(flowfile); + break; + } + + fragmentIndex++; + if (maxFragments > 0 && fragmentIndex >= maxFragments) { + break; + } + } + + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + // Set count on all FlowFiles + if (maxRowsPerFlowFile > 0) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); + } + } + + } catch (final SQLException e) { + throw e; } - logger.info("{} contains {} Avro records; transferring to 'success'", - new Object[]{flowfile, nrOfRows.get()}); + session.transfer(resultSetFlowFiles, REL_SUCCESS); - if (context.hasIncomingConnection()) { - // If the flow file came from an incoming connection, issue a Modify Content provenance event - - session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - } else { - // If we created a flow file from rows received from Hive, issue a Receive provenance event - session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - } - session.transfer(flowfile, REL_SUCCESS); } catch (final ProcessException | SQLException e) { logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e}); if (flowfile == null) { @@ -352,7 +450,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { session.transfer(flowfile, REL_FAILURE); } } finally { - + if (fileToProcess != null) { + session.remove(fileToProcess); + } } } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java index bad6926c17..36889129f3 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/CsvOutputOptions.java @@ -24,6 +24,8 @@ public class CsvOutputOptions { private boolean quote = false; private boolean escape = true; + private int maxRowsPerFlowFile = 0; + public boolean isHeader() { return header; } @@ -46,11 +48,16 @@ public class CsvOutputOptions { return escape; } - public CsvOutputOptions(boolean header, String altHeader, String delimiter, boolean quote, boolean escape) { + public int getMaxRowsPerFlowFile() { + return maxRowsPerFlowFile; + } + + public CsvOutputOptions(boolean header, String altHeader, String delimiter, boolean quote, boolean escape, int maxRowsPerFlowFile) { this.header = header; this.altHeader = altHeader; this.delimiter = delimiter; this.quote = quote; this.escape = escape; + this.maxRowsPerFlowFile = maxRowsPerFlowFile; } } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java index 83d4e22431..ff06495fee 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.nifi.components.PropertyDescriptor; import java.io.IOException; import java.io.OutputStream; @@ -78,14 +79,31 @@ import static java.sql.Types.VARCHAR; */ public class HiveJdbcCommon { - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException { - return convertToAvroStream(rs, outStream, null, null); + public static final String AVRO = "Avro"; + public static final String CSV = "CSV"; + + public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; + public static final String CSV_MIME_TYPE = "text/csv"; + + + public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() + .name("hive-normalize-avro") + .displayName("Normalize Table/Column Names") + .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods " + + "will be changed to underscores in order to build a valid Avro record.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException { + return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null); } - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback) + public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback) throws SQLException, IOException { - final Schema schema = createSchema(rs, recordName); + final Schema schema = createSchema(rs, recordName, convertNames); final GenericRecord rec = new GenericData.Record(schema); final DatumWriter datumWriter = new GenericDatumWriter<>(schema); @@ -157,14 +175,17 @@ public class HiveJdbcCommon { } dataFileWriter.append(rec); nrOfRows += 1; + + if (maxRows > 0 && nrOfRows == maxRows) + break; } return nrOfRows; } } - public static Schema createSchema(final ResultSet rs) throws SQLException { - return createSchema(rs, null); + public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException { + return createSchema(rs, null, false); } /** @@ -173,10 +194,11 @@ public class HiveJdbcCommon { * * @param rs The result set to convert to Avro * @param recordName The a priori record name to use if it cannot be determined from the result set. + * @param convertNames Whether to convert column/table names to be legal Avro names * @return A Schema object representing the result set converted to an Avro record * @throws SQLException if any error occurs during conversion */ - public static Schema createSchema(final ResultSet rs, String recordName) throws SQLException { + public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException { final ResultSetMetaData meta = rs.getMetaData(); final int nrOfColumns = meta.getColumnCount(); String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName; @@ -196,6 +218,9 @@ public class HiveJdbcCommon { // Not all drivers support getTableName, so just use the previously-set default } + if (convertNames) { + tableName = normalizeNameForAvro(tableName); + } final FieldAssembler builder = SchemaBuilder.record(tableName).namespace("any.data").fields(); /** @@ -325,6 +350,7 @@ public class HiveJdbcCommon { } // Iterate over the rows + int maxRows = outputOptions.getMaxRowsPerFlowFile(); long nrOfRows = 0; while (rs.next()) { if (callback != null) { @@ -388,10 +414,21 @@ public class HiveJdbcCommon { outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8)); outStream.write("\n".getBytes(StandardCharsets.UTF_8)); nrOfRows++; + + if (maxRows > 0 && nrOfRows == maxRows) + break; } return nrOfRows; } + public static String normalizeNameForAvro(String inputName) { + String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_"); + if (Character.isDigit(normalizedName.charAt(0))) { + normalizedName = "_" + normalizedName; + } + return normalizedName; + } + /** * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing. * IMPORTANT: This method should only work on the row pointed at by the current ResultSet reference. diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java index 6ce21e97d0..34384ac286 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHiveQL.java @@ -29,6 +29,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.hive.HiveJdbcCommon; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -51,12 +52,17 @@ import java.util.Map; import java.util.Random; import static org.apache.nifi.processors.hive.SelectHiveQL.HIVEQL_OUTPUT_FORMAT; +import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE; +import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class TestSelectHiveQL { private static final Logger LOGGER; + private final static String MAX_ROWS_KEY = "maxRows"; static { System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); @@ -67,14 +73,14 @@ public class TestSelectHiveQL { LOGGER = LoggerFactory.getLogger(TestSelectHiveQL.class); } - final static String DB_LOCATION = "target/db"; + private final static String DB_LOCATION = "target/db"; - final static String QUERY_WITH_EL = "select " + private final static String QUERY_WITH_EL = "select " + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + " from persons PER" + " where PER.ID > ${person.id}"; - final static String QUERY_WITHOUT_EL = "select " + private final static String QUERY_WITHOUT_EL = "select " + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + " from persons PER" + " where PER.ID > 10"; @@ -132,6 +138,7 @@ public class TestSelectHiveQL { try { stmt.execute("drop table TEST_NULL_INT"); } catch (final SQLException sqle) { + // Nothing to do, probably means the table didn't exist } stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); @@ -160,6 +167,7 @@ public class TestSelectHiveQL { try { stmt.execute("drop table TEST_NO_ROWS"); } catch (final SQLException sqle) { + // Nothing to do, probably means the table didn't exist } stmt.execute("create table TEST_NO_ROWS (id integer)"); @@ -176,13 +184,13 @@ public class TestSelectHiveQL { @Test public void invokeOnTriggerWithCsv() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.CSV); + invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV); } @Test public void invokeOnTriggerWithAvro() throws InitializationException, ClassNotFoundException, SQLException, IOException { - invokeOnTrigger(QUERY_WITHOUT_EL, false, SelectHiveQL.AVRO); + invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO); } public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat) @@ -230,8 +238,8 @@ public class TestSelectHiveQL { MockFlowFile flowFile = flowfiles.get(0); final InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); long recordsFromStream = 0; - if (SelectHiveQL.AVRO.equals(outputFormat)) { - assertEquals(SelectHiveQL.AVRO_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + if (AVRO.equals(outputFormat)) { + assertEquals(MIME_TYPE_AVRO_BINARY, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); final DatumReader datumReader = new GenericDatumReader<>(); try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { GenericRecord record = null; @@ -244,7 +252,7 @@ public class TestSelectHiveQL { } } } else { - assertEquals(SelectHiveQL.CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + assertEquals(CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String headerRow = br.readLine(); @@ -256,7 +264,7 @@ public class TestSelectHiveQL { while ((line = br.readLine()) != null) { recordsFromStream++; String[] values = line.split(","); - if(recordsFromStream < (nrOfRows - 10)) { + if (recordsFromStream < (nrOfRows - 10)) { assertEquals(3, values.length); assertTrue(values[1].startsWith("\"")); assertTrue(values[1].endsWith("\"")); @@ -269,6 +277,178 @@ public class TestSelectHiveQL { assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT))); } + @Test + public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(false); + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO); + runner.setVariable(MAX_ROWS_KEY, "9"); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12); + + //ensure all but the last file have 9 records each + for (int ff = 0; ff < 11; ff++) { + mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + } + + //last file should have 1 record + mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + runner.clearTransferState(); + } + + @Test + public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(true); + runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.CSV); + + runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE", new HashMap() {{ + put(MAX_ROWS_KEY, "9"); + }}); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12); + + //ensure all but the last file have 9 records (10 lines = 9 records + header) each + for (int ff = 0; ff < 11; ff++) { + mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + assertEquals(10, br.lines().count()); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + } + + //last file should have 1 record (2 lines = 1 record + header) + mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + assertEquals(2, br.lines().count()); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + runner.clearTransferState(); + } + + @Test + public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(false); + runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "9"); + Integer maxFragments = 3; + runner.setProperty(SelectHiveQL.MAX_FRAGMENTS, maxFragments.toString()); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, maxFragments); + + for (int i = 0; i < maxFragments; i++) { + mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(i); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(i), mff.getAttribute("fragment.index")); + assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count")); + } + + runner.clearTransferState(); + } + + private long getNumberOfRecordsFromStream(InputStream in) throws IOException { + final DatumReader datumReader = new GenericDatumReader<>(); + try (DataFileStream dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + long recordsFromStream = 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + return recordsFromStream; + } + } + /** * Simple implementation only for SelectHiveQL processor testing. */ @@ -283,8 +463,7 @@ public class TestSelectHiveQL { public Connection getConnection() throws ProcessException { try { Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); - return con; + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); } catch (final Exception e) { throw new ProcessException("getConnection failed: " + e); }