mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-1049'
This commit is contained in:
commit
c59087bc3a
|
@ -51,14 +51,13 @@ import org.apache.nifi.util.StopWatch;
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
@InputRequirement(Requirement.INPUT_ALLOWED)
|
@InputRequirement(Requirement.INPUT_ALLOWED)
|
||||||
@Tags({ "sql", "select", "jdbc", "query", "database" })
|
@Tags({"sql", "select", "jdbc", "query", "database"})
|
||||||
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
|
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
|
||||||
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
|
+ " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on " +
|
||||||
"a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
|
"a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " +
|
||||||
"If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
|
"If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " +
|
||||||
"select query. " +
|
"select query. " +
|
||||||
"FlowFile attribute 'executesql.row.count' indicates how many rows were selected."
|
"FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
|
||||||
)
|
|
||||||
public class ExecuteSQL extends AbstractProcessor {
|
public class ExecuteSQL extends AbstractProcessor {
|
||||||
|
|
||||||
public static final String RESULT_ROW_COUNT = "executesql.row.count";
|
public static final String RESULT_ROW_COUNT = "executesql.row.count";
|
||||||
|
@ -155,7 +154,7 @@ public class ExecuteSQL extends AbstractProcessor {
|
||||||
@Override
|
@Override
|
||||||
public void process(final OutputStream out) throws IOException {
|
public void process(final OutputStream out) throws IOException {
|
||||||
try {
|
try {
|
||||||
logger.debug("Executing query {}", new Object[] { selectQuery });
|
logger.debug("Executing query {}", new Object[] {selectQuery});
|
||||||
final ResultSet resultSet = st.executeQuery(selectQuery);
|
final ResultSet resultSet = st.executeQuery(selectQuery);
|
||||||
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
|
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
|
||||||
} catch (final SQLException e) {
|
} catch (final SQLException e) {
|
||||||
|
@ -167,8 +166,7 @@ public class ExecuteSQL extends AbstractProcessor {
|
||||||
// set attribute how many rows were selected
|
// set attribute how many rows were selected
|
||||||
outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
|
outgoing = session.putAttribute(outgoing, RESULT_ROW_COUNT, nrOfRows.get().toString());
|
||||||
|
|
||||||
logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
|
logger.info("{} contains {} Avro records; transferring to 'success'", new Object[] {outgoing, nrOfRows.get()});
|
||||||
logger.info("Transferred {} to 'success'", new Object[] { outgoing });
|
|
||||||
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
session.transfer(outgoing, REL_SUCCESS);
|
session.transfer(outgoing, REL_SUCCESS);
|
||||||
} catch (final ProcessException | SQLException e) {
|
} catch (final ProcessException | SQLException e) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.InputStream;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -63,20 +64,20 @@ public class TestExecuteSQL {
|
||||||
final static String DB_LOCATION = "target/db";
|
final static String DB_LOCATION = "target/db";
|
||||||
|
|
||||||
final static String QUERY_WITH_EL = "select "
|
final static String QUERY_WITH_EL = "select "
|
||||||
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
||||||
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
||||||
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
||||||
+ ", ROW_NUMBER() OVER () as rownr "
|
+ ", ROW_NUMBER() OVER () as rownr "
|
||||||
+ " from persons PER, products PRD, relationships REL"
|
+ " from persons PER, products PRD, relationships REL"
|
||||||
+ " where PER.ID = ${person.id}";
|
+ " where PER.ID = ${person.id}";
|
||||||
|
|
||||||
final static String QUERY_WITHOUT_EL = "select "
|
final static String QUERY_WITHOUT_EL = "select "
|
||||||
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
||||||
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
||||||
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
||||||
+ ", ROW_NUMBER() OVER () as rownr "
|
+ ", ROW_NUMBER() OVER () as rownr "
|
||||||
+ " from persons PER, products PRD, relationships REL"
|
+ " from persons PER, products PRD, relationships REL"
|
||||||
+ " where PER.ID = 10";
|
+ " where PER.ID = 10";
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -123,8 +124,36 @@ public class TestExecuteSQL {
|
||||||
invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time
|
invokeOnTrigger(1, QUERY_WITH_EL, true); // 1 second max time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithNullIntColumn() throws SQLException {
|
||||||
|
// remove previous test database, if any
|
||||||
|
final File dbLocation = new File(DB_LOCATION);
|
||||||
|
dbLocation.delete();
|
||||||
|
|
||||||
|
// load test data to database
|
||||||
|
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
|
||||||
|
Statement stmt = con.createStatement();
|
||||||
|
|
||||||
|
try {
|
||||||
|
stmt.execute("drop table TEST_NULL_INT");
|
||||||
|
} catch (final SQLException sqle) {
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
|
||||||
|
|
||||||
|
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)");
|
||||||
|
stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)");
|
||||||
|
|
||||||
|
runner.setIncomingConnection(false);
|
||||||
|
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
|
||||||
|
runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "2");
|
||||||
|
}
|
||||||
|
|
||||||
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile)
|
public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile)
|
||||||
throws InitializationException, ClassNotFoundException, SQLException, IOException {
|
throws InitializationException, ClassNotFoundException, SQLException, IOException {
|
||||||
|
|
||||||
if (queryTimeout != null) {
|
if (queryTimeout != null) {
|
||||||
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
|
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
|
||||||
|
@ -135,7 +164,7 @@ public class TestExecuteSQL {
|
||||||
dbLocation.delete();
|
dbLocation.delete();
|
||||||
|
|
||||||
// load test data to database
|
// load test data to database
|
||||||
final Connection con = ((DBCPService)runner.getControllerService("dbcp")).getConnection();
|
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
|
||||||
TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
|
TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100);
|
||||||
LOGGER.info("test data loaded");
|
LOGGER.info("test data loaded");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue