This commit is contained in:
joewitt 2015-04-25 09:15:36 -04:00
parent e93c829191
commit b2a1f5217d
12 changed files with 707 additions and 714 deletions

View File

@ -81,7 +81,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
public ValidationResult validate(String subject, String uri, ValidationContext context) {
String message = "not set";
boolean isValid = true;
if (uri.trim().isEmpty()) {
isValid = false;
} else {
@ -95,7 +95,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
}
}
}
return new ValidationResult.Builder()
.subject(subject)
.input(uri)
@ -163,14 +163,14 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
public ValidationResult validate(String subject, String uri, ValidationContext context) {
Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).getValue());
String error = null;
final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
if (!elPresent) {
try {
getSchema(uri, conf);
} catch (SchemaNotFoundException e) {
error = e.getMessage();
}
}
}
return new ValidationResult.Builder()
.subject(subject)
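
The pattern in this hunk is worth spelling out: a property validator must not try to resolve a schema URI when the value is an Expression Language statement, because EL can only be evaluated against a live FlowFile. A minimal self-contained sketch of that shape, assuming NiFi's Validator interface; checkUri is a hypothetical stand-in for the getSchema(uri, conf) call above, and the .explanation()/.valid() tail follows the ValidationResult builders used elsewhere in this commit:

import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;

class UriValidatorSketch implements Validator {

    @Override
    public ValidationResult validate(String subject, String uri, ValidationContext context) {
        String error = null;
        // Expression Language can only be evaluated against a FlowFile, so a
        // value like ${schema.uri} is accepted as-is at configuration time.
        final boolean elPresent = context.isExpressionLanguageSupported(subject)
                && context.isExpressionLanguagePresent(uri);
        if (!elPresent) {
            error = checkUri(uri); // stand-in for getSchema(uri, conf) above
        }
        return new ValidationResult.Builder()
                .subject(subject)
                .input(uri)
                .explanation(error)
                .valid(error == null)
                .build();
    }

    // hypothetical check; the real processor resolves and caches the schema
    private String checkUri(String uri) {
        return uri.trim().isEmpty() ? "Schema URI is empty" : null;
    }
}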

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import org.apache.avro.Schema;
@ -24,17 +23,16 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
class AvroUtil {

    @SuppressWarnings("unchecked")
    public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
        return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
    }

    @SuppressWarnings("unchecked")
    public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
        return (DatumReader<D>) GenericData.get().createDatumReader(schema);
    }
}
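
The unchecked casts are safe in practice because GenericData.get() hands back generic-data readers and writers that work for GenericData.Record. A minimal usage sketch (same package as the package-private AvroUtil; it writes one record to an in-memory Avro container file and reads it back, the same DataFileWriter/DataFileStream pairing the processors in this commit use):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;

public class AvroUtilSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("User").fields()
                .requiredString("username")
                .endRecord();

        // write one record into an in-memory Avro data file
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<Record> writer = new DataFileWriter<>(
                AvroUtil.newDatumWriter(schema, Record.class)).create(schema, out)) {
            Record user = new Record(schema);
            user.put("username", "a");
            writer.append(user);
        }

        // read it back with the matching reader helper
        try (DataFileStream<Record> stream = new DataFileStream<>(
                new ByteArrayInputStream(out.toByteArray()),
                AvroUtil.newDatumReader(schema, Record.class))) {
            System.out.println(stream.next());
        }
    }
}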

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.annotations.VisibleForTesting;
@ -57,204 +56,205 @@ import static org.apache.nifi.processor.util.StandardValidators.createLongValida
@Tags({"kite", "csv", "avro"})
@CapabilityDescription(
"Converts CSV files to Avro according to an Avro Schema")
"Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
private static final Validator CHAR_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.explanation("Only single characters are supported")
.valid(input.length() == 1)
.build();
}
};
private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
private static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
@VisibleForTesting
static final PropertyDescriptor SCHEMA =
new PropertyDescriptor.Builder()
.name("Record schema")
.description("Outgoing Avro schema for each record created from a CSV row")
.addValidator(SCHEMA_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
@VisibleForTesting
static final PropertyDescriptor CHARSET =
new PropertyDescriptor.Builder()
.name("CSV charset")
.description("Character set for CSV files")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(DEFAULTS.charset)
.build();
@VisibleForTesting
static final PropertyDescriptor DELIMITER =
new PropertyDescriptor.Builder()
.name("CSV delimiter")
.description("Delimiter character for CSV records")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.delimiter)
.build();
@VisibleForTesting
static final PropertyDescriptor QUOTE =
new PropertyDescriptor.Builder()
.name("CSV quote character")
.description("Quote character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.quote)
.build();
@VisibleForTesting
static final PropertyDescriptor ESCAPE =
new PropertyDescriptor.Builder()
.name("CSV escape character")
.description("Escape character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.escape)
.build();
@VisibleForTesting
static final PropertyDescriptor HAS_HEADER =
new PropertyDescriptor.Builder()
.name("Use CSV header line")
.description("Whether to use the first line as a header")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue(String.valueOf(DEFAULTS.useHeader))
.build();
@VisibleForTesting
static final PropertyDescriptor LINES_TO_SKIP =
new PropertyDescriptor.Builder()
.name("Lines to skip")
.description("Number of lines to skip before reading header or data")
.addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
.defaultValue(String.valueOf(DEFAULTS.linesToSkip))
.build();
private static final List<PropertyDescriptor> PROPERTIES =
ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
.add(CHARSET)
.add(DELIMITER)
.add(QUOTE)
.add(ESCAPE)
.add(HAS_HEADER)
.add(LINES_TO_SKIP)
.build();
private static final Set<Relationship> RELATIONSHIPS =
ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(FAILURE)
.build();
// Immutable configuration
@VisibleForTesting
volatile CSVProperties props;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void createCSVProperties(ProcessContext context) throws IOException {
super.setDefaultConfiguration(context);
this.props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).getValue())
.delimiter(context.getProperty(DELIMITER).getValue())
.quote(context.getProperty(QUOTE).getValue())
.escape(context.getProperty(ESCAPE).getValue())
.hasHeader(context.getProperty(HAS_HEADER).asBoolean())
.linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
.build();
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
String schemaProperty = context.getProperty(SCHEMA)
.evaluateAttributeExpressions(flowFile)
.getValue();
final Schema schema;
try {
schema = getSchema(schemaProperty, DefaultConfiguration.get());
} catch (SchemaNotFoundException e) {
getLogger().error("Cannot find schema: " + schemaProperty);
session.transfer(flowFile, FAILURE);
return;
}
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
try {
flowFile = session.write(flowFile, new StreamCallback() {
private static final Validator CHAR_VALIDATOR = new Validator() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
long written = 0L;
long errors = 0L;
try (CSVFileReader<Record> reader = new CSVFileReader<>(
in, props, schema, Record.class)) {
reader.initialize();
try (DataFileWriter<Record> w = writer.create(schema, out)) {
while (reader.hasNext()) {
try {
Record record = reader.next();
w.append(record);
written += 1;
} catch (DatasetRecordException e) {
errors += 1;
}
}
}
}
session.adjustCounter("Converted records", written,
false /* update only if file transfer is successful */);
session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
public ValidationResult validate(String subject, String input,
ValidationContext context) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.explanation("Only single characters are supported")
.valid(input.length() == 1)
.build();
}
});
};
session.transfer(flowFile, SUCCESS);
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
//session.getProvenanceReporter().send(flowFile, target.getUri().toString());
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed reading or writing", e);
session.transfer(flowFile, FAILURE);
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
private static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
@VisibleForTesting
static final PropertyDescriptor SCHEMA
= new PropertyDescriptor.Builder()
.name("Record schema")
.description("Outgoing Avro schema for each record created from a CSV row")
.addValidator(SCHEMA_VALIDATOR)
.expressionLanguageSupported(true)
.required(true)
.build();
@VisibleForTesting
static final PropertyDescriptor CHARSET
= new PropertyDescriptor.Builder()
.name("CSV charset")
.description("Character set for CSV files")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(DEFAULTS.charset)
.build();
@VisibleForTesting
static final PropertyDescriptor DELIMITER
= new PropertyDescriptor.Builder()
.name("CSV delimiter")
.description("Delimiter character for CSV records")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.delimiter)
.build();
@VisibleForTesting
static final PropertyDescriptor QUOTE
= new PropertyDescriptor.Builder()
.name("CSV quote character")
.description("Quote character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.quote)
.build();
@VisibleForTesting
static final PropertyDescriptor ESCAPE
= new PropertyDescriptor.Builder()
.name("CSV escape character")
.description("Escape character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.escape)
.build();
@VisibleForTesting
static final PropertyDescriptor HAS_HEADER
= new PropertyDescriptor.Builder()
.name("Use CSV header line")
.description("Whether to use the first line as a header")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue(String.valueOf(DEFAULTS.useHeader))
.build();
@VisibleForTesting
static final PropertyDescriptor LINES_TO_SKIP
= new PropertyDescriptor.Builder()
.name("Lines to skip")
.description("Number of lines to skip before reading header or data")
.addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
.defaultValue(String.valueOf(DEFAULTS.linesToSkip))
.build();
private static final List<PropertyDescriptor> PROPERTIES
= ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
.add(CHARSET)
.add(DELIMITER)
.add(QUOTE)
.add(ESCAPE)
.add(HAS_HEADER)
.add(LINES_TO_SKIP)
.build();
private static final Set<Relationship> RELATIONSHIPS
= ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(FAILURE)
.build();
// Immutable configuration
@VisibleForTesting
volatile CSVProperties props;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void createCSVProperties(ProcessContext context) throws IOException {
super.setDefaultConfiguration(context);
this.props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).getValue())
.delimiter(context.getProperty(DELIMITER).getValue())
.quote(context.getProperty(QUOTE).getValue())
.escape(context.getProperty(ESCAPE).getValue())
.hasHeader(context.getProperty(HAS_HEADER).asBoolean())
.linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
.build();
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
String schemaProperty = context.getProperty(SCHEMA)
.evaluateAttributeExpressions(flowFile)
.getValue();
final Schema schema;
try {
schema = getSchema(schemaProperty, DefaultConfiguration.get());
} catch (SchemaNotFoundException e) {
getLogger().error("Cannot find schema: " + schemaProperty);
session.transfer(flowFile, FAILURE);
return;
}
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
long written = 0L;
long errors = 0L;
try (CSVFileReader<Record> reader = new CSVFileReader<>(
in, props, schema, Record.class)) {
reader.initialize();
try (DataFileWriter<Record> w = writer.create(schema, out)) {
while (reader.hasNext()) {
try {
Record record = reader.next();
w.append(record);
written += 1;
} catch (DatasetRecordException e) {
errors += 1;
}
}
}
}
session.adjustCounter("Converted records", written,
false /* update only if file transfer is successful */);
session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
}
});
session.transfer(flowFile, SUCCESS);
//session.getProvenanceReporter().send(flowFile, target.getUri().toString());
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed reading or writing", e);
session.transfer(flowFile, FAILURE);
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
}
}
}
}
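
A quick way to exercise the processor end-to-end is NiFi's TestRunner, exactly as TestCSVToAvroProcessor does later in this commit. A minimal sketch with a hypothetical two-field schema literal (a schema file or dataset URI would also be accepted by the SCHEMA property's validator):

import java.nio.charset.StandardCharsets;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ConvertCsvToAvroSketch {
    public static void main(String[] args) throws Exception {
        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
        // inline Avro schema; hypothetical record layout for illustration
        runner.setProperty(ConvertCSVToAvro.SCHEMA,
                "{\"type\":\"record\",\"name\":\"Test\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"color\",\"type\":\"string\"}]}");
        runner.enqueue("1,green\n2,grey".getBytes(StandardCharsets.UTF_8));
        runner.run();

        runner.assertAllFlowFilesTransferred("success", 1);
        System.out.println(runner.getCounterValue("Converted records")); // expect 2
    }
}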

View File

@ -105,7 +105,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
}
String schemaProperty = context.getProperty(SCHEMA)
.evaluateAttributeExpressions(flowFile)
.getValue();
final Schema schema;
try {

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.ImmutableList;
@ -50,113 +49,114 @@ import org.kitesdk.data.spi.SchemaValidationUtil;
@Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
    private static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFile content has been successfully saved")
            .build();

    private static final Relationship INCOMPATIBLE = new Relationship.Builder()
            .name("incompatible")
            .description("FlowFile content is not compatible with the target dataset")
            .build();

    private static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFile content could not be processed")
            .build();

    public static final PropertyDescriptor KITE_DATASET_URI
            = new PropertyDescriptor.Builder()
            .name("Target dataset URI")
            .description("URI that identifies a Kite dataset where data will be stored")
            .addValidator(RECOGNIZED_URI)
            .expressionLanguageSupported(true)
            .required(true)
            .build();

    private static final List<PropertyDescriptor> PROPERTIES
            = ImmutableList.<PropertyDescriptor>builder()
            .addAll(AbstractKiteProcessor.getProperties())
            .add(KITE_DATASET_URI)
            .build();

    private static final Set<Relationship> RELATIONSHIPS
            = ImmutableSet.<Relationship>builder()
            .add(SUCCESS)
            .add(INCOMPATIBLE)
            .add(FAILURE)
            .build();

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, final ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final View<Record> target = load(context, flowFile);
        final Schema schema = target.getDataset().getDescriptor().getSchema();

        try {
            StopWatch timer = new StopWatch(true);
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    try (DataFileStream<Record> stream = new DataFileStream<>(
                            in, AvroUtil.newDatumReader(schema, Record.class))) {
                        IncompatibleSchemaException.check(
                                SchemaValidationUtil.canRead(stream.getSchema(), schema),
                                "Incompatible file schema %s, expected %s",
                                stream.getSchema(), schema);

                        long written = 0L;
                        try (DatasetWriter<Record> writer = target.newWriter()) {
                            for (Record record : stream) {
                                writer.write(record);
                                written += 1;
                            }
                        } finally {
                            session.adjustCounter("Stored records", written,
                                    true /* cannot roll back the write */);
                        }
                    }
                }
            });
            timer.stop();

            session.getProvenanceReporter().send(flowFile,
                    target.getUri().toString(),
                    timer.getDuration(TimeUnit.MILLISECONDS),
                    true /* cannot roll back the write */);

            session.transfer(flowFile, SUCCESS);
        } catch (ProcessException | DatasetIOException e) {
            getLogger().error("Failed to read FlowFile", e);
            session.transfer(flowFile, FAILURE);
        } catch (ValidationException e) {
            getLogger().error(e.getMessage());
            getLogger().debug("Incompatible schema error", e);
            session.transfer(flowFile, INCOMPATIBLE);
        }
    }

    private View<Record> load(ProcessContext context, FlowFile file) {
        String uri = context.getProperty(KITE_DATASET_URI)
                .evaluateAttributeExpressions(file)
                .getValue();
        return Datasets.load(uri, Record.class);
    }
}
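
The processor resolves the Target dataset URI through Datasets.load, so any URI Kite recognizes will work; the tests below use dataset:hive:ns/test, view:hive:ns/table?year=2015, and dataset:file: URIs. A minimal sketch of creating and then loading a file-backed dataset (the /tmp path and inline schema are hypothetical):

import org.apache.avro.generic.GenericData.Record;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.View;

public class KiteUriSketch {
    public static void main(String[] args) {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schemaLiteral("{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"username\",\"type\":\"string\"}]}")
                .build();

        // "dataset:" URIs name a whole dataset; "view:" URIs add constraints
        String uri = "dataset:file:/tmp/ns/users";
        Dataset<Record> dataset = Datasets.create(uri, descriptor, Record.class);

        // this is what load(context, file) in the processor resolves to
        View<Record> target = Datasets.load(uri, Record.class);
        System.out.println(target.getDataset().getDescriptor().getSchema());
    }
}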

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.IOException;
@ -33,94 +32,94 @@ import static org.apache.nifi.processors.kite.TestUtil.streamFor;
public class TestCSVToAvroProcessor {
    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
            .requiredLong("id")
            .requiredString("color")
            .optionalDouble("price")
            .endRecord();

    public static final String CSV_CONTENT = ""
            + "1,green\n"
            + ",blue,\n" + // invalid, ID is missing
            "2,grey,12.95";

    @Test
    public void testBasicConversion() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
        runner.assertNotValid();
        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
        runner.assertValid();

        runner.enqueue(streamFor(CSV_CONTENT));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 1 row", 1, errors);

        runner.assertAllFlowFilesTransferred("success", 1);
    }

    @Test
    public void testAlternateCharset() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
        runner.assertValid();

        runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 1 row", 1, errors);

        runner.assertAllFlowFilesTransferred("success", 1);
    }

    @Test
    public void testCSVProperties() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
        ConvertCSVToAvro processor = new ConvertCSVToAvro();
        ProcessContext context = runner.getProcessContext();

        // check defaults
        processor.createCSVProperties(context);
        Assert.assertEquals("Charset should match",
                "utf8", processor.props.charset);
        Assert.assertEquals("Delimiter should match",
                ",", processor.props.delimiter);
        Assert.assertEquals("Quote should match",
                "\"", processor.props.quote);
        Assert.assertEquals("Escape should match",
                "\\", processor.props.escape);
        Assert.assertEquals("Header flag should match",
                false, processor.props.useHeader);
        Assert.assertEquals("Lines to skip should match",
                0, processor.props.linesToSkip);

        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
        runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
        runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
        runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
        runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
        runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");

        // check updates
        processor.createCSVProperties(context);
        Assert.assertEquals("Charset should match",
                "utf16", processor.props.charset);
        Assert.assertEquals("Delimiter should match",
                "|", processor.props.delimiter);
        Assert.assertEquals("Quote should match",
                "'", processor.props.quote);
        Assert.assertEquals("Escape should match",
                "\u2603", processor.props.escape);
        Assert.assertEquals("Header flag should match",
                true, processor.props.useHeader);
        Assert.assertEquals("Lines to skip should match",
                2, processor.props.linesToSkip);
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.File;
@ -35,43 +34,43 @@ import org.kitesdk.data.spi.DefaultConfiguration;
public class TestConfigurationProperty {
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();
    public File confLocation;

    @Before
    public void saveConfiguration() throws IOException {
        Configuration conf = new Configuration(false);
        conf.setBoolean("nifi.config.canary", true);

        confLocation = temp.newFile("nifi-conf.xml");
        FileOutputStream out = new FileOutputStream(confLocation);
        conf.writeXml(out);
        out.close();
    }

    @Test
    public void testConfigurationCanary() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());

        Assert.assertFalse("Should not contain canary value",
                DefaultConfiguration.get().getBoolean("nifi.config.canary", false));

        AbstractKiteProcessor processor = new StoreInKiteDataset();
        ProcessContext context = runner.getProcessContext();
        processor.setDefaultConfiguration(context);

        Assert.assertTrue("Should contain canary value",
                DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
    }

    @Test
    public void testFilesMustExist() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
        runner.assertNotValid();
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.File;
@ -39,63 +38,63 @@ import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
public class TestGetSchema {
    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
            .requiredLong("id")
            .requiredString("color")
            .optionalDouble("price")
            .endRecord();

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @Test
    @Ignore("Does not work on windows")
    public void testSchemaFromFileSystem() throws IOException {
        File schemaFile = temp.newFile("schema.avsc");
        FileOutputStream out = new FileOutputStream(schemaFile);
        out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
        out.close();

        Schema schema = AbstractKiteProcessor.getSchema(
                schemaFile.toString(), DefaultConfiguration.get());

        Assert.assertEquals("Schema from file should match", SCHEMA, schema);
    }

    @Test
    @Ignore("Does not work on windows")
    public void testSchemaFromKiteURIs() throws IOException {
        String location = temp.newFolder("ns", "temp").toString();
        if (location.endsWith("/")) {
            location = location.substring(0, location.length() - 1);
        }
        String datasetUri = "dataset:" + location;
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schema(SCHEMA)
                .build();

        Datasets.create(datasetUri, descriptor);

        Schema schema = AbstractKiteProcessor.getSchema(
                datasetUri, DefaultConfiguration.get());
        Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);

        schema = AbstractKiteProcessor.getSchema(
                "view:file:" + location + "?color=orange", DefaultConfiguration.get());
        Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
    }

    @Test
    public void testSchemaFromResourceURI() throws IOException {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
                .build();
        Schema expected = descriptor.getSchema();

        Schema schema = AbstractKiteProcessor.getSchema(
                "resource:schema/user.avsc", DefaultConfiguration.get());

        Assert.assertEquals("Schema from resource URI should match",
                expected, schema);
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.IOException;
@ -30,32 +29,33 @@ import org.junit.Test;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
public class TestJSONToAvroProcessor {
    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
            .requiredLong("id")
            .requiredString("color")
            .optionalDouble("price")
            .endRecord();

    public static final String JSON_CONTENT = ""
            + "{\"id\": 1,\"color\": \"green\"}"
            + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
            "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";

    @Test
    public void testBasicConversion() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
        runner.assertNotValid();
        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
        runner.assertValid();

        runner.enqueue(streamFor(JSON_CONTENT));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 1 row", 1, errors);

        runner.assertAllFlowFilesTransferred("success", 1);
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.Lists;
@ -54,79 +53,79 @@ import static org.apache.nifi.processors.kite.TestUtil.user;
@Ignore("Does not work on windows")
public class TestKiteProcessorsCluster {
    public static MiniCluster cluster = null;
    public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
            .schema(USER_SCHEMA)
            .build();

    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        long rand = Math.abs((long) (Math.random() * 1000000));
        cluster = new MiniCluster.Builder()
                .workDir("/tmp/minicluster-" + rand)
                .clean(true)
                .addService(HdfsService.class)
                .addService(HiveService.class)
                .bindIP("127.0.0.1")
                .hiveMetastorePort(9083)
                .build();
        cluster.start();
    }

    @AfterClass
    public static void stopCluster() throws IOException, InterruptedException {
        if (cluster != null) {
            cluster.stop();
            cluster = null;
        }
    }

    @Test
    public void testBasicStoreToHive() throws IOException {
        String datasetUri = "dataset:hive:ns/test";

        Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);

        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.assertNotValid();

        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> users = Lists.newArrayList(
                user("a", "a@example.com"),
                user("b", "b@example.com"),
                user("c", "c@example.com")
        );

        runner.enqueue(streamFor(users));
        runner.run();

        runner.assertAllFlowFilesTransferred("success", 1);
        List<Record> stored = Lists.newArrayList(
                (Iterable<Record>) dataset.newReader());
        Assert.assertEquals("Records should match", users, stored);

        Datasets.delete(datasetUri);
    }

    @Test
    public void testSchemaFromDistributedFileSystem() throws IOException {
        Schema expected = SchemaBuilder.record("Test").fields()
                .requiredLong("id")
                .requiredString("color")
                .optionalDouble("price")
                .endRecord();

        Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
        FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
        OutputStream out = fs.create(schemaPath);
        out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
        out.close();

        Schema schema = AbstractKiteProcessor.getSchema(
                schemaPath.toString(), DefaultConfiguration.get());

        Assert.assertEquals("Schema from file should match", expected, schema);
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.Lists;
@ -47,125 +46,125 @@ import static org.apache.nifi.processors.kite.TestUtil.user;
@Ignore("Does not work on windows")
public class TestKiteStorageProcessor {
    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    private String datasetUri = null;
    private Dataset<Record> dataset = null;

    @Before
    public void createDataset() throws Exception {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schema(TestUtil.USER_SCHEMA)
                .build();
        this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
        this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
    }

    @After
    public void deleteDataset() throws Exception {
        Datasets.delete(datasetUri);
    }

    @Test
    public void testBasicStore() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.assertNotValid();

        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> users = Lists.newArrayList(
                user("a", "a@example.com"),
                user("b", "b@example.com"),
                user("c", "c@example.com")
        );

        runner.enqueue(streamFor(users));
        runner.run();

        runner.assertAllFlowFilesTransferred("success", 1);
        runner.assertQueueEmpty();
        Assert.assertEquals("Should store 3 values",
                3, (long) runner.getCounterValue("Stored records"));

        List<Record> stored = Lists.newArrayList(
                (Iterable<Record>) dataset.newReader());
        Assert.assertEquals("Records should match", users, stored);
    }

    @Test
    public void testViewURI() {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
        runner.assertValid();
    }

    @Test
    public void testInvalidURI() {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
        runner.assertNotValid();
    }

    @Test
    public void testUnreadableContent() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
        runner.run();

        runner.assertAllFlowFilesTransferred("failure", 1);
    }

    @Test
    public void testCorruptedBlocks() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> records = Lists.newArrayList();
        for (int i = 0; i < 10000; i += 1) {
            String num = String.valueOf(i);
            records.add(user(num, num + "@example.com"));
        }

        runner.enqueue(invalidStreamFor(records));
        runner.run();

        long stored = runner.getCounterValue("Stored records");
        Assert.assertTrue("Should store some readable values",
                0 < stored && stored < 10000);

        runner.assertAllFlowFilesTransferred("success", 1);
    }

    @Test
    public void testIncompatibleSchema() throws IOException {
        Schema incompatible = SchemaBuilder.record("User").fields()
                .requiredLong("id")
                .requiredString("username")
                .optionalString("email") // the dataset requires this field
                .endRecord();

        // this user has the email field and could be stored, but the schema is
        // still incompatible so the entire stream is rejected
        Record incompatibleUser = new Record(incompatible);
        incompatibleUser.put("id", 1L);
        incompatibleUser.put("username", "a");
        incompatibleUser.put("email", "a@example.com");

        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        runner.enqueue(streamFor(incompatibleUser));
        runner.run();

        runner.assertAllFlowFilesTransferred("incompatible", 1);
    }
}

View File

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.ByteArrayInputStream;
@ -37,67 +36,68 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
public class TestUtil {
    public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
            .requiredString("username")
            .requiredString("email")
            .endRecord();

    public static Record user(String username, String email) {
        Record user = new Record(USER_SCHEMA);
        user.put("username", username);
        user.put("email", email);
        return user;
    }

    public static InputStream streamFor(Record... records) throws IOException {
        return streamFor(Arrays.asList(records));
    }

    public static InputStream streamFor(List<Record> records) throws IOException {
        return new ByteArrayInputStream(bytesFor(records));
    }

    public static InputStream invalidStreamFor(Record... records) throws IOException {
        return invalidStreamFor(Arrays.asList(records));
    }

    public static InputStream invalidStreamFor(List<Record> records) throws IOException {
        // purposely truncate the content
        byte[] bytes = bytesFor(records);
        return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
    }

    private static byte[] bytesFor(List<Record> records) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<Record> writer = new DataFileWriter<>(
                AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
        writer.setCodec(CodecFactory.snappyCodec());
        writer = writer.create(records.get(0).getSchema(), out);
        for (Record record : records) {
            writer.append(record);
        }
        writer.flush();
        return out.toByteArray();
    }

    public static InputStream streamFor(String content) throws CharacterCodingException {
        return streamFor(content, Charset.forName("utf8"));
    }

    public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
        return new ByteArrayInputStream(bytesFor(content, charset));
    }

    public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
        CharBuffer chars = CharBuffer.wrap(content);
        CharsetEncoder encoder = charset.newEncoder();
        ByteBuffer buffer = encoder.encode(chars);
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }
}