NIFI-1280 Create FilterCSVColumns Processor.

Signed-off-by: Matt Burgess <mattyb149@apache.org>
Toivo Adams authored on 2016-05-07 12:29:15 +03:00; committed by Matt Burgess
parent 52cf9a7953
commit 4d5872a385
11 changed files with 1066 additions and 0 deletions

View File: pom.xml

@@ -283,6 +283,11 @@ language governing permissions and limitations under the License. -->
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-example-csv</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
<build>

View File: src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java

@@ -0,0 +1,303 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Pair;
import org.apache.commons.lang3.time.FastDateFormat;
import au.com.bytecode.opencsv.CSVReader;
/** Enumerator that reads from a CSV stream.
*
* @param <E> Row type
*/
class CsvEnumerator2<E> implements Enumerator<E> {
private final CSVReader reader;
private final String[] filterValues;
private final RowConverter<E> rowConverter;
private E current;
private static final FastDateFormat TIME_FORMAT_DATE;
private static final FastDateFormat TIME_FORMAT_TIME;
private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
static {
TimeZone gmt = TimeZone.getTimeZone("GMT");
TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
TIME_FORMAT_TIMESTAMP =
FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
}
public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes) {
this(verifyNotNullReader(csvReader), fieldTypes, identityList(fieldTypes.size()));
}
public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes, int[] fields) {
//noinspection unchecked
this(csvReader, null, (RowConverter<E>) converter(fieldTypes, fields));
}
public CsvEnumerator2(CSVReader csvReader, String[] filterValues, RowConverter<E> rowConverter) {
this.rowConverter = rowConverter;
this.filterValues = filterValues;
this.reader = csvReader;
}
public static CSVReader verifyNotNullReader(CSVReader csvReader) {
if (csvReader == null) {
throw new IllegalArgumentException("csvReader cannot be null");
}
return csvReader;
}
private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
int[] fields) {
if (fields.length == 1) {
final int field = fields[0];
return new SingleColumnRowConverter(fieldTypes.get(field), field);
} else {
return new ArrayRowConverter(fieldTypes, fields);
}
}
/** Deduces the names and types of a table's columns by reading the first line
* of a CSV stream. */
public static RelDataType deduceRowType(JavaTypeFactory typeFactory, String[] firstLine,
List<CsvFieldType> fieldTypes) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
for (String string : firstLine) {
final String name;
final CsvFieldType fieldType;
final int colon = string.indexOf(':');
if (colon >= 0) {
name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
typeString = typeString.trim();
fieldType = CsvFieldType.of(typeString);
if (fieldType == null) {
System.out.println("WARNING: Found unknown type: "
+ typeString + " in first line: "
+ " for column: " + name
+ ". Will assume the type of column is string");
}
} else {
name = string;
fieldType = null;
}
final RelDataType type;
if (fieldType == null) {
type = typeFactory.createJavaType(String.class);
} else {
type = fieldType.toType(typeFactory);
}
names.add(name);
types.add(type);
if (fieldTypes != null) {
fieldTypes.add(fieldType);
}
}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createJavaType(String.class));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
public E current() {
return current;
}
public boolean moveNext() {
try {
outer:
for (;;) {
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
reader.close();
return false;
}
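// Skip rows whose value in any filtered column does not equal the requested filter value.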
if (filterValues != null) {
for (int i = 0; i < strings.length; i++) {
String filterValue = filterValues[i];
if (filterValue != null) {
if (!filterValue.equals(strings[i])) {
continue outer;
}
}
}
}
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void reset() {
throw new UnsupportedOperationException();
}
public void close() {
try {
reader.close();
} catch (IOException e) {
throw new RuntimeException("Error closing CSV reader", e);
}
}
/** Returns an array of integers {0, ..., n - 1}. */
static int[] identityList(int n) {
int[] integers = new int[n];
for (int i = 0; i < n; i++) {
integers[i] = i;
}
return integers;
}
/** Row converter. */
abstract static class RowConverter<E> {
abstract E convertRow(String[] rows);
protected Object convert(CsvFieldType fieldType, String string) {
if (fieldType == null) {
return string;
}
switch (fieldType) {
case BOOLEAN:
if (string.length() == 0) {
return null;
}
return Boolean.parseBoolean(string);
case BYTE:
if (string.length() == 0) {
return null;
}
return Byte.parseByte(string);
case SHORT:
if (string.length() == 0) {
return null;
}
return Short.parseShort(string);
case INT:
if (string.length() == 0) {
return null;
}
return Integer.parseInt(string);
case LONG:
if (string.length() == 0) {
return null;
}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0) {
return null;
}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0) {
return null;
}
return Double.parseDouble(string);
case DATE:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_DATE.parse(string);
return new java.sql.Date(date.getTime());
} catch (ParseException e) {
return null;
}
case TIME:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIME.parse(string);
return new java.sql.Time(date.getTime());
} catch (ParseException e) {
return null;
}
case TIMESTAMP:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIMESTAMP.parse(string);
return new java.sql.Timestamp(date.getTime());
} catch (ParseException e) {
return null;
}
case STRING:
default:
return string;
}
}
}
/** Array row converter. */
static class ArrayRowConverter extends RowConverter<Object[]> {
private final CsvFieldType[] fieldTypes;
private final int[] fields;
ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]);
this.fields = fields;
}
public Object[] convertRow(String[] strings) {
final Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
int field = fields[i];
objects[i] = convert(fieldTypes[field], strings[field]);
}
return objects;
}
}
/** Single column row converter. */
private static class SingleColumnRowConverter extends RowConverter<Object> {
private final CsvFieldType fieldType;
private final int fieldIndex;
private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
this.fieldType = fieldType;
this.fieldIndex = fieldIndex;
}
public Object convertRow(String[] strings) {
return convert(fieldType, strings[fieldIndex]);
}
}
}
// End CsvEnumerator2.java
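For reference, deduceRowType() parses an optional "name:type" convention in the header row: a cell like "ID:int" yields an INT column, while a cell with no type suffix falls back to string. A minimal sketch of that behavior (hypothetical example class, not part of this commit; it must live in the org.apache.calcite.adapter.csv package because CsvEnumerator2 is package-private, and it assumes calcite-core's JavaTypeFactoryImpl is on the classpath):

package org.apache.calcite.adapter.csv;

import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;

public class DeduceRowTypeExample {
  public static void main(String[] args) {
    // Header cells in the "name:type" form; "CITY" has no type suffix,
    // so it is assumed to be a string column.
    String[] header = {"ID:int", "AMOUNT1:float", "CITY"};
    List<CsvFieldType> fieldTypes = new ArrayList<>();
    RelDataType rowType =
        CsvEnumerator2.deduceRowType(new JavaTypeFactoryImpl(), header, fieldTypes);
    System.out.println(rowType.getFieldNames()); // [ID, AMOUNT1, CITY]
    System.out.println(fieldTypes);              // [INT, FLOAT, null]
  }
}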

View File: src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
import java.io.Reader;
import java.util.Map;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import com.google.common.collect.ImmutableMap;
/**
* Schema mapped onto a set of named CSV streams. Each table in the schema
* is backed by a {@link Reader} in the input map.
*/
public class CsvSchema2 extends AbstractSchema {
private final Map<String, Reader> inputs;
private final CsvTable.Flavor flavor;
private Map<String, Table> tableMap;
/**
* Creates a CSV schema.
*
* @param inputs Map from table name to a Reader over that table's CSV content
* @param flavor Whether to instantiate flavor tables that undergo
* query optimization
*/
public CsvSchema2(Map<String, Reader> inputs, CsvTable.Flavor flavor) {
this.inputs = inputs;
this.flavor = flavor;
}
/** Looks for a suffix on a string and returns
* either the string with the suffix removed
* or the original string. */
private static String trim(String s, String suffix) {
String trimmed = trimOrNull(s, suffix);
return trimmed != null ? trimmed : s;
}
/** Looks for a suffix on a string and returns
* either the string with the suffix removed
* or null. */
private static String trimOrNull(String s, String suffix) {
return s.endsWith(suffix)
? s.substring(0, s.length() - suffix.length())
: null;
}
@Override protected Map<String, Table> getTableMap() {
if (tableMap != null) {
return tableMap;
}
// Build a map from table name to table; each input Reader becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (Map.Entry<String, Reader> entry : inputs.entrySet()) {
final Table table = createTable(entry.getValue());
builder.put(entry.getKey(), table);
}
tableMap = builder.build();
return tableMap;
}
/** Creates a different sub-type of table based on the "flavor" attribute. */
private Table createTable(Reader reader) {
switch (flavor) {
case TRANSLATABLE:
return new CsvTranslatableTable2(reader, null);
// case SCANNABLE:
// return new CsvScannableTable(file, null);
// case FILTERABLE:
// return new CsvFilterableTable(file, null);
default:
throw new AssertionError("Unknown flavor " + flavor);
}
}
}
// End CsvSchema2.java

View File: src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
import java.io.Reader;
import java.util.Map;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
/**
* Factory that creates a {@link CsvSchema2}.
*
* <p>Unlike the file-based factory in calcite-example-csv, this variant is
* constructed directly with an in-memory map from table name to {@link Reader}.</p>
*/
@SuppressWarnings("UnusedDeclaration")
public class CsvSchemaFactory2 implements SchemaFactory {
private final Map<String, Reader> inputs;
public CsvSchemaFactory2(Map<String, Reader> inputs) {
this.inputs = inputs;
}
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
String flavorName = (String) operand.get("flavor");
CsvTable.Flavor flavor;
if (flavorName == null) {
flavor = CsvTable.Flavor.SCANNABLE;
} else {
flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase());
}
return new CsvSchema2(inputs, flavor);
}
}
// End CsvSchemaFactory2.java

View File: src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java

@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import java.util.List;
/**
* Relational expression representing a scan of a CSV stream.
*
* <p>Like any table scan, it serves as a leaf node of a query tree.</p>
*/
public class CsvTableScan2 extends TableScan implements EnumerableRel {
final CsvTranslatableTable2 csvTable;
final int[] fields;
protected CsvTableScan2(RelOptCluster cluster, RelOptTable table,
CsvTranslatableTable2 csvTable, int[] fields) {
super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
this.csvTable = csvTable;
this.fields = fields;
assert csvTable != null;
}
@Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
assert inputs.isEmpty();
return new CsvTableScan2(getCluster(), table, csvTable, fields);
}
@Override public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
.item("fields", Primitive.asList(fields));
}
@Override public RelDataType deriveRowType() {
final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
final RelDataTypeFactory.FieldInfoBuilder builder =
getCluster().getTypeFactory().builder();
for (int field : fields) {
builder.add(fieldList.get(field));
}
return builder.build();
}
@Override public void register(RelOptPlanner planner) {
planner.addRule(CsvProjectTableScanRule.INSTANCE);
}
public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
PhysType physType =
PhysTypeImpl.of(
implementor.getTypeFactory(),
getRowType(),
pref.preferArray());
if (table instanceof JsonTable) {
return implementor.result(
physType,
Blocks.toBlock(
Expressions.call(table.getExpression(JsonTable.class),
"enumerable")));
}
return implementor.result(
physType,
Blocks.toBlock(
Expressions.call(table.getExpression(CsvTranslatableTable2.class),
"project", Expressions.constant(fields))));
}
}
// End CsvTableScan2.java

View File: src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import au.com.bytecode.opencsv.CSVReader;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Type;
import java.util.ArrayList;
/**
* Table based on a CSV stream.
*/
public class CsvTranslatableTable2 extends CsvTable
implements QueryableTable, TranslatableTable {
private final CSVReader csvReader;
private CsvEnumerator2<Object> csvEnumerator2;
private final String[] firstLine;
/** Creates a CsvTranslatableTable2; the header row is read eagerly so that
* the row type can be deduced from it. */
CsvTranslatableTable2(Reader reader, RelProtoDataType protoRowType) {
super(null, protoRowType);
this.csvReader = new CSVReader(reader);
try {
this.firstLine = csvReader.readNext();
} catch (IOException e) {
throw new RuntimeException("csvReader.readNext() failed ", e);
}
}
@Override public String toString() {
return "CsvTranslatableTable2";
}
/** Returns an enumerable over a given projection of the fields.
*
* <p>Called from generated code. */
public Enumerable<Object> project(final int[] fields) {
return new AbstractEnumerable<Object>() {
public Enumerator<Object> enumerator() {
return csvEnumerator2;
}
};
}
public Expression getExpression(SchemaPlus schema, String tableName,
Class clazz) {
return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
}
public Type getElementType() {
return Object[].class;
}
public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
SchemaPlus schema, String tableName) {
throw new UnsupportedOperationException();
}
public RelNode toRel(
RelOptTable.ToRelContext context,
RelOptTable relOptTable) {
// Request all fields.
final int fieldCount = relOptTable.getRowType().getFieldCount();
final int[] fields = CsvEnumerator2.identityList(fieldCount);
return new CsvTableScan2(context.getCluster(), relOptTable, this, fields);
}
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
final RelDataType rowType;
if (fieldTypes == null) {
fieldTypes = new ArrayList<CsvFieldType>();
rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, fieldTypes);
} else {
rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, null);
}
if (csvEnumerator2 == null) {
csvEnumerator2 = new CsvEnumerator2<Object>(csvReader, fieldTypes);
}
return rowType;
}
}
// End CsvTranslatableTable2.java

View File: src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java

@@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static java.sql.Types.CHAR;
import static java.sql.Types.LONGNVARCHAR;
import static java.sql.Types.LONGVARCHAR;
import static java.sql.Types.NCHAR;
import static java.sql.Types.NVARCHAR;
import static java.sql.Types.VARCHAR;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.adapter.csv.CsvSchemaFactory2;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.StopWatch;
import com.google.common.collect.ImmutableMap;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"xml", "xslt", "transform"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported."
+ "Columns can be renamed, simple calculations performed, aggregations, etc."
+ "SQL select statement is used to specify how CSV data should be transformed."
+ "SQL statement follows standard SQL, some restrictions may apply."
+ "Successfully transformed CSV data is routed to the 'success' relationship."
+ "If transform fails, the original FlowFile is routed to the 'failure' relationship")
public class FilterCSVColumns extends AbstractProcessor {
public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
.name("SQL select statement")
.description("SQL select statement specifies how CSV data should be transformed. "
+ "Sql select should select from CSV.A table")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("The FlowFile with transformed content will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SQL_SELECT);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile original = session.get();
if (original == null) {
return;
}
final ProcessorLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
try {
FlowFile transformed = session.write(original, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream out) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final String sql = context.getProperty(SQL_SELECT).getValue();
final ResultSet resultSet = transform(in, sql);
convertToCSV(resultSet, out);
} catch (final Exception e) {
throw new IOException(e);
}
}
});
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transformed {}", new Object[]{original});
} catch (ProcessException e) {
logger.error("Unable to transform {} due to {}", new Object[]{original, e});
session.transfer(original, REL_FAILURE);
}
}
protected static ResultSet transform(InputStream rawIn, String sql) throws SQLException {
final Reader reader = new InputStreamReader(rawIn, StandardCharsets.UTF_8);
final HashMap<String, Reader> inputs = new HashMap<>();
inputs.put("A", reader);
Statement statement = null;
final Properties properties = new Properties();
// properties.setProperty("caseSensitive", "true");
try (final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties)) {
final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
final SchemaPlus rootSchema = calciteConnection.getRootSchema();
final Schema schema =
new CsvSchemaFactory2(inputs)
.create(rootSchema, "CSV", ImmutableMap.<String, Object>of("flavor", "TRANSLATABLE"));
rootSchema.add("CSV", schema);
rootSchema.add("default", schema);
statement = connection.createStatement();
final ResultSet resultSet = statement.executeQuery(sql);
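// Note: this try-with-resources closes the Calcite connection before the
// caller consumes the ResultSet; the code relies on the already-produced
// results remaining readable after close.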
return resultSet;
}
}
protected static void convertToCSV(ResultSet resultSet, OutputStream out) throws SQLException, IOException {
convertToCsvStream(resultSet, out);
}
public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
return convertToCsvStream(rs, outStream, null, null);
}
public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
throws SQLException, IOException {
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
List<String> columnNames = new ArrayList<>(nrOfColumns);
for (int i = 1; i <= nrOfColumns; i++) {
String columnNameFromMeta = meta.getColumnName(i);
// Hive returns table.column for column name. Grab the column name as the string after the last period
int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
}
// Write column names as header row
outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
outStream.write("\n".getBytes(StandardCharsets.UTF_8));
// Iterate over the rows
long nrOfRows = 0;
while (rs.next()) {
if (callback != null) {
callback.processRow(rs);
}
List<String> rowValues = new ArrayList<>(nrOfColumns);
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
final Object value = rs.getObject(i);
switch (javaSqlType) {
case CHAR:
case LONGNVARCHAR:
case LONGVARCHAR:
case NCHAR:
case NVARCHAR:
case VARCHAR:
rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\"");
break;
default:
rowValues.add(value == null ? "" : value.toString());
}
}
// Write row values
outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
outStream.write("\n".getBytes(StandardCharsets.UTF_8));
nrOfRows++;
}
return nrOfRows;
}
/**
* An interface for callback methods which allows processing of a row during the convertToCsvStream() processing.
* <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
* Advancing the cursor (e.g. by calling next()) can cause rows to be skipped during the CSV conversion.
*/
public interface ResultSetRowCallback {
void processRow(ResultSet resultSet) throws IOException;
}
}
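Usage note: transform() registers the incoming stream as table "A" inside a schema named "CSV", so the configured SELECT statement must read from CSV.A, as the tests below do. A minimal sketch of calling it directly (hypothetical example class, not part of this commit; it must live in the org.apache.nifi.processors.standard package because transform() is protected):

package org.apache.nifi.processors.standard;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;

public class FilterCsvExample {
  public static void main(String[] args) throws Exception {
    String csv = "first_name:string,city:string\nAlice,New York\nBob,Boston\n";
    InputStream in = new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8));
    // Query the stream as table CSV.A, mirroring the processor's tests.
    ResultSet rs = FilterCSVColumns.transform(in,
        "select first_name from CSV.A where city = 'New York'");
    while (rs.next()) {
      System.out.println(rs.getString(1)); // prints: Alice
    }
  }
}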

View File: src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestFilterCSVColumns {
private static final Logger LOGGER;
static {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.FilterCSVColumns", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestFilterCSVColumns", "debug");
LOGGER = LoggerFactory.getLogger(TestFilterCSVColumns.class);
}
@Test
public void testTransformSimple() throws InitializationException, IOException, SQLException {
String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/US500.csv");
InputStream in = new FileInputStream(inpath.toFile());
ResultSet resultSet = FilterCSVColumns.transform(in, sql);
int nrofColumns = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= nrofColumns; i++) {
System.out.print(resultSet.getMetaData().getColumnLabel(i) + " ");
}
System.out.println();
while (resultSet.next()) {
for (int i = 1; i <= nrofColumns; i++) {
System.out.print(resultSet.getString(i)+ " ");
}
System.out.println();
}
}
@Test
public void testTransformCalc() throws InitializationException, IOException, SQLException {
String sql = "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from CSV.A where ID=100";
Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/Numeric.csv");
InputStream in = new FileInputStream(inpath.toFile());
ResultSet resultSet = FilterCSVColumns.transform(in, sql);
int nrofColumns = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= nrofColumns; i++) {
System.out.print(resultSet.getMetaData().getColumnLabel(i) + " ");
}
System.out.println();
while (resultSet.next()) {
for (int i = 1; i <= nrofColumns; i++) {
System.out.print(resultSet.getString(i)+ " ");
}
double total = resultSet.getDouble("TOTAL");
System.out.println();
assertEquals(90.75, total, 0.0001);
}
}
@Test
public void testSimpleTypeless() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(FilterCSVColumns.class);
String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'";
runner.setProperty(FilterCSVColumns.SQL_SELECT, sql);
runner.enqueue(Paths.get("src/test/resources/TestFilterCSVColumns/US500_typeless.csv"));
runner.run();
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile);
System.out.println(new String(flowFile.toByteArray()));
}
}
}

View File: src/test/resources/TestFilterCSVColumns/Numeric.csv

@@ -0,0 +1,5 @@
ID:int,AMOUNT1: float,AMOUNT2:float,AMOUNT3:float
008, 10.05, 15.45, 89.99
100, 20.25, 25.25, 45.25
105, 20.05, 25.05, 45.05
200, 34.05, 25.05, 75.05