NIFI-626: Code cleanup to adhere to NiFi coding styles

Author: Mark Payne
Date:   2015-06-10 12:16:52 -04:00
parent d98757e4c7
commit a3b8e44ad5
5 changed files with 266 additions and 279 deletions

ExecuteSQL.java

@@ -44,6 +44,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.JdbcCommon;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
@EventDriven
@@ -91,11 +92,11 @@ public class ExecuteSQL extends AbstractProcessor {
private final List<PropertyDescriptor> propDescriptors;
public ExecuteSQL() {
HashSet<Relationship> r = new HashSet<>();
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(r);
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(SQL_SELECT_QUERY);
pds.add(QUERY_TIMEOUT);
@@ -113,7 +114,7 @@ public class ExecuteSQL extends AbstractProcessor {
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile incoming = session.get();
if (incoming == null) {
return;
@@ -127,43 +128,29 @@ public class ExecuteSQL extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true);
try {
final Connection con = dbcpService.getConnection();
try {
final Statement st = con.createStatement();
try {
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
st.setQueryTimeout(queryTimeout); // timeout in seconds
final LongHolder nrOfRows = new LongHolder(0L);
FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
try {
logger.info("start executing query {}", new Object[]{selectQuery});
ResultSet resultSet = st.executeQuery(selectQuery);
Long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
logger.info("Result FlowFile contain {} Avro records", new Object[]{nrOfRows});
} catch (SQLException e) {
logger.debug("Executing query {}", new Object[] { selectQuery });
final ResultSet resultSet = st.executeQuery(selectQuery);
nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out));
} catch (final SQLException e) {
throw new ProcessException(e);
}
}
});
logger.info("{} contains {} Avro records", new Object[] { nrOfRows.get() });
logger.info("Transferred {} to 'success'", new Object[] { outgoing });
session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.getProvenanceReporter().modifyContent(outgoing, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(outgoing, REL_SUCCESS);
} finally {
st.close();
}
} finally {
// return connection to pool
con.close();
}
} catch (ProcessException e) {
logger.error("Unable to execute sql select query due to {}", new Object[]{e});
session.transfer(incoming, REL_FAILURE);
} catch (SQLException e) {
logger.error("Unable to execute sql select query due to {}", new Object[]{e});
} catch (final ProcessException | SQLException e) {
logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", new Object[] { selectQuery, incoming, e });
session.transfer(incoming, REL_FAILURE);
}
}
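
Two idioms in the rewrite above are worth calling out. First, Java 7 try-with-resources now closes the Connection and the Statement automatically, in reverse order, even when the query throws. Second, the new LongHolder lets the anonymous OutputStreamCallback hand the row count back to the enclosing method, since an anonymous class can only capture final locals. A minimal self-contained sketch of the same pattern, assuming Derby on the classpath and using java.util.concurrent.atomic.AtomicLong in place of NiFi's LongHolder; the class, table, and callback names are made up for illustration:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.concurrent.atomic.AtomicLong;

    public class HolderSketch {

        /** Stand-in for NiFi's OutputStreamCallback. */
        interface Callback {
            void process() throws SQLException;
        }

        public static void main(final String[] args) throws SQLException {
            // Plays the role of LongHolder: a final reference whose
            // contents the anonymous class below is free to mutate.
            final AtomicLong nrOfRows = new AtomicLong(0L);

            // Both resources are closed automatically: st first, then con.
            try (final Connection con = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
                 final Statement st = con.createStatement()) {
                st.executeUpdate("create table t (id int)");
                st.executeUpdate("insert into t values (1), (2), (3)");

                new Callback() {
                    @Override
                    public void process() throws SQLException {
                        try (final ResultSet rs = st.executeQuery("select id from t")) {
                            long n = 0;
                            while (rs.next()) {
                                n++;
                            }
                            nrOfRows.set(n); // the value escapes the anonymous class
                        }
                    }
                }.process();
            }
            System.out.println("Retrieved " + nrOfRows.get() + " rows");
        }
    }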

JdbcCommon.java

@@ -21,6 +21,7 @@ import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import static java.sql.Types.*;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -34,104 +35,102 @@ import org.apache.avro.io.DatumWriter;
/**
* JDBC / SQL common functions.
*
*/
public class JdbcCommon {
public static long convertToAvroStream(ResultSet rs, OutputStream outStream) throws SQLException, IOException {
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
final Schema schema = createSchema(rs);
final GenericRecord rec = new GenericData.Record(schema);
Schema schema = createSchema(rs);
GenericRecord rec = new GenericData.Record(schema);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter);
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)) {
dataFileWriter.create(schema, outStream);
ResultSetMetaData meta = rs.getMetaData();
int nrOfColumns = meta.getColumnCount();
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
long nrOfRows = 0;
while (rs.next()) {
for (int i = 1; i <= nrOfColumns; i++) {
Object value = rs.getObject(i);
final Object value = rs.getObject(i);
rec.put(i - 1, value);
}
dataFileWriter.append(rec);
nrOfRows += 1;
}
dataFileWriter.close();
return nrOfRows;
}
}
public static Schema createSchema(ResultSet rs) throws SQLException {
public static Schema createSchema(final ResultSet rs) throws SQLException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
final String tableName = meta.getTableName(1);
ResultSetMetaData meta = rs.getMetaData();
int nrOfColumns = meta.getColumnCount();
String tableName = meta.getTableName(1);
FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
final FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
/**
* Some missing Avro types - Decimal, Date types.
* May need some additional work.
* Some missing Avro types - Decimal, Date types. May need some
* additional work.
*/
for (int i = 1; i <= nrOfColumns; i++)
for (int i = 1; i <= nrOfColumns; i++) {
switch (meta.getColumnType(i)) {
case java.sql.Types.CHAR:
case java.sql.Types.LONGNVARCHAR:
case java.sql.Types.LONGVARCHAR:
case java.sql.Types.NCHAR:
case java.sql.Types.NVARCHAR:
case java.sql.Types.VARCHAR:
case CHAR:
case LONGNVARCHAR:
case LONGVARCHAR:
case NCHAR:
case NVARCHAR:
case VARCHAR:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
case java.sql.Types.BOOLEAN:
case BOOLEAN:
builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
break;
case java.sql.Types.INTEGER:
case java.sql.Types.SMALLINT:
case java.sql.Types.TINYINT:
case INTEGER:
case SMALLINT:
case TINYINT:
builder.name(meta.getColumnName(i)).type().intType().noDefault();
break;
case java.sql.Types.BIGINT:
case BIGINT:
builder.name(meta.getColumnName(i)).type().longType().noDefault();
break;
// java.sql.RowId is interface, is seems to be database implementation specific, let's convert to String
case java.sql.Types.ROWID:
// java.sql.RowId is an interface; it seems to be database
// implementation specific, so let's convert to String
case ROWID:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
case java.sql.Types.FLOAT:
case java.sql.Types.REAL:
case FLOAT:
case REAL:
builder.name(meta.getColumnName(i)).type().floatType().noDefault();
break;
case java.sql.Types.DOUBLE:
case DOUBLE:
builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
break;
// TODO Did not find direct suitable type, need to be clarified!!!!
case java.sql.Types.DECIMAL:
case java.sql.Types.NUMERIC:
// Did not find a directly suitable type; needs to be clarified.
case DECIMAL:
case NUMERIC:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
// TODO Did not find direct suitable type, need to be clarified!!!!
case java.sql.Types.DATE:
case java.sql.Types.TIME:
case java.sql.Types.TIMESTAMP:
// Did not find a directly suitable type; needs to be clarified.
case DATE:
case TIME:
case TIMESTAMP:
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
break;
default:
break;
}
return builder.endRecord();
}
return builder.endRecord();
}
}
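
The schema construction above (static imports of the java.sql.Types constants plus Avro's fluent SchemaBuilder) can be exercised without a live ResultSet. A small sketch under that assumption; the any.data namespace mirrors the code above, while the record and column names are invented:

    import static java.sql.Types.INTEGER;
    import static java.sql.Types.VARCHAR;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaBuilder.FieldAssembler;

    public class SchemaSketch {
        public static void main(final String[] args) {
            // Pretend column metadata: (name, java.sql.Types constant) pairs.
            final String[] names = { "ID", "NAME" };
            final int[] types = { INTEGER, VARCHAR };

            final FieldAssembler<Schema> builder = SchemaBuilder.record("persons").namespace("any.data").fields();
            for (int i = 0; i < names.length; i++) {
                switch (types[i]) {
                    case INTEGER:
                        builder.name(names[i]).type().intType().noDefault();
                        break;
                    case VARCHAR:
                        builder.name(names[i]).type().stringType().noDefault();
                        break;
                    default:
                        break;
                }
            }
            // Prints the assembled record schema as pretty JSON.
            System.out.println(builder.endRecord().toString(true));
        }
    }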

TestExecuteSQL.java

@@ -77,7 +77,7 @@ public class TestExecuteSQL {
invokeOnTrigger(1); // 1 second max time
}
public void invokeOnTrigger(Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
public void invokeOnTrigger(final Integer queryTimeout) throws InitializationException, ClassNotFoundException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
final DBCPService dbcp = new DBCPServiceSimpleImpl();
@@ -88,22 +88,23 @@ public class TestExecuteSQL {
runner.enableControllerService(dbcp);
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
if (queryTimeout!=null)
if (queryTimeout != null) {
runner.setProperty(ExecuteSQL.QUERY_TIMEOUT, queryTimeout.toString() + " secs");
}
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// load test data to database
Connection con = dbcp.getConnection();
final Connection con = dbcp.getConnection();
TestJdbcHugeStream.loadTestData2Database(con, 100, 2000, 1000);
System.out.println("test data loaded");
LOGGER.info("test data loaded");
// ResultSet size will be 1x2000x1000 = 2 000 000 rows
// because of where PER.ID = ${person.id}
final int nrOfRows = 2000000;
String query = "select "
final String query = "select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
@@ -114,7 +115,7 @@ public class TestExecuteSQL {
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
// incoming FlowFile content is not used, but attributes are used
Map<String,String> attributes = new HashMap<String,String>();
final Map<String,String> attributes = new HashMap<String,String>();
attributes.put("person.id", "10");
runner.enqueue("Hello".getBytes(), attributes);
@@ -122,20 +123,20 @@ public class TestExecuteSQL {
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
// read all Avro records and verify the created FlowFile contains 2000000 records
List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with many items.
record = dataFileReader.next(record);
// System.out.println(record);
recordsFromStream += 1;
}
System.out.println("total nr of records from stream: " + recordsFromStream);
LOGGER.info("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
dataFileReader.close();
}
@@ -158,9 +159,9 @@ public class TestExecuteSQL {
public Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
} catch (Exception e) {
} catch (final Exception e) {
throw new ProcessException("getConnection failed: " + e);
}
}
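
The verification half of the test, deserializing the FlowFile's bytes with a GenericDatumReader and counting records while reusing a single GenericRecord, is a general Avro container-file pattern. A standalone sketch that round-trips a few records through a byte array; the schema and values are invented for illustration:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;

    public class AvroRoundTrip {
        public static void main(final String[] args) throws IOException {
            final Schema schema = SchemaBuilder.record("row").namespace("any.data")
                    .fields().name("id").type().intType().noDefault().endRecord();

            // Write a few records into an in-memory Avro container file.
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (final DataFileWriter<GenericRecord> writer =
                    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
                writer.create(schema, out);
                for (int i = 0; i < 3; i++) {
                    final GenericRecord rec = new GenericData.Record(schema);
                    rec.put("id", i);
                    writer.append(rec);
                }
            }

            // Read them back, reusing one record object to avoid
            // allocating a fresh GenericRecord per row.
            final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
            try (final DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
                    new ByteArrayInputStream(out.toByteArray()), datumReader)) {
                GenericRecord record = null;
                long recordsFromStream = 0;
                while (reader.hasNext()) {
                    record = reader.next(record);
                    recordsFromStream++;
                }
                System.out.println("total nr of records from stream: " + recordsFromStream);
            }
        }
    }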

TestJdbcHugeStream.java

@@ -39,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
@@ -73,108 +74,106 @@ public class TestJdbcHugeStream {
* at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
*
*/
// @Test
@Test
@Ignore
public void readSend2StreamHuge_InMemory() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
Connection con = createConnection();
try (final Connection con = createConnection()) {
loadTestData2Database(con, 150, 150, 150);
System.out.println("test data loaded");
Statement st = con.createStatement();
try (final Statement st = con.createStatement()) {
// Notice!
// Following select is deliberately invalid!
// For testing we need huge amount of rows, so where part is not used.
ResultSet resultSet = st.executeQuery("select "
// For testing we need a huge amount of rows, so the where part is
// not used.
final ResultSet resultSet = st.executeQuery("select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL");
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
System.out.println("total nr of rows in resultset: " + nrOfRows);
byte[] serializedBytes = outStream.toByteArray();
final byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
// Deserialize bytes to records
InputStream instream = new ByteArrayInputStream(serializedBytes);
final InputStream instream = new ByteArrayInputStream(serializedBytes);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with many items.
// Reuse record object by passing it to next(). This
// saves us from
// allocating and garbage collecting many objects for
// files with many items.
record = dataFileReader.next(record);
// System.out.println(record);
recordsFromStream += 1;
}
System.out.println("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
st.close();
con.close();
}
}
}
}
@Test
public void readSend2StreamHuge_FileBased() throws ClassNotFoundException, SQLException, IOException {
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
Connection con = createConnection();
try (final Connection con = createConnection()) {
loadTestData2Database(con, 300, 300, 300);
System.out.println("test data loaded");
Statement st = con.createStatement();
try (final Statement st = con.createStatement()) {
// Notice!
// Following select is deliberately invalid!
// For testing we need huge amount of rows, so where part is not used.
ResultSet resultSet = st.executeQuery("select "
// For testing we need a huge amount of rows, so the where part is
// not used.
final ResultSet resultSet = st.executeQuery("select "
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
+ ", ROW_NUMBER() OVER () as rownr "
+ " from persons PER, products PRD, relationships REL");
OutputStream outStream = new FileOutputStream("target/data.avro");
long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
System.out.println("total nr of rows in resultset: " + nrOfRows);
/*
byte[] serializedBytes = outStream.toByteArray();
assertNotNull(serializedBytes);
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
*/
final OutputStream outStream = new FileOutputStream("target/data.avro");
final long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, outStream);
// Deserialize bytes to records
final InputStream instream = new FileInputStream("target/data.avro");
InputStream instream = new FileInputStream("target/data.avro");
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader);
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
GenericRecord record = null;
long recordsFromStream = 0;
while (dataFileReader.hasNext()) {
// Reuse record object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with many items.
// Reuse record object by passing it to next(). This
// saves us from
// allocating and garbage collecting many objects for
// files with many items.
record = dataFileReader.next(record);
// System.out.println(record);
recordsFromStream += 1;
}
System.out.println("total nr of records from stream: " + recordsFromStream);
assertEquals(nrOfRows, recordsFromStream);
st.close();
con.close();
}
}
}
}
//================================================ helpers ===============================================
@@ -192,17 +191,17 @@ public class TestJdbcHugeStream {
System.out.println(createRandomName());
System.out.println(createRandomName());
Statement st = con.createStatement();
final Statement st = con.createStatement();
// tables may not exist; this is not a serious problem.
try { st.executeUpdate(dropPersons);
} catch (Exception e) { }
} catch (final Exception e) { }
try { st.executeUpdate(dropProducts);
} catch (Exception e) { }
} catch (final Exception e) { }
try { st.executeUpdate(dropRelationships);
} catch (Exception e) { }
} catch (final Exception e) { }
st.executeUpdate(createPersons);
st.executeUpdate(createProducts);
@@ -223,17 +222,14 @@ public class TestJdbcHugeStream {
static Random rng = new Random(53495);
static private void loadPersons(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
static private void loadProducts(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
static private void loadRelationships(Statement st, int nr) throws SQLException {
st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")" );
}
@@ -243,10 +239,10 @@ public class TestJdbcHugeStream {
static private String createRandomString() {
int length = rng.nextInt(19);
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
final int length = rng.nextInt(19);
final String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
char[] text = new char[length];
final char[] text = new char[length];
for (int i = 0; i < length; i++)
{
text[i] = characters.charAt(rng.nextInt(characters.length()));
@@ -255,9 +251,8 @@ public class TestJdbcHugeStream {
}
private Connection createConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
return con;
}
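
The helpers above lean on Derby's embedded driver: Class.forName registers the driver, and ;create=true in the JDBC URL creates the database on the first connection. Derby has no DROP TABLE IF EXISTS, which is why the drop statements above swallow their exceptions. A compact sketch of that setup idiom, assuming Derby on the classpath and substituting an in-memory URL so nothing is written to disk:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class DerbySetupSketch {
        public static void main(final String[] args) throws ClassNotFoundException, SQLException {
            // Loading the class registers the embedded driver with DriverManager.
            Class.forName("org.apache.derby.jdbc.EmbeddedDriver");

            try (final Connection con = DriverManager.getConnection("jdbc:derby:memory:hugestream;create=true");
                 final Statement st = con.createStatement()) {
                // Derby lacks DROP TABLE IF EXISTS; ignore "table does not exist".
                try {
                    st.executeUpdate("drop table persons");
                } catch (final SQLException ignored) {
                }
                st.executeUpdate("create table persons (id integer, name varchar(100), code integer)");
                st.executeUpdate("insert into persons values (1, 'A', 42)");
                System.out.println("schema ready");
            }
        }
    }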

pom.xml

@@ -857,6 +857,11 @@
<artifactId>json-path</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
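
Because the derby entry lands in the parent's <dependencyManagement> section, it only pins the version; a module whose tests need Derby (such as the standard processors above) still declares the dependency itself, minus the version. Presumably along these lines in the consuming module's pom.xml:

    <dependency>
        <groupId>org.apache.derby</groupId>
        <artifactId>derby</artifactId>
        <scope>test</scope>
    </dependency>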