mirror of https://github.com/apache/nifi.git
NIFI-626 - First working version.
Signed-off-by: Toivo Adams <toivo.adams@gmail.com> Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
13addeb571
commit
14bcad212a
|
@ -17,7 +17,6 @@
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
|
@ -30,24 +29,20 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.dbcp.DBCPService;
|
import org.apache.nifi.dbcp.DBCPService;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
|
||||||
import org.apache.nifi.logging.ProcessorLog;
|
import org.apache.nifi.logging.ProcessorLog;
|
||||||
import org.apache.nifi.processor.AbstractProcessor;
|
import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
import org.apache.nifi.processor.exception.FlowFileAccessException;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.processor.io.StreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.processors.standard.util.JdbcCommon;
|
import org.apache.nifi.processors.standard.util.JdbcCommon;
|
||||||
import org.apache.nifi.util.StopWatch;
|
import org.apache.nifi.util.StopWatch;
|
||||||
|
@ -92,6 +87,7 @@ public class ExecuteSQL extends AbstractProcessor {
|
||||||
relationships = Collections.unmodifiableSet(r);
|
relationships = Collections.unmodifiableSet(r);
|
||||||
|
|
||||||
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
|
ArrayList<PropertyDescriptor> pds = new ArrayList<>();
|
||||||
|
pds.add(DBCP_SERVICE);
|
||||||
pds.add(SQL_SELECT_QUERY);
|
pds.add(SQL_SELECT_QUERY);
|
||||||
propDescriptors = Collections.unmodifiableList(pds);
|
propDescriptors = Collections.unmodifiableList(pds);
|
||||||
}
|
}
|
||||||
|
@ -108,47 +104,54 @@ public class ExecuteSQL extends AbstractProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile incoming = session.get();
|
||||||
if (flowFile == null) {
|
if (incoming == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
|
||||||
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
|
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
|
||||||
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).getValue();
|
final String selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(incoming).getValue();
|
||||||
|
final StopWatch stopWatch = new StopWatch(true);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final Connection con = dbcpService.getConnection();
|
final Connection con = dbcpService.getConnection();
|
||||||
final Statement st = con.createStatement();
|
try {
|
||||||
|
final Statement st = con.createStatement();
|
||||||
|
try {
|
||||||
|
FlowFile outgoing = session.write(incoming, new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(final OutputStream out) throws IOException {
|
||||||
|
try {
|
||||||
|
logger.info("start executing query {}", new Object[]{selectQuery});
|
||||||
|
ResultSet resultSet = st.executeQuery(selectQuery);
|
||||||
|
Long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
|
||||||
|
logger.info("Result FlowFile contain {} Avro records", new Object[]{nrOfRows});
|
||||||
|
|
||||||
final StopWatch stopWatch = new StopWatch(true);
|
} catch (SQLException e) {
|
||||||
|
throw new ProcessException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
logger.info("Transferred {} to 'success'", new Object[]{outgoing});
|
||||||
@Override
|
session.getProvenanceReporter().modifyContent(outgoing, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
public void process(final OutputStream out) throws IOException {
|
session.transfer(outgoing, REL_SUCCESS);
|
||||||
try {
|
} finally {
|
||||||
ResultSet resultSet = st.executeQuery(selectQuery);
|
st.close();
|
||||||
long nrOfRows = JdbcCommon.convertToAvroStream(resultSet, out);
|
|
||||||
|
|
||||||
|
|
||||||
} catch (SQLException e) {
|
|
||||||
// TODO Auto-generated catch block
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
} finally {
|
||||||
|
// return connection to pool
|
||||||
|
con.close();
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("Transferred {} to 'success'", new Object[]{flowFile});
|
} catch (ProcessException e) {
|
||||||
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
logger.error("Unable to execute sql select query due to {}", new Object[]{e});
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(incoming, REL_FAILURE);
|
||||||
|
|
||||||
} catch (FlowFileAccessException e) {
|
|
||||||
// TODO Auto-generated catch block
|
|
||||||
e.printStackTrace();
|
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
// TODO Auto-generated catch block
|
logger.error("Unable to execute sql select query due to {}", new Object[]{e});
|
||||||
e.printStackTrace();
|
session.transfer(incoming, REL_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard.util;
|
package org.apache.nifi.processors.standard.util;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
|
@ -30,9 +29,7 @@ import org.apache.avro.file.DataFileWriter;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericDatumWriter;
|
import org.apache.avro.generic.GenericDatumWriter;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.io.BinaryEncoder;
|
|
||||||
import org.apache.avro.io.DatumWriter;
|
import org.apache.avro.io.DatumWriter;
|
||||||
import org.apache.avro.io.EncoderFactory;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,9 +43,6 @@ public class JdbcCommon {
|
||||||
Schema schema = createSchema(rs);
|
Schema schema = createSchema(rs);
|
||||||
GenericRecord rec = new GenericData.Record(schema);
|
GenericRecord rec = new GenericData.Record(schema);
|
||||||
|
|
||||||
// ByteArrayOutputStream out = new ByteArrayOutputStream();
|
|
||||||
// BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
|
|
||||||
|
|
||||||
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
|
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
|
||||||
DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter);
|
DataFileWriter<GenericRecord> dataFileWriter= new DataFileWriter<GenericRecord>(datumWriter);
|
||||||
dataFileWriter.create(schema, outStream);
|
dataFileWriter.create(schema, outStream);
|
||||||
|
@ -67,10 +61,6 @@ public class JdbcCommon {
|
||||||
|
|
||||||
dataFileWriter.close();
|
dataFileWriter.close();
|
||||||
return nrOfRows;
|
return nrOfRows;
|
||||||
// encoder.flush();
|
|
||||||
// out.close();
|
|
||||||
// byte[] serializedBytes = out.toByteArray();
|
|
||||||
// return serializedBytes;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Schema createSchema(ResultSet rs) throws SQLException {
|
public static Schema createSchema(ResultSet rs) throws SQLException {
|
||||||
|
@ -82,7 +72,8 @@ public class JdbcCommon {
|
||||||
FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
|
FieldAssembler<Schema> builder = SchemaBuilder.record(tableName).namespace("any.data").fields();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Type conversion is not precise and is incomplete, needs to be fixed!!!!!!
|
* Some missing Avro types - Decimal, Date types.
|
||||||
|
* May need some additional work.
|
||||||
*/
|
*/
|
||||||
for (int i = 1; i <= nrOfColumns; i++)
|
for (int i = 1; i <= nrOfColumns; i++)
|
||||||
switch (meta.getColumnType(i)) {
|
switch (meta.getColumnType(i)) {
|
||||||
|
|
|
@ -66,3 +66,4 @@ org.apache.nifi.processors.standard.SplitXml
|
||||||
org.apache.nifi.processors.standard.TransformXml
|
org.apache.nifi.processors.standard.TransformXml
|
||||||
org.apache.nifi.processors.standard.UnpackContent
|
org.apache.nifi.processors.standard.UnpackContent
|
||||||
org.apache.nifi.processors.standard.ValidateXml
|
org.apache.nifi.processors.standard.ValidateXml
|
||||||
|
org.apache.nifi.processors.standard.ExecuteSQL
|
||||||
|
|
|
@ -16,22 +16,32 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.util.Collection;
|
import java.sql.SQLException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.avro.file.DataFileStream;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.avro.generic.GenericDatumReader;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.avro.io.DatumReader;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.dbcp.DBCPService;
|
import org.apache.nifi.dbcp.DBCPService;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.fusesource.hawtbuf.ByteArrayInputStream;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -58,61 +68,76 @@ public class TestExecuteSQL {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test1() throws InitializationException {
|
public void test1() throws InitializationException, ClassNotFoundException, SQLException, IOException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
|
final TestRunner runner = TestRunners.newTestRunner(ExecuteSQL.class);
|
||||||
|
|
||||||
final DBCPService dbcp = new DBCPServiceSimpleImpl();
|
final DBCPService dbcp = new DBCPServiceSimpleImpl();
|
||||||
final Map<String, String> dbcpProperties = new HashMap<>();
|
final Map<String, String> dbcpProperties = new HashMap<>();
|
||||||
dbcpProperties.put("Database Host", "NA"); // Embedded Derby don't use host
|
|
||||||
dbcpProperties.put("Database Port", "1"); // Embedded Derby don't use port, but must have value anyway
|
|
||||||
dbcpProperties.put("Database Name", DB_LOCATION);
|
|
||||||
dbcpProperties.put("Database User", "tester");
|
|
||||||
dbcpProperties.put("Password", "testerp");
|
|
||||||
|
|
||||||
runner.addControllerService("dbcp", dbcp, dbcpProperties);
|
runner.addControllerService("dbcp", dbcp, dbcpProperties);
|
||||||
|
|
||||||
|
runner.enableControllerService(dbcp);
|
||||||
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
|
runner.setProperty(ExecuteSQL.DBCP_SERVICE, "dbcp");
|
||||||
|
|
||||||
|
// remove previous test database, if any
|
||||||
|
File dbLocation = new File(DB_LOCATION);
|
||||||
|
dbLocation.delete();
|
||||||
|
|
||||||
|
// load test data to database
|
||||||
|
Connection con = dbcp.getConnection();
|
||||||
|
TestJdbcHugeStream.loadTestData2Database(con, 100, 100, 100);
|
||||||
|
System.out.println("test data loaded");
|
||||||
|
|
||||||
|
// ResultSet size will be 1x100x100 = 10000 rows
|
||||||
|
// because of where PER.ID = ${person.id}
|
||||||
|
final int nrOfRows = 10000;
|
||||||
String query = "select "
|
String query = "select "
|
||||||
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
+ " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode"
|
||||||
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
+ ", PRD.ID as ProductId,PRD.NAME as ProductName,PRD.CODE as ProductCode"
|
||||||
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
+ ", REL.ID as RelId, REL.NAME as RelName, REL.CODE as RelCode"
|
||||||
+ ", ROW_NUMBER() OVER () as rownr "
|
+ ", ROW_NUMBER() OVER () as rownr "
|
||||||
+ " from persons PER, products PRD, relationships REL";
|
+ " from persons PER, products PRD, relationships REL"
|
||||||
|
+ " where PER.ID = ${person.id}";
|
||||||
|
|
||||||
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
|
runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, query);
|
||||||
runner.enableControllerService(dbcp);
|
|
||||||
|
|
||||||
runner.enqueue("Hello".getBytes());
|
// incoming FlowFile content is not used, but attributes are used
|
||||||
|
Map<String,String> attributes = new HashMap<String,String>();
|
||||||
|
attributes.put("person.id", "10");
|
||||||
|
runner.enqueue("Hello".getBytes(), attributes);
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
|
runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1);
|
||||||
runner.clearTransferState();
|
|
||||||
|
// read all Avro records and verify created FlowFile contains 1000000 records
|
||||||
|
List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS);
|
||||||
|
InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
|
||||||
|
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
|
||||||
|
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(in, datumReader);
|
||||||
|
GenericRecord record = null;
|
||||||
|
long recordsFromStream = 0;
|
||||||
|
while (dataFileReader.hasNext()) {
|
||||||
|
// Reuse record object by passing it to next(). This saves us from
|
||||||
|
// allocating and garbage collecting many objects for files with many items.
|
||||||
|
record = dataFileReader.next(record);
|
||||||
|
// System.out.println(record);
|
||||||
|
recordsFromStream += 1;
|
||||||
|
}
|
||||||
|
System.out.println("total nr of records from stream: " + recordsFromStream);
|
||||||
|
assertEquals(nrOfRows, recordsFromStream);
|
||||||
|
dataFileReader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple implementation only for ExecuteSQL processor testing.
|
* Simple implementation only for ExecuteSQL processor testing.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
class DBCPServiceSimpleImpl implements DBCPService {
|
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(ControllerServiceInitializationContext context) throws InitializationException { }
|
public String getIdentifier() {
|
||||||
|
return "dbcp";
|
||||||
@Override
|
}
|
||||||
public Collection<ValidationResult> validate(ValidationContext context) { return null; }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public PropertyDescriptor getPropertyDescriptor(String name) { return null; }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<PropertyDescriptor> getPropertyDescriptors() { return null; }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getIdentifier() { return null; }
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connection getConnection() throws ProcessException {
|
public Connection getConnection() throws ProcessException {
|
||||||
|
|
Loading…
Reference in New Issue