NIFI-4473: Add support for large result sets and normalizing Avro names to SelectHiveQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2212.
Matthew Burgess 2017-10-16 13:38:56 -04:00 committed by Pierre Villard
parent fd00df3d2f
commit 4edafad6e5
4 changed files with 421 additions and 98 deletions

SelectHiveQL.java

@@ -16,9 +16,6 @@
*/
package org.apache.nifi.processors.hive;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -30,6 +27,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,14 +51,18 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.hive.CsvOutputOptions;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO;
import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV;
import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"hive", "sql", "select", "jdbc", "query", "database"})
@@ -72,19 +74,21 @@ import org.apache.nifi.util.hive.HiveJdbcCommon;
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the MIME type for the outgoing flowfile to application/avro-binary for Avro or text/csv for CSV."),
@WritesAttribute(attribute = "filename", description = "Adds .avro or .csv to the filename attribute depending on which output format is selected."),
@WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query.")
@WritesAttribute(attribute = "selecthiveql.row.count", description = "Indicates how many rows were selected/returned by the query."),
@WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+ "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
@WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+ "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+ "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet."),
@WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+ "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+ "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+ "FlowFiles were produced")
})
public class SelectHiveQL extends AbstractHiveQLProcessor {
public static final String RESULT_ROW_COUNT = "selecthiveql.row.count";
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -99,12 +103,45 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
public static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("hive-query")
.displayName("HiveQL Select Query")
.description("HiveQL SELECT query to execute")
.description("HiveQL SELECT query to execute. If this is not set, the query is assumed to be in the content of an incoming FlowFile.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("hive-fetch-size")
.displayName("Fetch Size")
.description("The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be "
+ "honored and/or exact. If the value specified is zero, then the hint is ignored.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
.name("hive-max-rows")
.displayName("Max Rows Per Flow File")
.description("The maximum number of result rows that will be included in a single FlowFile. " +
"This will allow you to break up very large result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
.name("hive-max-frags")
.displayName("Maximum Number of Fragments")
.description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
"This prevents OutOfMemoryError when this processor ingests huge table.")
.defaultValue("0")
.required(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.build();
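Taken together, the three new properties bound memory use at different levels: Fetch Size hints how many rows the JDBC driver should buffer per round trip, Max Rows Per Flow File caps the size of each output FlowFile, and Maximum Number of Fragments caps an entire run. A minimal configuration sketch using NiFi's TestRunner (the class wrapper and values are illustrative, and the Hive DBCP controller-service setup shown in the test class later in this diff is omitted):

import org.apache.nifi.processors.hive.SelectHiveQL;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class SelectHiveQLConfigSketch {
    public static void main(String[] args) {
        // At most 9 rows per FlowFile, at most 3 fragments per run,
        // and a hint of 1000 rows per driver round trip.
        TestRunner runner = TestRunners.newTestRunner(SelectHiveQL.class);
        runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM persons");
        runner.setProperty(SelectHiveQL.FETCH_SIZE, "1000");          // 0 = hint is not sent
        runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "9"); // 0 = all rows in one FlowFile
        runner.setProperty(SelectHiveQL.MAX_FRAGMENTS, "3");          // 0 = no fragment cap
        // NOTE: a Hive DBCP controller service must be registered and enabled before running.
        runner.run();
    }
}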
public static final PropertyDescriptor HIVEQL_CSV_HEADER = new PropertyDescriptor.Builder()
.name("csv-header")
.displayName("CSV Header")
@@ -174,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(HIVE_DBCP_SERVICE);
_propertyDescriptors.add(HIVEQL_SELECT_QUERY);
_propertyDescriptors.add(FETCH_SIZE);
_propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
_propertyDescriptors.add(MAX_FRAGMENTS);
_propertyDescriptors.add(HIVEQL_OUTPUT_FORMAT);
_propertyDescriptors.add(NORMALIZE_NAMES_FOR_AVRO);
_propertyDescriptors.add(HIVEQL_CSV_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_ALT_HEADER);
_propertyDescriptors.add(HIVEQL_CSV_DELIMITER);
@@ -216,7 +257,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
}
private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile fileToProcess = (context.hasIncomingConnection() ? session.get() : null);
FlowFile flowfile = null;
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
@@ -243,95 +284,152 @@
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset)));
selectQuery = queryContents.toString();
}
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions(fileToProcess).asInteger()
: 0;
final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
final StopWatch stopWatch = new StopWatch(true);
final boolean header = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
final String altHeader = context.getProperty(HIVEQL_CSV_ALT_HEADER).evaluateAttributeExpressions(fileToProcess).getValue();
final String delimiter = context.getProperty(HIVEQL_CSV_DELIMITER).evaluateAttributeExpressions(fileToProcess).getValue();
final boolean quote = context.getProperty(HIVEQL_CSV_QUOTE).asBoolean();
final boolean escape = context.getProperty(HIVEQL_CSV_ESCAPE).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection();
final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
) {
if (fetchSize != null && fetchSize > 0) {
try {
st.setFetchSize(fetchSize);
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
}
}
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
if (flowbased) {
// Hive JDBC Doesn't Support this yet:
// ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData();
// int paramCount = pmd.getParameterCount();
// Alternate way to determine number of params in SQL.
int paramCount = StringUtils.countMatches(selectQuery, "?");
if (paramCount > 0) {
setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes());
}
}
final ResultSet resultSet;
try {
resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
} catch (SQLException se) {
// If an error occurs during the query, a flowfile is expected to be routed to failure, so create one here (the original will be removed)
flowfile = session.create(fileToProcess);
throw se;
}
int fragmentIndex = 0;
String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
while (true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
flowfile = (flowfile == null) ? session.create() : session.create(flowfile);
if (baseFilename == null) {
baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
}
try {
flowfile = session.write(flowfile, out -> {
try {
if (AVRO.equals(outputFormat)) {
nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out, maxRowsPerFlowFile, convertNamesForAvro));
} else if (CSV.equals(outputFormat)) {
CsvOutputOptions options = new CsvOutputOptions(header, altHeader, delimiter, quote, escape, maxRowsPerFlowFile);
nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out, options));
} else {
nrOfRows.set(0L);
throw new ProcessException("Unsupported output format: " + outputFormat);
}
} catch (final SQLException | RuntimeException e) {
throw new ProcessException("Error during database query or conversion of records.", e);
}
});
} catch (ProcessException e) {
// Add flowfile to results before rethrowing so it will be removed from session in outer catch
resultSetFlowFiles.add(flowfile);
throw e;
}
if (nrOfRows.get() > 0 || resultSetFlowFiles.isEmpty()) {
// Set attribute for how many rows were selected
flowfile = session.putAttribute(flowfile, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
// Set MIME type on output document and add extension to filename
if (AVRO.equals(outputFormat)) {
flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_AVRO_BINARY);
flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".avro");
} else if (CSV.equals(outputFormat)) {
flowfile = session.putAttribute(flowfile, CoreAttributes.MIME_TYPE.key(), CSV_MIME_TYPE);
flowfile = session.putAttribute(flowfile, CoreAttributes.FILENAME.key(), baseFilename + "." + fragmentIndex + ".csv");
}
if (maxRowsPerFlowFile > 0) {
flowfile = session.putAttribute(flowfile, "fragment.identifier", fragmentIdentifier);
flowfile = session.putAttribute(flowfile, "fragment.index", String.valueOf(fragmentIndex));
}
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{flowfile, nrOfRows.get()});
if (context.hasIncomingConnection()) {
// If the flow file came from an incoming connection, issue a Modify Content provenance event
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} else {
// If we created a flow file from rows received from Hive, issue a Receive provenance event
session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
resultSetFlowFiles.add(flowfile);
} else {
// If there were no rows returned (and the first flow file has been sent), we're done processing, so remove the flowfile and carry on
session.remove(flowfile);
break;
}
fragmentIndex++;
if (maxFragments > 0 && fragmentIndex >= maxFragments) {
break;
}
}
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
// Set count on all FlowFiles
if (maxRowsPerFlowFile > 0) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw e;
}
logger.info("{} contains {} Avro records; transferring to 'success'",
new Object[]{flowfile, nrOfRows.get()});
session.transfer(resultSetFlowFiles, REL_SUCCESS);
if (context.hasIncomingConnection()) {
// If the flow file came from an incoming connection, issue a Modify Content provenance event
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} else {
// If we created a flow file from rows received from Hive, issue a Receive provenance event
session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
session.transfer(flowfile, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e});
if (flowfile == null) {
@@ -352,7 +450,9 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
session.transfer(flowfile, REL_FAILURE);
}
} finally {
if (fileToProcess != null) {
session.remove(fileToProcess);
}
}
}
}
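The fragment.identifier, fragment.index, and fragment.count attributes written in the loop above follow the same convention as NiFi's split processors, so a downstream step can re-associate all FlowFiles produced from one query. A hypothetical sketch of that correlation (the helper class below is illustrative, not part of this commit):

import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class FragmentReassemblySketch {
    // Orders one result set's fragments by fragment.index and verifies completeness
    // against fragment.count; assumes a non-empty list whose attribute maps all
    // share the same fragment.identifier.
    static void orderAndVerify(List<Map<String, String>> fragmentAttributes) {
        fragmentAttributes.sort(Comparator.comparingInt(attrs -> Integer.parseInt(attrs.get("fragment.index"))));
        int expected = Integer.parseInt(fragmentAttributes.get(0).get("fragment.count"));
        if (fragmentAttributes.size() != expected) {
            throw new IllegalStateException("Expected " + expected + " fragments but saw " + fragmentAttributes.size());
        }
    }
}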

CsvOutputOptions.java

@@ -24,6 +24,8 @@
private boolean quote = false;
private boolean escape = true;
private int maxRowsPerFlowFile = 0;
public boolean isHeader() {
return header;
}
@@ -46,11 +48,16 @@
return escape;
}
public int getMaxRowsPerFlowFile() {
return maxRowsPerFlowFile;
}
public CsvOutputOptions(boolean header, String altHeader, String delimiter, boolean quote, boolean escape, int maxRowsPerFlowFile) {
this.header = header;
this.altHeader = altHeader;
this.delimiter = delimiter;
this.quote = quote;
this.escape = escape;
this.maxRowsPerFlowFile = maxRowsPerFlowFile;
}
}
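For reference, a minimal sketch of the extended constructor in use, mirroring the processor code above (the wrapper class and values are illustrative): the new final argument caps the number of rows written per stream, with 0 meaning no cap.

import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.nifi.util.hive.CsvOutputOptions;
import org.apache.nifi.util.hive.HiveJdbcCommon;

public class CsvExportSketch {
    // Writes rs as CSV with a header row, comma delimiter, quoting enabled,
    // escaping disabled, and at most 10000 rows per output stream.
    static long writeCsv(ResultSet rs, OutputStream out) throws SQLException, IOException {
        CsvOutputOptions options = new CsvOutputOptions(true, null, ",", true, false, 10000);
        return HiveJdbcCommon.convertToCsvStream(rs, out, options);
    }
}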

HiveJdbcCommon.java

@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.nifi.components.PropertyDescriptor;
import java.io.IOException;
import java.io.OutputStream;
@@ -78,14 +79,31 @@ import static java.sql.Types.VARCHAR;
*/
public class HiveJdbcCommon {
public static final String AVRO = "Avro";
public static final String CSV = "CSV";
public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary";
public static final String CSV_MIME_TYPE = "text/csv";
public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder()
.name("hive-normalize-avro")
.displayName("Normalize Table/Column Names")
.description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. For example, colons and periods "
+ "will be changed to underscores in order to build a valid Avro record.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final int maxRows, boolean convertNames) throws SQLException, IOException {
return convertToAvroStream(rs, outStream, null, maxRows, convertNames, null);
}
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, String recordName, final int maxRows, boolean convertNames, ResultSetRowCallback callback)
throws SQLException, IOException {
final Schema schema = createSchema(rs, recordName, convertNames);
final GenericRecord rec = new GenericData.Record(schema);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
@@ -157,14 +175,17 @@
}
dataFileWriter.append(rec);
nrOfRows += 1;
if (maxRows > 0 && nrOfRows == maxRows)
break;
}
return nrOfRows;
}
}
public static Schema createSchema(final ResultSet rs, boolean convertNames) throws SQLException {
return createSchema(rs, null, convertNames);
}
/**
@@ -173,10 +194,11 @@
*
* @param rs The result set to convert to Avro
* @param recordName The a priori record name to use if it cannot be determined from the result set.
* @param convertNames Whether to convert column/table names to be legal Avro names
* @return A Schema object representing the result set converted to an Avro record
* @throws SQLException if any error occurs during conversion
*/
public static Schema createSchema(final ResultSet rs, String recordName, boolean convertNames) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
String tableName = StringUtils.isEmpty(recordName) ? "NiFi_SelectHiveQL_Record" : recordName;
@@ -196,6 +218,9 @@
// Not all drivers support getTableName, so just use the previously-set default
}
if (convertNames) {
tableName = normalizeNameForAvro(tableName);
}
final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
/**
@@ -325,6 +350,7 @@
}
// Iterate over the rows
int maxRows = outputOptions.getMaxRowsPerFlowFile();
long nrOfRows = 0;
while (rs.next()) {
if (callback != null) {
@@ -388,10 +414,21 @@
outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
outStream.write("\n".getBytes(StandardCharsets.UTF_8));
nrOfRows++;
if (maxRows > 0 && nrOfRows == maxRows)
break;
}
return nrOfRows;
}
public static String normalizeNameForAvro(String inputName) {
String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
if (Character.isDigit(normalizedName.charAt(0))) {
normalizedName = "_" + normalizedName;
}
return normalizedName;
}
/**
* An interface for callback methods which allows processing of a row during the convertToXYZStream() processing.
* <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
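As a quick reference, a tiny demo of what normalizeNameForAvro (defined above) produces; the wrapper class is illustrative, not part of the commit:

import org.apache.nifi.util.hive.HiveJdbcCommon;

public class NormalizeNameDemo {
    public static void main(String[] args) {
        // Non-alphanumeric characters are replaced with underscores...
        System.out.println(HiveJdbcCommon.normalizeNameForAvro("order.date")); // order_date
        System.out.println(HiveJdbcCommon.normalizeNameForAvro("ns:column"));  // ns_column
        // ...and a name starting with a digit gets a leading underscore to stay Avro-legal.
        System.out.println(HiveJdbcCommon.normalizeNameForAvro("2017_sales")); // _2017_sales
    }
}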

TestSelectHiveQL.java

@@ -29,6 +29,7 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -51,12 +52,17 @@
import java.util.Random;
import static org.apache.nifi.processors.hive.SelectHiveQL.HIVEQL_OUTPUT_FORMAT;
import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO;
import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV;
import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestSelectHiveQL {
private static final Logger LOGGER;
private final static String MAX_ROWS_KEY = "maxRows";
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -67,14 +73,14 @@
LOGGER = LoggerFactory.getLogger(TestSelectHiveQL.class);
}
private final static String DB_LOCATION = "target/db";
private final static String QUERY_WITH_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ " from persons PER"
+ " where PER.ID > ${person.id}";
private final static String QUERY_WITHOUT_EL = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ " from persons PER"
+ " where PER.ID > 10";
@@ -132,6 +138,7 @@
try {
stmt.execute("drop table TEST_NULL_INT");
} catch (final SQLException sqle) {
// Nothing to do, probably means the table didn't exist
}
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
@@ -160,6 +167,7 @@
try {
stmt.execute("drop table TEST_NO_ROWS");
} catch (final SQLException sqle) {
// Nothing to do, probably means the table didn't exist
}
stmt.execute("create table TEST_NO_ROWS (id integer)");
@@ -176,13 +184,13 @@
@Test
public void invokeOnTriggerWithCsv()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV);
}
@Test
public void invokeOnTriggerWithAvro()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
}
public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
@@ -230,8 +238,8 @@
MockFlowFile flowFile = flowfiles.get(0);
final InputStream in = new ByteArrayInputStream(flowFile.toByteArray());
long recordsFromStream = 0;
if (AVRO.equals(outputFormat)) {
assertEquals(MIME_TYPE_AVRO_BINARY, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
GenericRecord record = null;
@@ -244,7 +252,7 @@
}
}
} else {
assertEquals(CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String headerRow = br.readLine();
@@ -256,7 +264,7 @@
while ((line = br.readLine()) != null) {
recordsFromStream++;
String[] values = line.split(",");
if (recordsFromStream < (nrOfRows - 10)) {
assertEquals(3, values.length);
assertTrue(values[1].startsWith("\""));
assertTrue(values[1].endsWith("\""));
@@ -269,6 +277,178 @@
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHiveQL.RESULT_ROW_COUNT)));
}
@Test
public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount = 0;
//create larger row set
for (int batch = 0; batch < 100; batch++) {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setIncomingConnection(false);
runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO);
runner.setVariable(MAX_ROWS_KEY, "9");
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
//ensure all but the last file have 9 records each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
}
//last file should have 1 record
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(1, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
runner.clearTransferState();
}
@Test
public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount = 0;
//create larger row set
for (int batch = 0; batch < 100; batch++) {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setIncomingConnection(true);
runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}");
runner.setProperty(SelectHiveQL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.CSV);
runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE", new HashMap<String, String>() {{
put(MAX_ROWS_KEY, "9");
}});
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, 12);
//ensure all but the last file have 9 records (10 lines = 9 records + header) each
for (int ff = 0; ff < 11; ff++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(ff);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(10, br.lines().count());
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
}
//last file should have 1 record (2 lines = 1 record + header)
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(11);
in = new ByteArrayInputStream(mff.toByteArray());
BufferedReader br = new BufferedReader(new InputStreamReader(in));
assertEquals(2, br.lines().count());
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(11), mff.getAttribute("fragment.index"));
assertEquals("12", mff.getAttribute("fragment.count"));
runner.clearTransferState();
}
@Test
public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException {
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
Statement stmt = con.createStatement();
InputStream in;
MockFlowFile mff;
try {
stmt.execute("drop table TEST_QUERY_DB_TABLE");
} catch (final SQLException sqle) {
// Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842]
}
stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)");
int rowCount = 0;
//create larger row set
for (int batch = 0; batch < 100; batch++) {
stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')");
rowCount++;
}
runner.setIncomingConnection(false);
runner.setProperty(SelectHiveQL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE");
runner.setProperty(SelectHiveQL.MAX_ROWS_PER_FLOW_FILE, "9");
Integer maxFragments = 3;
runner.setProperty(SelectHiveQL.MAX_FRAGMENTS, maxFragments.toString());
runner.run();
runner.assertAllFlowFilesTransferred(SelectHiveQL.REL_SUCCESS, maxFragments);
for (int i = 0; i < maxFragments; i++) {
mff = runner.getFlowFilesForRelationship(SelectHiveQL.REL_SUCCESS).get(i);
in = new ByteArrayInputStream(mff.toByteArray());
assertEquals(9, getNumberOfRecordsFromStream(in));
mff.assertAttributeExists("fragment.identifier");
assertEquals(Integer.toString(i), mff.getAttribute("fragment.index"));
assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count"));
}
runner.clearTransferState();
}
private long getNumberOfRecordsFromStream(InputStream in) throws IOException {
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
}
return recordsFromStream;
}
}
/**
* Simple implementation only for SelectHiveQL processor testing.
*/
@@ -283,8 +463,7 @@ public Connection getConnection() throws ProcessException {
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}