NIFI-5044: Applied changes in SelectHiveQL to SelectHive3QL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2799.
Matthew Burgess 2018-06-15 13:39:28 -04:00 committed by Pierre Villard
parent 187417d077
commit 97f71fd6c7
2 changed files with 260 additions and 65 deletions

SelectHive3QL.java

@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -100,10 +102,21 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.description("HiveQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship.")
.build();
public static final PropertyDescriptor HIVEQL_PRE_QUERY = new PropertyDescriptor.Builder()
.name("hive-pre-query")
.displayName("HiveQL Pre-Query")
.description("HiveQL pre-query to execute. Semicolon-delimited list of queries. "
+ "Example: 'set tez.queue.name=queue1; set hive.exec.orc.split.strategy=ETL; set hive.exec.reducers.bytes.per.reducer=1073741824'. "
+ "Note, the results/outputs of these queries will be suppressed if successfully executed.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor HIVEQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("hive-query")
.displayName("HiveQL Select Query")
@@ -113,6 +126,16 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor HIVEQL_POST_QUERY = new PropertyDescriptor.Builder()
.name("hive-post-query")
.displayName("HiveQL Post-Query")
.description("HiveQL post-query to execute. Semicolon-delimited list of queries. "
+ "Note, the results/outputs of these queries will be suppressed if successfully executed.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
.name("hive-fetch-size")
.displayName("Fetch Size")
@@ -214,7 +237,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(HIVE_DBCP_SERVICE);
_propertyDescriptors.add(HIVEQL_PRE_QUERY);
_propertyDescriptors.add(HIVEQL_SELECT_QUERY);
_propertyDescriptors.add(HIVEQL_POST_QUERY);
_propertyDescriptors.add(FETCH_SIZE);
_propertyDescriptors.add(QUERY_TIMEOUT);
_propertyDescriptors.add(MAX_ROWS_PER_FLOW_FILE);
@@ -278,19 +303,22 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
List<String> preQueries = getQueries(context.getProperty(HIVEQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
List<String> postQueries = getQueries(context.getProperty(HIVEQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
final boolean flowbased = !(context.getProperty(HIVEQL_SELECT_QUERY).isSet());
// Source the SQL
final String selectQuery;
String hqlStatement;
if (context.getProperty(HIVEQL_SELECT_QUERY).isSet()) {
selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
hqlStatement = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, charset)));
selectQuery = queryContents.toString();
hqlStatement = queryContents.toString();
}
@@ -309,10 +337,17 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final boolean escape = context.getProperty(HIVEQL_CSV_HEADER).asBoolean();
final String fragmentIdentifier = UUID.randomUUID().toString();
try (final Connection con = dbcpService.getConnection();
final Statement st = (flowbased ? con.prepareStatement(selectQuery) : con.createStatement())
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
final Statement st = (flowbased ? con.prepareStatement(hqlStatement) : con.createStatement())
) {
Pair<String,SQLException> failure = executeConfigStatements(con, preQueries);
if (failure != null) {
// In case of failure, assign the failed config query to "hqlStatement" so the existing error handling applies
hqlStatement = failure.getLeft();
flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
fileToProcess = null;
throw failure.getRight();
}
st.setQueryTimeout(context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asInteger());
if (fetchSize != null && fetchSize > 0) {
@@ -326,14 +361,14 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
try {
logger.debug("Executing query {}", new Object[]{selectQuery});
logger.debug("Executing query {}", new Object[]{hqlStatement});
if (flowbased) {
// Hive JDBC doesn't support this yet:
// ParameterMetaData pmd = ((PreparedStatement)st).getParameterMetaData();
// int paramCount = pmd.getParameterCount();
// Alternate way to determine number of params in SQL.
int paramCount = StringUtils.countMatches(selectQuery, "?");
int paramCount = StringUtils.countMatches(hqlStatement, "?");
if (paramCount > 0) {
setParameters(1, (PreparedStatement) st, paramCount, fileToProcess.getAttributes());
@@ -343,7 +378,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
final ResultSet resultSet;
try {
resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(selectQuery));
resultSet = (flowbased ? ((PreparedStatement) st).executeQuery() : st.executeQuery(hqlStatement));
} catch (SQLException se) {
// If an error occurs during the query, a flowfile is expected to be routed to failure, so ensure one here
flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
@@ -355,7 +390,7 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
String baseFilename = (fileToProcess != null) ? fileToProcess.getAttribute(CoreAttributes.FILENAME.key()) : null;
while (true) {
final AtomicLong nrOfRows = new AtomicLong(0L);
flowfile = (flowfile == null) ? session.create() : session.create(flowfile);
flowfile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
if (baseFilename == null) {
baseFilename = flowfile.getAttribute(CoreAttributes.FILENAME.key());
}
@@ -388,10 +423,10 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
try {
// Set input/output table names by parsing the query
attributes.putAll(toQueryTableAttributes(findTableNames(selectQuery)));
attributes.putAll(toQueryTableAttributes(findTableNames(hqlStatement)));
} catch (Exception e) {
// If failed to parse the query, just log a warning message, but continue.
getLogger().warn("Failed to parse query: {} due to {}", new Object[]{selectQuery, e}, e);
getLogger().warn("Failed to parse query: {} due to {}", new Object[]{hqlStatement, e}, e);
}
// Set MIME type on output document and add extension to filename
@@ -410,14 +445,13 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
flowfile = session.putAllAttributes(flowfile, attributes);
logger.info("{} contains {} Avro records; transferring to 'success'",
logger.info("{} contains {} " + outputFormat + " records; transferring to 'success'",
new Object[]{flowfile, nrOfRows.get()});
if (context.hasIncomingConnection()) {
// If the flow file came from an incoming connection, issue a Modify Content provenance event
session.getProvenanceReporter().modifyContent(flowfile, "Retrieved " + nrOfRows.get() + " rows",
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
// If the flow file came from an incoming connection, issue a Fetch provenance event
session.getProvenanceReporter().fetch(flowfile, dbcpService.getConnectionURL(),
"Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
} else {
// If we created a flow file from rows received from Hive, issue a Receive provenance event
session.getProvenanceReporter().receive(flowfile, dbcpService.getConnectionURL(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@@ -426,6 +460,9 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
} else {
// If there were no rows returned (and the first flow file has been sent), we're done processing, so remove the flowfile and carry on
session.remove(flowfile);
if (resultSetFlowFiles != null && resultSetFlowFiles.size() > 0) {
flowfile = resultSetFlowFiles.get(resultSetFlowFiles.size() - 1);
}
break;
}
@@ -447,31 +484,73 @@ public class SelectHive3QL extends AbstractHive3QLProcessor {
throw e;
}
failure = executeConfigStatements(con, postQueries);
if (failure != null) {
hqlStatement = failure.getLeft();
if (resultSetFlowFiles != null) {
resultSetFlowFiles.forEach(ff -> session.remove(ff));
}
flowfile = (fileToProcess == null) ? session.create() : fileToProcess;
fileToProcess = null;
throw failure.getRight();
}
session.transfer(resultSetFlowFiles, REL_SUCCESS);
if (fileToProcess != null) {
session.remove(fileToProcess);
}
} catch (final ProcessException | SQLException e) {
logger.error("Issue processing SQL {} due to {}.", new Object[]{selectQuery, e});
logger.error("Issue processing SQL {} due to {}.", new Object[]{hqlStatement, e});
if (flowfile == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute HiveQL select query {} due to {}. No FlowFile to route to failure",
new Object[]{selectQuery, e});
new Object[]{hqlStatement, e});
context.yield();
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute HiveQL select query {} for {} due to {}; routing to failure",
new Object[]{selectQuery, flowfile, e});
new Object[]{hqlStatement, flowfile, e});
flowfile = session.penalize(flowfile);
} else {
logger.error("Unable to execute HiveQL select query {} due to {}; routing to failure",
new Object[]{selectQuery, e});
new Object[]{hqlStatement, e});
context.yield();
}
session.transfer(flowfile, REL_FAILURE);
}
} finally {
if (fileToProcess != null) {
session.remove(fileToProcess);
}
}
}
/*
* Executes the given queries using the provided connection.
* Returns null on success, or a Pair of the failed query and its SQLException on failure.
*/
protected Pair<String, SQLException> executeConfigStatements(final Connection con, final List<String> configQueries) {
if (configQueries == null || configQueries.isEmpty()) {
return null;
}
for (String confSQL : configQueries) {
try (final Statement st = con.createStatement()) {
st.execute(confSQL);
} catch (SQLException e) {
return Pair.of(confSQL, e);
}
}
return null;
}
protected List<String> getQueries(final String value) {
if (value == null || value.trim().isEmpty()) {
return null;
}
final List<String> queries = new LinkedList<>();
for (String query : value.split(";")) {
if (query.trim().length() > 0) {
queries.add(query.trim());
}
}
return queries;
}
}
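
For reference, here is a minimal standalone sketch of the semicolon-splitting and execute-and-discard behavior that getQueries() and executeConfigStatements() implement above. The class and helper names are hypothetical and not part of this commit; Apache Derby is assumed on the classpath so the sketch runs without a Hive cluster, with Derby-compatible VALUES statements standing in for the Hive-style "set param=val" statements shown in the property description:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class ConfigStatementsSketch {

    // Same splitting rule as getQueries(): split on ';', trim each fragment,
    // drop empty ones (this sketch returns an empty list instead of null).
    static List<String> splitQueries(String value) {
        final List<String> queries = new ArrayList<>();
        if (value == null || value.trim().isEmpty()) {
            return queries;
        }
        for (String query : value.split(";")) {
            if (!query.trim().isEmpty()) {
                queries.add(query.trim());
            }
        }
        return queries;
    }

    public static void main(String[] args) throws SQLException {
        // In-memory Derby URL so the sketch runs without a Hive cluster; against
        // Hive these would be statements like "set tez.queue.name=queue1".
        try (Connection con = DriverManager.getConnection("jdbc:derby:memory:demo;create=true")) {
            for (String sql : splitQueries("VALUES 1; VALUES 2")) {
                // Like executeConfigStatements(): each statement gets its own
                // Statement object and any results/outputs are discarded.
                try (Statement st = con.createStatement()) {
                    st.execute(sql);
                } catch (SQLException e) {
                    // The processor stops at the first failing statement and routes
                    // a FlowFile to REL_FAILURE with that query as the logged statement.
                    System.err.println("Config statement failed: " + sql);
                    throw e;
                }
            }
        }
    }
}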

TestSelectHive3QL.java

@@ -25,6 +25,8 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -64,6 +66,8 @@ public class TestSelectHive3QL {
private static final Logger LOGGER;
private final static String MAX_ROWS_KEY = "maxRows";
private final int NUM_OF_ROWS = 100;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
@@ -118,11 +122,26 @@
public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException {
runner.setIncomingConnection(false);
invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.RECEIVE, provenance0.getEventType());
assertEquals("jdbc:derby:target/db;create=true", provenance0.getTransitUri());
}
@Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITH_EL, true, "Avro");
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(2, provenanceEvents.size());
final ProvenanceEventRecord provenance0 = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.FORK, provenance0.getEventType());
final ProvenanceEventRecord provenance1 = provenanceEvents.get(1);
assertEquals(ProvenanceEventType.FETCH, provenance1.getEventType());
assertEquals("jdbc:derby:target/db;create=true", provenance1.getTransitUri());
}
@@ -181,6 +200,51 @@ public class TestSelectHive3QL {
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
}
@Test
public void invokeOnTriggerExceptionInPreQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
"select 'no exception' from persons; select exception from persons",
null);
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
}
@Test
public void invokeOnTriggerExceptionInPreQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
"select 'no exception' from persons; select exception from persons",
null);
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
}
@Test
public void invokeOnTriggerExceptionInPostQueriesNoIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, false, CSV,
null,
"select 'no exception' from persons; select exception from persons");
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
}
@Test
public void invokeOnTriggerExceptionInPostQueriesWithIncomingFlows()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
null,
"select 'no exception' from persons; select exception from persons");
// with incoming connections, it should be rolled back
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1);
}
@Test
public void testWithBadSQL() throws SQLException {
final String BAD_SQL = "create table TEST_NO_ROWS (id integer)";
@@ -218,45 +282,45 @@ public class TestSelectHive3QL {
invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO);
}
@Test
public void invokeOnTriggerWithValidPreQueries()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
"select '1' from persons; select '2' from persons", //should not be 'select'. But Derby driver doesn't support "set param=val" format.
null);
}
@Test
public void invokeOnTriggerWithValidPostQueries()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
null,
//should not be 'select'. But Derby driver doesn't support "set param=val" format,
//so just providing any "compilable" query.
" select '4' from persons; \nselect '5' from persons");
}
@Test
public void invokeOnTriggerWithValidPrePostQueries()
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV,
//should not be 'select'. But Derby driver doesn't support "set param=val" format,
//so just providing any "compilable" query.
"select '1' from persons; select '2' from persons",
" select '4' from persons; \nselect '5' from persons");
}
public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat)
throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(query, incomingFlowFile, outputFormat, null, null);
}
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
String preQueries, String postQueries)
throws InitializationException, ClassNotFoundException, SQLException, IOException {
// load test data to database
final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
final Statement stmt = con.createStatement();
try {
stmt.execute("drop table persons");
} catch (final SQLException sqle) {
// Nothing to do here, the table didn't exist
}
stmt.execute("create table persons (id integer, name varchar(100), code integer)");
Random rng = new Random(53496);
final int nrOfRows = 100;
stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
for (int i = 2; i < nrOfRows; i++) {
stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
}
stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)");
LOGGER.info("test data loaded");
runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<>();
attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
}
runner.setIncomingConnection(incomingFlowFile);
runner.run();
TestRunner runner = doOnTrigger(query, incomingFlowFile, outputFormat, preQueries, postQueries);
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS);
@@ -289,7 +353,7 @@ public class TestSelectHive3QL {
while ((line = br.readLine()) != null) {
recordsFromStream++;
String[] values = line.split(",");
if (recordsFromStream < (nrOfRows - 10)) {
if (recordsFromStream < (NUM_OF_ROWS - 10)) {
assertEquals(3, values.length);
assertTrue(values[1].startsWith("\""));
assertTrue(values[1].endsWith("\""));
@@ -298,11 +362,60 @@ public class TestSelectHive3QL {
}
}
}
assertEquals(nrOfRows - 10, recordsFromStream);
assertEquals(NUM_OF_ROWS - 10, recordsFromStream);
assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT)));
flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons");
}
public TestRunner doOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat,
String preQueries, String postQueries)
throws InitializationException, ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection();
final Statement stmt = con.createStatement();
try {
stmt.execute("drop table persons");
} catch (final SQLException sqle) {
// Nothing to do here, the table didn't exist
}
stmt.execute("create table persons (id integer, name varchar(100), code integer)");
Random rng = new Random(53496);
stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")");
for (int i = 2; i < NUM_OF_ROWS; i++) {
stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")");
}
stmt.executeUpdate("insert into persons values (" + NUM_OF_ROWS + ", 'Last Person', NULL)");
LOGGER.info("test data loaded");
runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query);
runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat);
if (preQueries != null) {
runner.setProperty(SelectHive3QL.HIVEQL_PRE_QUERY, preQueries);
}
if (postQueries != null) {
runner.setProperty(SelectHive3QL.HIVEQL_POST_QUERY, postQueries);
}
if (incomingFlowFile) {
// incoming FlowFile content is not used, but attributes are used
final Map<String, String> attributes = new HashMap<>();
attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
}
runner.setIncomingConnection(incomingFlowFile);
runner.run();
return runner;
}
@Test
public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException {
@@ -388,6 +501,10 @@ public class TestSelectHive3QL {
runner.run();
runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0);
// Assert the attributes from the incoming flow file are preserved in the outgoing flow file(s)
flowFile.assertAttributeEquals("hiveql.args.1.value", "1");
flowFile.assertAttributeEquals("hiveql.args.1.type", String.valueOf(Types.INTEGER));
runner.clearTransferState();
}
@@ -535,5 +652,4 @@ public class TestSelectHive3QL {
return "jdbc:derby:" + DB_LOCATION + ";create=true";
}
}
}
}
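
Finally, one capability the new tests do not exercise directly: both new properties support FlowFile-attribute expression language (ExpressionLanguageScope.FLOWFILE_ATTRIBUTES). Below is a hedged sketch of a drop-in test method for TestSelectHive3QL; the method name is hypothetical, it reuses the class's doOnTrigger() helper, its "dbcp" service setup, and the persons table, and a Derby-compatible select stands in for a Hive "set" statement:

@Test
public void invokeOnTriggerWithExpressionLanguagePreQuery()
        throws InitializationException, ClassNotFoundException, SQLException, IOException {
    // doOnTrigger() enqueues an incoming FlowFile carrying person.id=10 when
    // incomingFlowFile is true, so ${person.id} resolves against its attributes
    // and the executed pre-query becomes: select '10' from persons
    doOnTrigger(QUERY_WITHOUT_EL, true, CSV,
            "select '${person.id}' from persons",
            null);
    runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1);
}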