mirror of https://github.com/apache/nifi.git
NIFI-5838 - Improve the schema validation method in Kite processors
review Add empty check This closes #3182. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
cc9f89b003
commit
986a2a4842
|
@ -39,6 +39,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
|
|||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processors.hadoop.HadoopValidators;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.kitesdk.data.DatasetNotFoundException;
|
||||
import org.kitesdk.data.Datasets;
|
||||
import org.kitesdk.data.SchemaNotFoundException;
|
||||
|
@ -101,29 +102,30 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
|
|||
return parseSchema(uriOrLiteral);
|
||||
}
|
||||
|
||||
if(uri.getScheme() == null) {
|
||||
throw new SchemaNotFoundException("If the schema is not a JSON string, a scheme must be specified in the URI "
|
||||
+ "(ex: dataset:, view:, resource:, file:, hdfs:, etc).");
|
||||
}
|
||||
|
||||
try {
|
||||
if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
|
||||
return Datasets.load(uri).getDataset().getDescriptor().getSchema();
|
||||
} else if ("resource".equals(uri.getScheme())) {
|
||||
try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
|
||||
.openStream()) {
|
||||
try (InputStream in = Resources.getResource(uri.getSchemeSpecificPart()).openStream()) {
|
||||
return parseSchema(uri, in);
|
||||
}
|
||||
} else {
|
||||
// try to open the file
|
||||
Path schemaPath = new Path(uri);
|
||||
FileSystem fs = schemaPath.getFileSystem(conf);
|
||||
try (InputStream in = fs.open(schemaPath)) {
|
||||
try (FileSystem fs = schemaPath.getFileSystem(conf); InputStream in = fs.open(schemaPath)) {
|
||||
return parseSchema(uri, in);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (DatasetNotFoundException e) {
|
||||
throw new SchemaNotFoundException(
|
||||
"Cannot read schema of missing dataset: " + uri, e);
|
||||
throw new SchemaNotFoundException("Cannot read schema of missing dataset: " + uri, e);
|
||||
} catch (IOException e) {
|
||||
throw new SchemaNotFoundException(
|
||||
"Failed while reading " + uri + ": " + e.getMessage(), e);
|
||||
throw new SchemaNotFoundException("Failed while reading " + uri + ": " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,8 +133,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
|
|||
try {
|
||||
return new Schema.Parser().parse(literal);
|
||||
} catch (RuntimeException e) {
|
||||
throw new SchemaNotFoundException(
|
||||
"Failed to parse schema: " + literal, e);
|
||||
throw new SchemaNotFoundException("Failed to parse schema: " + literal, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,6 +151,10 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
|
|||
Configuration conf = getConfiguration(context.getProperty(CONF_XML_FILES).evaluateAttributeExpressions().getValue());
|
||||
String error = null;
|
||||
|
||||
if(StringUtils.isBlank(uri)) {
|
||||
return new ValidationResult.Builder().subject(subject).input(uri).explanation("Schema cannot be null.").valid(false).build();
|
||||
}
|
||||
|
||||
final boolean elPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(uri);
|
||||
if (!elPresent) {
|
||||
try {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.nifi.processors.kite;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
|
@ -67,6 +68,23 @@ public class TestCSVToAvroProcessor {
|
|||
public static final String FAILURE_SUMMARY = "" +
|
||||
"Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value";
|
||||
|
||||
|
||||
/**
|
||||
* Test for a schema that is not a JSON but does not throw exception when trying to parse as an URI
|
||||
*/
|
||||
@Test
|
||||
public void testSchemeValidation() throws IOException {
|
||||
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
|
||||
runner.setProperty(ConvertCSVToAvro.SCHEMA, "column1;column2");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(ConvertCSVToAvro.SCHEMA, "src/test/resources/Shapes_header.csv.avro");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(ConvertCSVToAvro.SCHEMA, "file:" + new File("src/test/resources/Shapes_header.csv.avro").getAbsolutePath());
|
||||
runner.assertValid();
|
||||
runner.setProperty(ConvertCSVToAvro.SCHEMA, "");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic test for tab separated files, similar to #test
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue