NIFI-626: Fixed checkstyle violations

This commit is contained in:
Mark Payne 2015-06-17 13:26:22 -04:00
parent ec02e58d3c
commit 118b6608c7
8 changed files with 349 additions and 329 deletions

View File

@ -182,6 +182,14 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -48,41 +48,41 @@ import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
@EventDriven
@Tags({"sql", "select", "jdbc", "query", "database"})
@Tags({ "sql", "select", "jdbc", "query", "database" })
@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
+ " Streaming is used so arbitrarily large result sets are supported.")
+ " Streaming is used so arbitrarily large result sets are supported.")
public class ExecuteSQL extends AbstractProcessor {
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
.name("success")
.description("Successfully created FlowFile from SQL query result set.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
.name("failure")
.description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
.name("Database Connection Pooling Service")
.description("The Controller Service that is used to obtain connection to database")
.required(true)
.identifiesControllerService(DBCPService.class)
.build();
public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
.name("SQL select query")
.description("SQL select query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
.name("SQL select query")
.description("SQL select query")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.name("Max Wait Time")
.description("The maximum amount of time allowed for a running SQL select query "
+ " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@ -115,7 +115,7 @@ public class ExecuteSQL extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile incoming = session.get();
final FlowFile incoming = session.get();
if (incoming == null) {
return;
}
@ -126,13 +126,13 @@ public class ExecuteSQL extends AbstractProcessor {
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
final StopWatch stopWatch = new StopWatch(true);
final StopWatch stopWatch = new StopWatch(true);
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
final FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
@ -151,7 +151,7 @@ public class ExecuteSQL extends AbstractProcessor {
session.transfer(outgoing, REL_SUCCESS);
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
session.transfer(incoming, REL_FAILURE);
}
session.transfer(incoming, REL_FAILURE);
}
}
}

View File

@ -16,12 +16,32 @@
*/
package org.apache.nifi.processors.standard.util;
import static java.sql.Types.BIGINT;
import static java.sql.Types.BOOLEAN;
import static java.sql.Types.CHAR;
import static java.sql.Types.DATE;
import static java.sql.Types.DECIMAL;
import static java.sql.Types.DOUBLE;
import static java.sql.Types.FLOAT;
import static java.sql.Types.INTEGER;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NUMERIC;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.REAL;
import static java.sql.Types.ROWID;
import static java.sql.Types.SMALLINT;
import static java.sql.Types.TIME;
import static java.sql.Types.TIMESTAMP;
import static java.sql.Types.TINYINT;
import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import static java.sql.Types.*;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@ -32,9 +52,8 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
/**
* JDBC / SQL common functions.
* JDBC / SQL common functions.
*/
public class JdbcCommon {
@ -53,13 +72,10 @@ public class JdbcCommon {
for (int i = 1; i <= nrOfColumns; i++) {
final Object value = rs.getObject(i);
// The different types that we support are numbers (int,
// long, double, float), as well
// as boolean values and Strings. Since Avro doesn't provide
// timestamp types, we want to
// convert those to Strings. So we will cast anything other
// than numbers or booleans to
// strings by using to toString() method.
// The different types that we support are numbers (int, long, double, float),
// as well as boolean values and Strings. Since Avro doesn't provide
// timestamp types, we want to convert those to Strings. So we will cast anything other
// than numbers or booleans to strings by using to toString() method.
if (value == null) {
rec.put(i - 1, null);
} else if (value instanceof Number || value instanceof Boolean) {
@ -73,8 +89,8 @@ public class JdbcCommon {
}
return nrOfRows;
}
}
}
}
public static Schema createSchema(final ResultSet rs) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
@ -84,11 +100,10 @@ public class JdbcCommon {
final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
/**
* Some missing Avro types - Decimal, Date types. May need some
* additional work.
* Some missing Avro types - Decimal, Date types. May need some additional work.
*/
for (int i = 1; i <= nrOfColumns; i++) {
switch (meta.getColumnType(i)) {
switch (meta.getColumnType(i)) {
case CHAR:
case LONGNVARCHAR:
case LONGVARCHAR:
@ -142,9 +157,9 @@ public class JdbcCommon {
default:
break;
}
}
}
return builder.endRecord();
}
return builder.endRecord();
}
}

View File

@ -68,13 +68,13 @@ public class TestExecuteSQL {
@Test
public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException {
invokeOnTrigger(null);
invokeOnTrigger(null);
}
@Test
public void testQueryTimeout() throws InitializationException, ClassNotFoundException, SQLException, IOException {
// Does to seem to have any effect when using embedded Derby
invokeOnTrigger(1); // 1 second max time
// Does to seem to have any effect when using embedded Derby
invokeOnTrigger(1); // 1 second max time
}
public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
@ -89,7 +89,7 @@ public class TestExecuteSQL {
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
if (queryTimeout != null) {
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
}
// remove previous test database, if any
@ -105,24 +105,25 @@ public class TestExecuteSQL {
// because of where PER.ID = ${person.id}
final int nrOfRows = 2000000;
final String query = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL"
+ " where PER.ID = ${person.id}";
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
// incoming FlowFile content is not used, but attributes are used
final Map<String,String> attributes = new HashMap<String,String>();
final Map<String, String> attributes = new HashMap<String, String>();
attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
// read all Avro records and verify created FlowFile contains 1000000 records
// read all Avro records and verify created FlowFile contains 1000000
// records
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
@ -130,10 +131,11 @@ public class TestExecuteSQL {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
recordsFromStream += 1;
}
LOGGER.info("total nr of records from stream: " + recordsFromStream);
@ -141,30 +143,27 @@ public class TestExecuteSQL {
dataFileReader.close();
}
/**
* Simple implementation only for ExecuteSQL processor testing.
*
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public String getIdentifier() {
return "dbcp";
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -16,7 +16,8 @@
*/
package org.apache.nifi.processors.standard.util;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -30,7 +31,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@ -50,19 +50,19 @@ public class TestJdbcCommon {
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
String dropTable = "drop table restaurants";
@Test
public void testCreateSchema() throws ClassNotFoundException, SQLException {
@Test
public void testCreateSchema() throws ClassNotFoundException, SQLException {
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
Connection con = createConnection();
Statement st = con.createStatement();
final Connection con = createConnection();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (Exception e) {
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
@ -71,13 +71,14 @@ public class TestJdbcCommon {
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
ResultSet resultSet = st.executeQuery("select * from restaurants");
Schema schema = JdbcCommon.createSchema(resultSet);
final ResultSet resultSet = st.executeQuery("select * from restaurants");
final Schema schema = JdbcCommon.createSchema(resultSet);
assertNotNull(schema);
// records name, should be result set first column table name
// Notice! sql select may join data from different tables, other columns may have different table names
// Notice! sql select may join data from different tables, other columns
// may have different table names
assertEquals("RESTAURANTS", schema.getName());
assertNotNull(schema.getField("ID"));
assertNotNull(schema.getField("NAME"));
@ -85,20 +86,20 @@ public class TestJdbcCommon {
st.close();
con.close();
}
}
@Test
public void testConvertToBytes() throws ClassNotFoundException, SQLException, IOException {
@Test
public void testConvertToBytes() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
Connection con = createConnection();
Statement st = con.createStatement();
final Connection con = createConnection();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (Exception e) {
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
@ -108,46 +109,47 @@ public class TestJdbcCommon {
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R");
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
final ResultSet resultSet = st.executeQuery("select R.*, ROW_NUMBER() OVER () as rownr from restaurants R");
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
JdbcCommon.convertToAvroStream(resultSet, outStream);
byte[] serializedBytes = outStream.toByteArray();
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
st.close();
con.close();
// Deserialize bytes to records
InputStream instream = new ByteArrayInputStream(serializedBytes);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader);
GenericRecord record = null;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with many items.
record = dataFileReader.next(record);
System.out.println(record);
}
}
// many test use Derby as database, so ensure driver is available
final InputStream instream = new ByteArrayInputStream(serializedBytes);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
record = dataFileReader.next(record);
System.out.println(record);
}
}
}
// many test use Derby as database, so ensure driver is available
@Test
public void testDriverLoad() throws ClassNotFoundException {
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
assertNotNull(clazz);
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
}

View File

@ -43,16 +43,15 @@ import org.junit.Ignore;
import org.junit.Test;
/**
* Test streaming using large number of result set rows.
* 1. Read data from database.
* 2. Create Avro schema from ResultSet meta data.
* 3. Read rows from ResultSet and write rows to Avro writer stream
* (Avro will create record for each row).
* 4. And finally read records from Avro stream to verify all data is present in Avro stream.
* Test streaming using large number of result set rows. 1. Read data from
* database. 2. Create Avro schema from ResultSet meta data. 3. Read rows from
* ResultSet and write rows to Avro writer stream (Avro will create record for
* each row). 4. And finally read records from Avro stream to verify all data is
* present in Avro stream.
*
*
* Sql query will return all combinations from 3 table.
* For example when each table contain 1000 rows, result set will be 1 000 000 000 rows.
* Sql query will return all combinations from 3 table. For example when each
* table contain 1000 rows, result set will be 1 000 000 000 rows.
*
*/
public class TestJdbcHugeStream {
@ -65,18 +64,21 @@ public class TestJdbcHugeStream {
}
/**
* In case of large record set this will fail with
* java.lang.OutOfMemoryError: Java heap space
* at java.util.Arrays.copyOf(Arrays.java:2271)
* at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
* at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
* at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
* at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
* In case of large record set this will fail with
* java.lang.OutOfMemoryError: Java heap space at
* java.util.Arrays.copyOf(Arrays.java:2271) at
* java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at
* java
* .io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
* at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at
* org .apache.avro.file.
* DataFileWriter$BufferedFileOutputStream$PositionFilter
* .write(DataFileWriter.java:446)
*
*/
@Test
@Ignore
public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException {
public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
@ -127,17 +129,17 @@ public class TestJdbcHugeStream {
}
}
}
}
}
@Test
public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException {
@Test
public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
try (final Connection con = createConnection()) {
loadTestData2Database(con, 300, 300, 300);
loadTestData2Database(con, 300, 300, 300);
try (final Statement st = con.createStatement()) {
// Notice!
@ -174,9 +176,10 @@ public class TestJdbcHugeStream {
}
}
}
}
}
//================================================ helpers ===============================================
// ================================================ helpers
// ===============================================
static String dropPersons = "drop table persons";
static String dropProducts = "drop table products";
@ -185,75 +188,80 @@ public class TestJdbcHugeStream {
static String createProducts = "create table products (id integer, name varchar(100), code integer)";
static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)";
static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException {
static public void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws ClassNotFoundException, SQLException {
System.out.println(createRandomName());
System.out.println(createRandomName());
System.out.println(createRandomName());
System.out.println(createRandomName());
System.out.println(createRandomName());
System.out.println(createRandomName());
final Statement st = con.createStatement();
// tables may not exist, this is not serious problem.
try { st.executeUpdate(dropPersons);
} catch (final Exception e) { }
try {
st.executeUpdate(dropPersons);
} catch (final Exception e) {
}
try { st.executeUpdate(dropProducts);
} catch (final Exception e) { }
try {
st.executeUpdate(dropProducts);
} catch (final Exception e) {
}
try { st.executeUpdate(dropRelationships);
} catch (final Exception e) { }
try {
st.executeUpdate(dropRelationships);
} catch (final Exception e) {
}
st.executeUpdate(createPersons);
st.executeUpdate(createProducts);
st.executeUpdate(createRelationships);
for (int i = 0; i < nrOfPersons; i++)
loadPersons(st, i);
loadPersons(st, i);
for (int i = 0; i < nrOfProducts; i++)
loadProducts(st, i);
loadProducts(st, i);
for (int i = 0; i < nrOfRels; i++)
loadRelationships(st, i);
loadRelationships(st, i);
st.close();
}
}
static Random rng = new Random(53495);
static Random rng = new Random(53495);
static private void loadPersons(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
static private void loadPersons(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private void loadProducts(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
static private void loadProducts(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private void loadRelationships(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
static private void loadRelationships(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")");
}
static private String createRandomName() {
return createRandomString() + " " + createRandomString();
}
static private String createRandomName() {
return createRandomString() + " " + createRandomString();
}
static private String createRandomString() {
static private String createRandomString() {
final int length = rng.nextInt(19);
final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
final int length = rng.nextInt(19);
final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
final char[] text = new char[length];
for (int i = 0; i < length; i++)
{
text[i] = characters.charAt(rng.nextInt(characters.length()));
}
return new String(text);
}
final char[] text = new char[length];
for (int i = 0; i < length; i++) {
text[i] = characters.charAt(rng.nextInt(characters.length()));
}
return new String(text);
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
return con;
}
}

View File

@ -41,60 +41,59 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
/**
* Implementation of for Database Connection Pooling Service.
* Apache DBCP is used for connection pooling functionality.
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
*
*/
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder()
.name("Database Driver Jar Url")
.description("Optional database driver jar file path url. For example 'file:///var/tmp/mariadb-java-client-1.1.7.jar'")
.defaultValue(null)
.required(false)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
.name("Database Driver Jar Url")
.description("Optional database driver jar file path url. For example 'file:///var/tmp/mariadb-java-client-1.1.7.jar'")
.defaultValue(null)
.required(false)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Database User")
.description("Database user name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@ -102,9 +101,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
@ -114,7 +113,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_JAR_URL);
@ -135,26 +134,29 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
/**
* Create new pool, open some connections ready to be used
* @param context the configuration context
* @throws InitializationException if unable to create a database connection
*
* @param context
* the configuration context
* @throws InitializationException
* if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
String drv = context.getProperty(DB_DRIVERNAME).getValue();
String user = context.getProperty(DB_USER).getValue();
String passw = context.getProperty(DB_PASSWORD).getValue();
Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
final String drv = context.getProperty(DB_DRIVERNAME).getValue();
final String user = context.getProperty(DB_USER).getValue();
final String passw = context.getProperty(DB_PASSWORD).getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
// Optional driver URL, when exist, this URL will be used to locate driver jar file location
String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue();
dataSource.setDriverClassLoader( getDriverClassLoader(urlString, drv) );
final String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue();
dataSource.setDriverClassLoader(getDriverClassLoader(urlString, drv));
String dburl = context.getProperty(DATABASE_URL).getValue();
final String dburl = context.getProperty(DATABASE_URL).getValue();
dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal);
@ -165,40 +167,41 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
// verify connection can be established.
try {
Connection con = dataSource.getConnection();
if (con==null) {
final Connection con = dataSource.getConnection();
if (con == null) {
throw new InitializationException("Connection to database cannot be established.");
}
con.close();
} catch (SQLException e) {
} catch (final SQLException e) {
throw new InitializationException(e);
}
}
/**
* using Thread.currentThread().getContextClassLoader();
* will ensure that you are using the ClassLoader for you NAR.
* @throws InitializationException if there is a problem obtaining the ClassLoader
* using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
*
* @throws InitializationException
* if there is a problem obtaining the ClassLoader
*/
protected ClassLoader getDriverClassLoader(String urlString, String drvName) throws InitializationException {
if (urlString!=null && urlString.length()>0) {
if (urlString != null && urlString.length() > 0) {
try {
URL[] urls = new URL[] { new URL(urlString) };
URLClassLoader ucl = new URLClassLoader(urls);
final URL[] urls = new URL[] { new URL(urlString) };
final URLClassLoader ucl = new URLClassLoader(urls);
// Workaround which allows to use URLClassLoader for JDBC driver loading.
// (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
Class<?> clazz = Class.forName(drvName, true, ucl);
if (clazz==null) {
final Class<?> clazz = Class.forName(drvName, true, ucl);
if (clazz == null) {
throw new InitializationException("Can't load Database Driver " + drvName);
}
Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver( new DriverShim(driver) );
final Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver(new DriverShim(driver));
return ucl;
} catch (MalformedURLException e) {
} catch (final MalformedURLException e) {
throw new InitializationException("Invalid Database Driver Jar Url", e);
} catch (Exception e) {
} catch (final Exception e) {
throw new InitializationException("Can't load Database Driver", e);
}
} else {
@ -208,24 +211,23 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
}
/**
* Shutdown pool, close all open connections.
* Shutdown pool, close all open connections.
*/
@OnDisabled
public void shutdown() {
try {
dataSource.close();
} catch (SQLException e) {
} catch (final SQLException e) {
throw new ProcessException(e);
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection con = dataSource.getConnection();
final Connection con = dataSource.getConnection();
return con;
} catch (SQLException e) {
} catch (final SQLException e) {
throw new ProcessException(e);
}
}

View File

@ -53,7 +53,7 @@ public class DBCPServiceTest {
}
/**
* Missing property values.
* Missing property values.
*/
@Test
public void testMissingPropertyValues() throws InitializationException {
@ -65,8 +65,7 @@ public class DBCPServiceTest {
}
/**
* Test database connection using Derby.
* Connect, create table, insert, select, drop table.
* Test database connection using Derby. Connect, create table, insert, select, drop table.
*
*/
@Test
@ -76,21 +75,21 @@ public class DBCPServiceTest {
runner.addControllerService("test-good1", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
@ -99,12 +98,9 @@ public class DBCPServiceTest {
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server
*
* Test database connection using external JDBC jar located by URL.
* Connect, create table, insert, select, drop table.
* Test database connection using external JDBC jar located by URL. Connect, create table, insert, select, drop table.
*
*/
@Ignore
@ -115,34 +111,31 @@ public class DBCPServiceTest {
runner.addControllerService("test-external-jar", service);
// set MariaDB database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:mariadb://localhost:3306/" + "testdb");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.mariadb.jdbc.Driver");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVER_JAR_URL, "file:///var/tmp/mariadb-java-client-1.1.7.jar");
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:mariadb://localhost:3306/" + "testdb");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.mariadb.jdbc.Driver");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVER_JAR_URL, "file:///var/tmp/mariadb-java-client-1.1.7.jar");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
connection.close(); // return to pool
connection.close(); // return to pool
}
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Test get database connection using Derby.
* Get many times, after a while pool should not contain any available connection
* and getConnection should fail.
* Test get database connection using Derby. Get many times, after a while pool should not contain any available connection and getConnection should fail.
*/
@Test
public void testExhaustPool() throws InitializationException, SQLException {
@ -151,32 +144,30 @@ public class DBCPServiceTest {
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
exception.expect(ProcessException.class);
exception.expectMessage("Cannot get a connection, pool error Timeout waiting for idle object");
for (int i = 0; i < 100; i++) {
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
}
}
/**
* Test get database connection using Derby.
* Get many times, release immediately
* and getConnection should not fail.
* Test get database connection using Derby. Get many times, release immediately and getConnection should not fail.
*/
@Test
public void testGetManyNormal() throws InitializationException, SQLException {
@ -185,11 +176,11 @@ public class DBCPServiceTest {
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
@ -197,91 +188,86 @@ public class DBCPServiceTest {
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
for (int i = 0; i < 1000; i++) {
Connection connection = dbcpService.getConnection();
final Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
connection.close(); // will return connection to pool
connection.close(); // will return connection to pool
}
}
@Test
public void testDriverLoad() throws ClassNotFoundException {
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
assertNotNull(clazz);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoader() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
final URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
final ClassLoader parent = Thread.currentThread().getContextClassLoader();
final URLClassLoader ucl = new URLClassLoader(urls, parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
final Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
final Driver driver = (Driver) clazz.newInstance();
final Driver shim = new DriverShim(driver);
DriverManager.registerDriver(shim);
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
* NB!!!! Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar Prerequisite: access to running MariaDb database server
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoaderGetConnection() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
final URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
final URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
final ClassLoader parent = Thread.currentThread().getContextClassLoader();
final URLClassLoader ucl = new URLClassLoader(urls, parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
final Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
final Driver driver = (Driver) clazz.newInstance();
final Driver shim = new DriverShim(driver);
DriverManager.registerDriver(shim);
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
final Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb","tester","testerp");
final Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb", "tester", "testerp");
assertNotNull(connection);
connection.close();
DriverManager.deregisterDriver(shim);
}
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
String dropTable = "drop table restaurants";
protected void createInsertSelectDrop( Connection con) throws SQLException {
protected void createInsertSelectDrop(Connection con) throws SQLException {
Statement st = con.createStatement();
final Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (Exception e) {
} catch (final Exception e) {
// table may not exist, this is not serious problem.
}
@ -292,7 +278,7 @@ public class DBCPServiceTest {
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
int nrOfRows = 0;
ResultSet resultSet = st.executeQuery("select * from restaurants");
final ResultSet resultSet = st.executeQuery("select * from restaurants");
while (resultSet.next())
nrOfRows++;
assertEquals(3, nrOfRows);