NIFI-9137 Refactored nifi-parquet-processors using JUnit 5
NIFI-9138 Refactored nifi-pgp-bundle using JUnit 5
NIFI-9139 Refactored nifi-poi-bundle using JUnit 5
NIFI-9140 Refactored nifi-prometheus-bundle using JUnit 5

This closes #5353

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
parent d90ef06752
commit 60b08cc569
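Context for the diff below: all four modules move from JUnit 4 to JUnit 5 (Jupiter). As an illustrative sketch only — the class and method names here are hypothetical, not taken from the commit — the recurring annotation and assertion mapping looks like this:

```java
// Hypothetical example of the JUnit 4 -> JUnit 5 mapping applied throughout.
import org.junit.jupiter.api.BeforeAll;   // replaces org.junit.BeforeClass
import org.junit.jupiter.api.BeforeEach;  // replaces org.junit.Before
import org.junit.jupiter.api.AfterEach;   // replaces org.junit.After
import org.junit.jupiter.api.Test;        // replaces org.junit.Test

import static org.junit.jupiter.api.Assertions.assertEquals; // replaces org.junit.Assert.assertEquals

class ExampleMigratedTest {

    @BeforeAll
    static void setUpOnce() {
        // one-time setup, formerly annotated @BeforeClass
    }

    @BeforeEach
    void setUp() {
        // per-test setup, formerly annotated @Before
    }

    @Test
    void addsNumbers() {
        assertEquals(4, 2 + 2);
    }
}
```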
@@ -20,7 +20,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -35,10 +34,11 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;

 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -50,25 +50,22 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import static org.junit.Assert.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@DisabledOnOs({ OS.WINDOWS })
 public class TestParquetReader {

-    private Map<PropertyDescriptor,String> readerFactoryProperties;
-    private ConfigurationContext readerFactoryConfigContext;
+    private static final String PARQUET_PATH = "src/test/resources/TestParquetReader.parquet";
+    private static final String SCHEMA_PATH = "src/test/resources/avro/user.avsc";

     private ParquetReader parquetReaderFactory;
     private ComponentLog componentLog;

-    @BeforeClass
-    public static void setUpSuite() {
-        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
-    }
-
-    @Before
+    @BeforeEach
     public void setup() {
-        readerFactoryProperties = new HashMap<>();
-        readerFactoryConfigContext = new MockConfigurationContext(readerFactoryProperties, null);
+        Map<PropertyDescriptor, String> readerFactoryProperties = new HashMap<>();
+        ConfigurationContext readerFactoryConfigContext = new MockConfigurationContext(readerFactoryProperties, null);

         parquetReaderFactory = new ParquetReader();
         parquetReaderFactory.abstractStoreConfigContext(readerFactoryConfigContext);
@@ -78,7 +75,7 @@ public class TestParquetReader {

     @Test
     public void testReadUsers() throws IOException, MalformedRecordException {
-        final Schema schema = getSchema("src/test/resources/avro/user.avsc");
+        final Schema schema = getSchema();
         final File parquetFile = new File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());

         // write some users to the parquet file...
@@ -112,25 +109,25 @@ public class TestParquetReader {
     @Test
     public void testReader() throws InitializationException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
-        final String path = "src/test/resources/TestParquetReader.parquet";

         final ParquetReader parquetReader = new ParquetReader();

         runner.addControllerService("reader", parquetReader);
         runner.enableControllerService(parquetReader);

-        runner.enqueue(Paths.get(path));
+        runner.enqueue(Paths.get(PARQUET_PATH));

         runner.setProperty(TestParquetProcessor.READER, "reader");
-        runner.setProperty(TestParquetProcessor.PATH, path);
+        runner.setProperty(TestParquetProcessor.PATH, PARQUET_PATH);

         runner.run();
         runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
     }

-    private Schema getSchema(final String schemaFilePath) throws IOException {
-        final File schemaFile = new File(schemaFilePath);
+    private Schema getSchema() throws IOException {
+        final File schemaFile = new File(SCHEMA_PATH);
         final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8);
         return new Schema.Parser().parse(schemaString);
     }
@@ -139,13 +136,9 @@ public class TestParquetReader {
         final Configuration conf = new Configuration();
         final Path parquetPath = new Path(parquetFile.getPath());

-        final ParquetWriter<GenericRecord> writer =
-                AvroParquetWriter.<GenericRecord>builder(parquetPath)
+        return AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath, conf))
                 .withSchema(schema)
                 .withConf(conf)
                 .build();
-
-        return writer;
     }

 }
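The TestParquetReader hunks above also show the commit's replacement for runtime OS guards: the JUnit 4 Assume/SystemUtils check in a @BeforeClass method becomes a declarative @DisabledOnOs annotation. A minimal sketch, with a hypothetical test class:

```java
// JUnit 4 style (removed above):
//     Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
// JUnit 5 style: declare the condition; no commons-lang3 SystemUtils needed.
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

@DisabledOnOs(OS.WINDOWS)
class UnixOnlyTest {

    @Test
    void exercisesPosixPathBehavior() {
        // body would touch filesystem behavior that differs on Windows
    }
}
```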
@@ -42,25 +42,30 @@ import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;

-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;

 public class TestParquetRecordSetWriter {

+    private static final String SCHEMA_PATH = "src/test/resources/avro/user.avsc";
+
+    private static final int USERS = 10;
+
     private ComponentLog componentLog;
     private ParquetRecordSetWriter recordSetWriterFactory;

-    @Before
+    @BeforeEach
     public void setup() {
         recordSetWriterFactory = new ParquetRecordSetWriter();
         componentLog = new MockComponentLog("1234", recordSetWriterFactory);
@@ -68,42 +73,24 @@ public class TestParquetRecordSetWriter {

     @Test
     public void testWriteUsers() throws IOException, SchemaNotFoundException, InitializationException {
-        initRecordSetWriter("src/test/resources/avro/user.avsc");
-
-        // get the schema from the writer factory
+        initRecordSetWriter();
         final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);

-        // write some records
-        final int numUsers = 10;
         final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());

-        // write some records...
-        writeUsers(writeSchema, parquetFile, numUsers);
-
-        // read the records back in to verify
-        verifyParquetRecords(parquetFile, numUsers);
+        writeUsers(writeSchema, parquetFile);
+        verifyParquetRecords(parquetFile);
     }

     @Test
     public void testWriteUsersWhenSchemaFormatNotAvro() throws IOException, SchemaNotFoundException, InitializationException {
-        initRecordSetWriter("src/test/resources/avro/user.avsc");
-
-        // get the schema from the writer factory
+        initRecordSetWriter();
         final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
         final RecordSchema writeSchemaWithOtherFormat = new SimpleRecordSchema(writeSchema.getFields(), null, "OTHER-FORMAT", SchemaIdentifier.EMPTY);

-        // write some records
-        final int numUsers = 10;
         final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());

-        // write some records...
-        writeUsers(writeSchemaWithOtherFormat, parquetFile, numUsers);
-
-        // read the records back in to verify
-        verifyParquetRecords(parquetFile, numUsers);
+        writeUsers(writeSchemaWithOtherFormat, parquetFile);
+        verifyParquetRecords(parquetFile);
     }

-    private void initRecordSetWriter(final String schemaPath) throws IOException, InitializationException {
+    private void initRecordSetWriter() throws IOException, InitializationException {
         final TestRunner runner = TestRunners.newTestRunner(new AbstractProcessor() {
             @Override
             public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@@ -112,17 +99,17 @@ public class TestParquetRecordSetWriter {

         runner.addControllerService("writer", recordSetWriterFactory);

-        final File schemaFile = new File(schemaPath);
+        final File schemaFile = new File(SCHEMA_PATH);
         final Map<PropertyDescriptor, String> properties = createPropertiesWithSchema(schemaFile);
         properties.forEach((k, v) -> runner.setProperty(recordSetWriterFactory, k, v));

         runner.enableControllerService(recordSetWriterFactory);
     }

-    private void writeUsers(final RecordSchema writeSchema, final File parquetFile, final int numUsers) throws IOException {
+    private void writeUsers(final RecordSchema writeSchema, final File parquetFile) throws IOException {
         try(final OutputStream output = new FileOutputStream(parquetFile);
             final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(componentLog, writeSchema, output, Collections.emptyMap())) {
-            for (int i = 0; i < numUsers; i++) {
+            for (int i = 0; i < USERS; i++) {
                 final Map<String, Object> userFields = new HashMap<>();
                 userFields.put("name", "user" + i);
                 userFields.put("favorite_number", i);
@@ -136,7 +123,7 @@ public class TestParquetRecordSetWriter {
         }
     }

-    private void verifyParquetRecords(final File parquetFile, final int expectedRecordCount) throws IOException {
+    private void verifyParquetRecords(final File parquetFile) throws IOException {
         final Configuration conf = new Configuration();
         final Path path = new Path(parquetFile.getPath());
         final InputFile inputFile = HadoopInputFile.fromPath(path, conf);
@@ -148,12 +135,12 @@ public class TestParquetRecordSetWriter {
             while(reader.read() != null) {
                 recordCount++;
             }
-            assertEquals(expectedRecordCount, recordCount);
+            assertEquals(USERS, recordCount);
         }
     }

     private Map<PropertyDescriptor,String> createPropertiesWithSchema(final File schemaFile) throws IOException {
-        return createPropertiesWithSchema(IOUtils.toString(schemaFile.toURI()));
+        return createPropertiesWithSchema(IOUtils.toString(schemaFile.toURI(), StandardCharsets.UTF_8));
     }

     private Map<PropertyDescriptor,String> createPropertiesWithSchema(final String schemaText) {
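Alongside the JUnit changes, the Parquet tests move off the deprecated Path-based builders onto the InputFile/OutputFile API, as seen in the HadoopOutputFile lines above. A sketch of the writer side, assuming parquet-avro and parquet-hadoop on the classpath:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

import java.io.IOException;

class ParquetWriterSketch {
    // Hypothetical helper illustrating the builder migration.
    ParquetWriter<GenericRecord> open(Path path, Schema schema, Configuration conf) throws IOException {
        // Old: AvroParquetWriter.<GenericRecord>builder(path) -- deprecated Path overload.
        // New: wrap the Path in a HadoopOutputFile first (may throw IOException).
        return AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(path, conf))
                .withSchema(schema)
                .withConf(conf)
                .build();
    }
}
```

Note that HadoopOutputFile.fromPath can throw IOException, which is why createAvroParquetWriter in FetchParquetTest below gains a throws IOException clause.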
@@ -21,7 +21,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.flowfile.FlowFile;
@@ -41,11 +40,11 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;

@@ -59,15 +58,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.when;

+@DisabledOnOs({ OS.WINDOWS })
 public class FetchParquetTest {

     static final String DIRECTORY = "target";
     static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
     static final String RECORD_HEADER = "name,favorite_number,favorite_color";
+    private static final int USERS = 10;

     private Schema schema;
     private Schema schemaWithArray;
@@ -78,13 +82,8 @@ public class FetchParquetTest {
     private FetchParquet proc;
     private TestRunner testRunner;

-    @BeforeClass
-    public static void setUpSuite() {
-        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
-    }
-
-    @Before
-    public void setup() throws IOException, InitializationException {
+    @BeforeEach
+    public void setup() throws IOException {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);

@@ -119,8 +118,7 @@ public class FetchParquetTest {

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsers(parquetFile, numUsers);
+        writeParquetUsers(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -131,14 +129,14 @@ public class FetchParquetTest {
         testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);

         final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
-        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(numUsers));
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(USERS));
         flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");

         // the mock record writer will write the header for each record so replace those to get down to just the records
         String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
         flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");

-        verifyCSVRecords(numUsers, flowFileContent);
+        verifyCSVRecords(flowFileContent);
     }

     @Test
@@ -156,7 +154,7 @@
     }

     @Test
-    public void testFetchWhenDoesntExistShouldRouteToFailure() throws InitializationException {
+    public void testFetchWhenDoesNotExistShouldRouteToFailure() throws InitializationException {
         configure(proc);

         final String filename = "/tmp/does-not-exist-" + System.currentTimeMillis();
@@ -185,8 +183,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsers(parquetFile, numUsers);
+        writeParquetUsers(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -204,15 +201,14 @@
     public void testIOExceptionWhileReadingShouldRouteToRetry() throws IOException, InitializationException {
         final FetchParquet proc = new FetchParquet() {
             @Override
-            public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path)
-                    throws IOException {
+            public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path) {
                 return new HDFSRecordReader() {
                     @Override
                     public Record nextRecord() throws IOException {
                         throw new IOException("IOException");
                     }
                     @Override
-                    public void close() throws IOException {
+                    public void close() {
                     }
                 };
             }
@@ -222,8 +218,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsers(parquetFile, numUsers);
+        writeParquetUsers(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -255,8 +250,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsers(parquetFile, numUsers);
+        writeParquetUsers(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -276,8 +270,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetWithArrayToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsersWithArray(parquetFile, numUsers);
+        writeParquetUsersWithArray(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -294,8 +287,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetWithNullableArrayToCSV.parquet");
-        final int numUsers = 10;
-        writeParquetUsersWithNullableArray(parquetFile, numUsers);
+        writeParquetUsersWithNullableArray(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -312,8 +304,7 @@

         final File parquetDir = new File(DIRECTORY);
         final File parquetFile = new File(parquetDir,"testFetchParquetWithDecimal.parquet");
-        final int numUsers = 10;
-        writeParquetUsersWithDecimal(parquetFile, numUsers);
+        writeParquetUsersWithDecimal(parquetFile);

         final Map<String,String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
@@ -324,34 +315,34 @@
         testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
     }

-    protected void verifyCSVRecords(int numUsers, String csvContent) {
+    protected void verifyCSVRecords(String csvContent) {
         final String[] splits = csvContent.split("[\\n]");
-        Assert.assertEquals(numUsers, splits.length);
+        assertEquals(USERS, splits.length);

-        for (int i=0; i < numUsers; i++) {
+        for (int i=0; i < USERS; i++) {
             final String line = splits[i];
-            Assert.assertEquals("Bob" + i + "," + i + ",blue" + i, line);
+            assertEquals("Bob" + i + "," + i + ",blue" + i, line);
         }
     }

-    private AvroParquetWriter.Builder<GenericRecord> createAvroParquetWriter(final File parquetFile, final Schema schema) {
+    private AvroParquetWriter.Builder<GenericRecord> createAvroParquetWriter(final File parquetFile, final Schema schema) throws IOException {
         final Path parquetPath = new Path(parquetFile.getPath());

         return AvroParquetWriter
-                .<GenericRecord>builder(parquetPath)
+                .<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath, testConf))
                 .withSchema(schema)
                 .withConf(testConf);
     }

-    private void writeParquetUsers(final File parquetFile, int numUsers) throws IOException {
+    private void writeParquetUsers(final File parquetFile) throws IOException {
         if (parquetFile.exists()) {
-            Assert.assertTrue(parquetFile.delete());
+            assertTrue(parquetFile.delete());
         }

         final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schema);

         try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
-            for (int i=0; i < numUsers; i++) {
+            for (int i=0; i < USERS; i++) {
                 final GenericRecord user = new GenericData.Record(schema);
                 user.put("name", "Bob" + i);
                 user.put("favorite_number", i);
@@ -362,9 +353,9 @@
         }
     }

-    private void writeParquetUsersWithArray(final File parquetFile, int numUsers) throws IOException {
+    private void writeParquetUsersWithArray(final File parquetFile) throws IOException {
         if (parquetFile.exists()) {
-            Assert.assertTrue(parquetFile.delete());
+            assertTrue(parquetFile.delete());
         }

         final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithArray);
@@ -372,7 +363,7 @@
         final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();

         try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
-            for (int i=0; i < numUsers; i++) {
+            for (int i=0; i < USERS; i++) {
                 final GenericRecord user = new GenericData.Record(schema);
                 user.put("name", "Bob" + i);
                 user.put("favorite_number", i);
@@ -388,9 +379,9 @@
         }
     }

-    private void writeParquetUsersWithNullableArray(final File parquetFile, int numUsers) throws IOException {
+    private void writeParquetUsersWithNullableArray(final File parquetFile) throws IOException {
         if (parquetFile.exists()) {
-            Assert.assertTrue(parquetFile.delete());
+            assertTrue(parquetFile.delete());
         }

         final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithNullableArray);
@@ -400,7 +391,7 @@
         final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();

         try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
-            for (int i=0; i < numUsers; i++) {
+            for (int i=0; i < USERS; i++) {
                 final GenericRecord user = new GenericData.Record(schema);
                 user.put("name", "Bob" + i);
                 user.put("favorite_number", i);
@@ -416,9 +407,9 @@
         }
     }

-    private void writeParquetUsersWithDecimal(final File parquetFile, int numUsers) throws IOException {
+    private void writeParquetUsersWithDecimal(final File parquetFile) throws IOException {
         if (parquetFile.exists()) {
-            Assert.assertTrue(parquetFile.delete());
+            assertTrue(parquetFile.delete());
         }

         final BigDecimal initialAmount = new BigDecimal("1234567.0123456789");
@@ -426,12 +417,12 @@

         final List<Schema> amountSchemaUnion = schemaWithDecimal.getField("amount").schema().getTypes();
         final Schema amountSchema = amountSchemaUnion.stream().filter(s -> s.getType() == Schema.Type.FIXED).findFirst().orElse(null);
-        Assert.assertNotNull(amountSchema);
+        assertNotNull(amountSchema);

         final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();

         try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
-            for (int i=0; i < numUsers; i++) {
+            for (int i=0; i < USERS; i++) {
                 final BigDecimal incrementedAmount = initialAmount.add(new BigDecimal("1"));
                 final GenericRecord user = new GenericData.Record(schemaWithDecimal);
                 user.put("name", "Bob" + i);
@@ -442,5 +433,4 @@
         }

     }
-
 }
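The reading side mirrors the writer migration: AvroParquetReader gains a HadoopInputFile in place of the raw Path. A sketch, again with hypothetical helper names:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;

class ParquetReaderSketch {
    // Count the records in a Parquet file, as the verify helpers above do.
    long countRecords(Path path, Configuration conf) throws IOException {
        long count = 0;
        // Old: AvroParquetReader.<GenericRecord>builder(path) -- deprecated Path overload.
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(path, conf))
                .withConf(conf)
                .build()) {
            while (reader.read() != null) {
                count++;
            }
        }
        return count;
    }
}
```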
@@ -16,6 +16,10 @@
  */
 package org.apache.nifi.processors.parquet;

+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;

@@ -31,7 +35,6 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -61,14 +64,15 @@ import org.apache.nifi.util.TestRunners;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 import org.mockito.Mockito;


+@DisabledOnOs(OS.WINDOWS)
 public class PutParquetTest {

     static final String DIRECTORY = "target";
@@ -80,14 +84,13 @@ public class PutParquetTest {
     private MockRecordParser readerFactory;
     private TestRunner testRunner;

-    @BeforeClass
+    @BeforeAll
     public static void setupBeforeClass() {
-        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
         BasicConfigurator.configure();
     }

-    @Before
-    public void setup() throws IOException, InitializationException {
+    @BeforeEach
+    public void setup() throws IOException {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);

@@ -142,28 +145,28 @@

         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
-        Assert.assertEquals(1, provEvents.size());
+        assertEquals(1, provEvents.size());

         // verify it was a SEND event with the correct URI
         final ProvenanceEventRecord provEvent = provEvents.get(0);
-        Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
-        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
+        assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));

         // verify the content of the parquet file by reading it back in
         verifyAvroParquetUsers(avroParquetFile, 100);

         // verify we don't have the temp dot file after success
         final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
-        Assert.assertFalse(tempAvroParquetFile.exists());
+        assertFalse(tempAvroParquetFile.exists());

         // verify we DO have the CRC file after success
         final File crcAvroParquetFile = new File(DIRECTORY + "/." + filename + ".crc");
-        Assert.assertTrue(crcAvroParquetFile.exists());
+        assertTrue(crcAvroParquetFile.exists());
     }

     @Test
-    public void testWriteAvroAndRemoveCRCFiles() throws IOException, InitializationException {
+    public void testWriteAvroAndRemoveCRCFiles() throws InitializationException {
         configure(proc,100);
         testRunner.setProperty(PutParquet.REMOVE_CRC_FILES, "true");

@@ -178,11 +181,11 @@

         // verify we don't have the temp dot file after success
         final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
-        Assert.assertFalse(tempAvroParquetFile.exists());
+        assertFalse(tempAvroParquetFile.exists());

         // verify we don't have the CRC file after success because we set remove to true
         final File crcAvroParquetFile = new File(DIRECTORY + "/." + filename + ".crc");
-        Assert.assertFalse(crcAvroParquetFile.exists());
+        assertFalse(crcAvroParquetFile.exists());
     }

     @Test
@@ -229,11 +232,11 @@
     }

     @Test
-    public void testCreateDirectoryIOExceptionShouldRouteToRetry() throws InitializationException, IOException {
+    public void testCreateDirectoryIOExceptionShouldRouteToRetry() throws InitializationException {
         final PutParquet proc = new PutParquet() {
             @Override
             protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup)
-                    throws IOException, FailureException {
+                    throws IOException {
                 throw new IOException("IOException creating directory");
             }
         };
@@ -251,11 +254,11 @@
     }

     @Test
-    public void testCreateDirectoryFailureExceptionShouldRouteToFailure() throws InitializationException, IOException {
+    public void testCreateDirectoryFailureExceptionShouldRouteToFailure() throws InitializationException {
         final PutParquet proc = new PutParquet() {
             @Override
             protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup)
-                    throws IOException, FailureException {
+                    throws FailureException {
                 throw new FailureException("FailureException creating directory");
             }
         };
@@ -281,7 +284,7 @@

         // create a file in the directory with the same name
         final File avroParquetFile = new File(DIRECTORY + "/" + filename);
-        Assert.assertTrue(avroParquetFile.createNewFile());
+        assertTrue(avroParquetFile.createNewFile());

         final Map<String,String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
@@ -301,7 +304,7 @@

         // create a file in the directory with the same name
         final File avroParquetFile = new File(DIRECTORY + "/" + filename);
-        Assert.assertTrue(avroParquetFile.createNewFile());
+        assertTrue(avroParquetFile.createNewFile());

         final Map<String,String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
@@ -320,7 +323,7 @@

         // create a file in the directory with the same name
         final File avroParquetFile = new File(DIRECTORY + "/" + filename);
-        Assert.assertTrue(avroParquetFile.createNewFile());
+        assertTrue(avroParquetFile.createNewFile());

         final Map<String,String> flowFileAttributes = new HashMap<>();
         flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
@@ -331,7 +334,7 @@
     }

     @Test
-    public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
+    public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException {
         configure(proc, 10);

         final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
@@ -372,11 +375,11 @@
     }

     @Test
-    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException {
+    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException {
         final PutParquet proc = new PutParquet() {
             @Override
             public HDFSRecordWriter createHDFSRecordWriter(ProcessContext context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
-                    throws IOException, SchemaNotFoundException {
+                    throws IOException {
                 throw new IOException("IOException");
             }
         };
@@ -422,11 +425,11 @@
     }

     @Test
-    public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException, IOException {
+    public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException {
         final PutParquet proc = new PutParquet() {
             @Override
             protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
-                    throws IOException, InterruptedException, FailureException {
+                    throws IOException {
                 throw new IOException("IOException renaming");
             }
         };
@@ -444,15 +447,15 @@

         // verify we don't have the temp dot file after success
         final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
-        Assert.assertFalse(tempAvroParquetFile.exists());
+        assertFalse(tempAvroParquetFile.exists());
     }

     @Test
-    public void testFailureExceptionRenamingShouldRouteToFailure() throws InitializationException, IOException {
+    public void testFailureExceptionRenamingShouldRouteToFailure() throws InitializationException {
         final PutParquet proc = new PutParquet() {
             @Override
             protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
-                    throws IOException, InterruptedException, FailureException {
+                    throws FailureException {
                 throw new FailureException("FailureException renaming");
             }
         };
@@ -470,11 +473,11 @@

         // verify we don't have the temp dot file after success
         final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
-        Assert.assertFalse(tempAvroParquetFile.exists());
+        assertFalse(tempAvroParquetFile.exists());
     }

     @Test
-    public void testRowGroupSize() throws IOException, InitializationException {
+    public void testRowGroupSize() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B");

@@ -489,7 +492,7 @@
     }

     @Test
-    public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+    public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}");

@@ -505,7 +508,7 @@
     }

     @Test
-    public void testPageSize() throws IOException, InitializationException {
+    public void testPageSize() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B");

@@ -520,7 +523,7 @@
     }

     @Test
-    public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+    public void testInvalidPageSizeFromELShouldRouteToFailure() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}");

@@ -536,7 +539,7 @@
     }

     @Test
-    public void testDictionaryPageSize() throws IOException, InitializationException {
+    public void testDictionaryPageSize() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B");

@@ -551,7 +554,7 @@
     }

     @Test
-    public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+    public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");

@@ -567,7 +570,7 @@
     }

     @Test
-    public void testMaxPaddingPageSize() throws IOException, InitializationException {
+    public void testMaxPaddingPageSize() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B");

@@ -582,7 +585,7 @@
     }

     @Test
-    public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+    public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws InitializationException {
         configure(proc, 10);
         testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}");

@@ -621,7 +624,7 @@

     private void verifyAvroParquetUsers(final Path avroParquetUsers, final int numExpectedUsers) throws IOException {
         final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader
-                .<GenericRecord>builder(avroParquetUsers)
+                .<GenericRecord>builder(HadoopInputFile.fromPath(avroParquetUsers, testConf))
                 .withConf(testConf);

         int currUser = 0;
@@ -629,15 +632,15 @@
         try (final ParquetReader<GenericRecord> reader = readerBuilder.build()) {
             GenericRecord nextRecord;
             while((nextRecord = reader.read()) != null) {
-                Assert.assertNotNull(nextRecord);
-                Assert.assertEquals("name" + currUser, nextRecord.get("name").toString());
-                Assert.assertEquals(currUser, nextRecord.get("favorite_number"));
-                Assert.assertEquals("blue" + currUser, nextRecord.get("favorite_color").toString());
+                assertNotNull(nextRecord);
+                assertEquals("name" + currUser, nextRecord.get("name").toString());
+                assertEquals(currUser, nextRecord.get("favorite_number"));
+                assertEquals("blue" + currUser, nextRecord.get("favorite_color").toString());
                 currUser++;
             }
         }

-        Assert.assertEquals(numExpectedUsers, currUser);
+        assertEquals(numExpectedUsers, currUser);
     }

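Several PutParquetTest hunks above narrow the throws clauses of anonymous-class overrides to only the exception each stub actually throws. This leans on a standard Java rule: an overriding method may declare fewer checked exceptions than the method it overrides. A minimal sketch with made-up names:

```java
import java.io.IOException;

class Base {
    // The overridden method declares two checked exceptions.
    void rename() throws IOException, InterruptedException {
    }
}

class NarrowedOverride extends Base {
    // Legal: an override may drop checked exceptions it never throws,
    // which is exactly the cleanup applied to the test stubs above.
    @Override
    void rename() throws IOException {
        throw new IOException("IOException renaming");
    }
}
```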
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.parquet;

 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Resources;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
@@ -29,7 +28,6 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
@@ -41,43 +39,49 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;

 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;

 /**
  * Unit tests for ConvertAvroToParquet processor
  */
 public class TestConvertAvroToParquet {

-    private ConvertAvroToParquet processor;
     private TestRunner runner;

-    private List<GenericRecord> records = new ArrayList<>();
-    File tmpAvro = new File("target/test.avro");
-    File tmpParquet = new File("target/test.parquet");
+    private final List<GenericRecord> records = new ArrayList<>();
+    private File tmpAvro;
+    private File tmpParquet;

-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
-        processor = new ConvertAvroToParquet();
+        tmpAvro = File.createTempFile(TestConvertAvroToParquet.class.getSimpleName(), ".avro");
+        tmpAvro.deleteOnExit();
+
+        tmpParquet = File.createTempFile(TestConvertAvroToParquet.class.getSimpleName(), ".parquet");
+        tmpParquet.deleteOnExit();
+
+        ConvertAvroToParquet processor = new ConvertAvroToParquet();
         runner = TestRunners.newTestRunner(processor);

-        Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream());
+        Schema schema = new Schema.Parser().parse(getClass().getResourceAsStream("/avro/all-minus-enum.avsc"));

-        DataFileWriter<Object> awriter = new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+        DataFileWriter<Object> awriter = new DataFileWriter<>(new GenericDatumWriter<>());
         GenericData.Record nestedRecord = new GenericRecordBuilder(
                 schema.getField("mynestedrecord").schema())
                 .set("mynestedint", 1).build();
@@ -92,7 +96,7 @@
                 .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
                 .set("mystring", "hello")
                 .set("mynestedrecord", nestedRecord)
-                .set("myarray", new GenericData.Array<Integer>(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2)))
+                .set("myarray", new GenericData.Array<>(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2)))
                 .set("mymap", ImmutableMap.of("a", 1, "b", 2))
                 .set("myfixed", new GenericData.Fixed(Schema.createFixed("ignored", null, null, 1), new byte[] { (byte) 65 }))
                 .build();
@@ -102,19 +106,17 @@
         awriter.flush();
         awriter.close();

-        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
-        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(tmpAvro, datumReader);
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
+        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(tmpAvro, datumReader);
         GenericRecord record1 = null;
         while (dataFileReader.hasNext()) {
             record1 = dataFileReader.next(record1);
             records.add(record1);
         }
-
     }

     @Test
-    public void test_Processor() throws Exception {
-
+    public void testProcessor() throws Exception {
         FileInputStream fileInputStream = new FileInputStream(tmpAvro);
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         int readedBytes;
@@ -124,9 +126,7 @@
         }
         out.close();

-        Map<String, String> attributes = new HashMap<String, String>() {{
-            put(CoreAttributes.FILENAME.key(), "test.avro");
-        }};
+        Map<String, String> attributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), "test.avro");
         runner.enqueue(out.toByteArray(), attributes);
         runner.run();

@@ -134,16 +134,12 @@

         MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);

-        // assert meta data
         assertEquals("1", resultFlowFile.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE));
         assertEquals("test.parquet", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
-
-
     }

     @Test
-    public void test_Meta_Info() throws Exception {
-
+    public void testMetaInfo() throws Exception {
         FileInputStream fileInputStream = new FileInputStream(tmpAvro);
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         int readedBytes;
@@ -153,14 +149,11 @@
         }
         out.close();

-        Map<String, String> attributes = new HashMap<String, String>() {{
-            put(CoreAttributes.FILENAME.key(), "test.avro");
-        }};
+        Map<String, String> attributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), "test.avro");
         runner.enqueue(out.toByteArray(), attributes);
         runner.run();
         MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);

-        // Save the flowfile
         byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
         FileOutputStream fos = new FileOutputStream(tmpParquet);
         fos.write(resultContents);
@@ -168,11 +161,9 @@
         fos.close();

         Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.getLocal(conf);
         ParquetMetadata metaData;
         metaData = ParquetFileReader.readFooter(conf, new Path(tmpParquet.getAbsolutePath()), NO_FILTER);

-        // #number of records
         long nParquetRecords = 0;
         for(BlockMetaData meta : metaData.getBlocks()){
             nParquetRecords += meta.getRowCount();
@@ -183,9 +174,7 @@
     }

     @Test
-    public void test_Data() throws Exception {
-
-
+    public void testData() throws Exception {
         FileInputStream fileInputStream = new FileInputStream(tmpAvro);
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         int readedBytes;
@@ -195,14 +184,11 @@
         }
         out.close();

-        Map<String, String> attributes = new HashMap<String, String>() {{
-            put(CoreAttributes.FILENAME.key(), "test.avro");
-        }};
+        Map<String, String> attributes = Collections.singletonMap(CoreAttributes.FILENAME.key(), "test.avro");
         runner.enqueue(out.toByteArray(), attributes);
         runner.run();
         MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);

-        // Save the flowfile
         byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
         FileOutputStream fos = new FileOutputStream(tmpParquet);
         fos.write(resultContents);
@@ -210,17 +196,15 @@
         fos.close();

         Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.getLocal(conf);
         ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(tmpParquet.getAbsolutePath()))
                 .withConf(conf)
                 .build();

-        List<Group> parquetRecords = new ArrayList<Group>();
+        List<Group> parquetRecords = new ArrayList<>();

         Group current;
         current = reader.read();
         while (current != null) {
-            assertTrue(current instanceof Group);
             parquetRecords.add(current);
             current = reader.read();
         }
@@ -230,7 +214,7 @@
         // Primitive
         assertEquals(firstRecord.getInteger("myint", 0), 1);
         assertEquals(firstRecord.getLong("mylong", 0), 2);
-        assertEquals(firstRecord.getBoolean("myboolean", 0), true);
+        assertTrue(firstRecord.getBoolean("myboolean", 0));
         assertEquals(firstRecord.getFloat("myfloat", 0), 3.1, 0.0001);
         assertEquals(firstRecord.getDouble("mydouble", 0), 4.1, 0.001);
         assertEquals(firstRecord.getString("mybytes", 0), "hello");
@@ -249,15 +233,5 @@

         // Fixed
         assertEquals(firstRecord.getString("myfixed",0), "A");
-
     }
-
-    @After
-    public void cleanup(){
-        tmpAvro.delete();
-        tmpParquet.delete();
-
-    }
-
 }
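The TestConvertAvroToParquet setUp rewrite above replaces fixed files under target/ plus an @After cleanup method with JVM-managed temp files. The pattern, sketched with a hypothetical class:

```java
import java.io.File;
import java.io.IOException;

class TempFileDemo {
    File createScratchFile() throws IOException {
        // createTempFile generates a unique name per run, so repeated or
        // parallel executions cannot collide on a fixed path like "target/test.avro".
        File scratch = File.createTempFile(TempFileDemo.class.getSimpleName(), ".avro");
        // The JVM removes the file on normal exit, replacing the explicit
        // @After cleanup method that the old test needed.
        scratch.deleteOnExit();
        return scratch;
    }
}
```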
@@ -46,12 +46,12 @@ import org.bouncycastle.openpgp.operator.bc.BcPublicKeyKeyEncryptionMethodGenerator;
 import org.bouncycastle.openpgp.operator.jcajce.JcaPGPContentSignerBuilder;
 import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator;
 import org.bouncycastle.openpgp.operator.jcajce.JcePBESecretKeyDecryptorBuilder;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -66,12 +66,12 @@ import java.util.UUID;

 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.isA;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;

-@RunWith(MockitoJUnitRunner.class)
+@ExtendWith(MockitoExtension.class)
 public class DecryptContentPGPTest {
     private static final int ENCRYPTION_ALGORITHM = SymmetricKeyAlgorithmTags.AES_256;

@@ -112,7 +112,7 @@
     @Mock
     private PGPPrivateKeyService privateKeyService;

-    @BeforeClass
+    @BeforeAll
     public static void setKeys() throws Exception {
         rsaSecretKey = PGPSecretKeyGenerator.generateRsaSecretKey(PASSPHRASE.toCharArray());

@@ -128,7 +128,7 @@
         }
     }

-    @Before
+    @BeforeEach
     public void setRunner() {
         runner = TestRunners.newTestRunner(new DecryptContentPGP());
     }
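For the Mockito-based PGP tests, the JUnit 4 runner becomes the JUnit 5 extension; @Mock fields keep working unchanged. A self-contained sketch (GreetingService is hypothetical):

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

interface GreetingService {
    String greet(String name);
}

// Old: @RunWith(MockitoJUnitRunner.class); JUnit 5 has no runners,
// so Mockito ships MockitoExtension in mockito-junit-jupiter instead.
@ExtendWith(MockitoExtension.class)
class MockitoExtensionDemoTest {

    @Mock
    private GreetingService greetingService; // still initialized automatically

    @Test
    void mockIsInjectedAndStubbable() {
        when(greetingService.greet("NiFi")).thenReturn("Hello, NiFi");
        assertEquals("Hello, NiFi", greetingService.greet("NiFi"));
    }
}
```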
@@ -49,12 +49,12 @@ import org.bouncycastle.openpgp.operator.bc.BcPGPDigestCalculatorProvider;
 import org.bouncycastle.openpgp.operator.bc.BcPublicKeyDataDecryptorFactory;
 import org.bouncycastle.openpgp.operator.jcajce.JcePBESecretKeyDecryptorBuilder;

-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -63,12 +63,12 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.StreamSupport;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;

-@RunWith(MockitoJUnitRunner.class)
+@ExtendWith(MockitoExtension.class)
 public class EncryptContentPGPTest {
     private static final String PASSPHRASE = UUID.randomUUID().toString();

@@ -91,7 +91,7 @@
     @Mock
     private PGPPublicKeyService publicKeyService;

-    @BeforeClass
+    @BeforeAll
     public static void setKeys() throws Exception {
         rsaSecretKey = PGPSecretKeyGenerator.generateRsaSecretKey(PASSPHRASE.toCharArray());
         final PGPSecretKeyRing dsaElGamalSecretKeyRing = PGPSecretKeyGenerator.generateDsaElGamalSecretKeyRing(PASSPHRASE.toCharArray());
@@ -107,7 +107,7 @@
         }
     }

-    @Before
+    @BeforeEach
     public void setRunner() {
         runner = TestRunners.newTestRunner(new EncryptContentPGP());
     }
@@ -248,7 +248,7 @@
         final Optional<PGPEncryptedData> encryptedData = StreamSupport.stream(encryptedDataList.spliterator(), false)
                 .filter(pgpEncryptedData -> pgpEncryptedData instanceof PGPPublicKeyEncryptedData)
                 .findFirst();
-        assertTrue("Public Key Encrypted Data not found", encryptedData.isPresent());
+        assertTrue(encryptedData.isPresent(), "Public Key Encrypted Data not found");

         final PGPPublicKeyEncryptedData publicKeyEncryptedData = (PGPPublicKeyEncryptedData) encryptedData.get();
         final String decryptedData = getDecryptedData(publicKeyEncryptedData, privateKey);
@@ -264,7 +264,7 @@
         final Optional<PGPEncryptedData> encryptedData = StreamSupport.stream(encryptedDataList.spliterator(), false)
                 .filter(pgpEncryptedData -> pgpEncryptedData instanceof PGPPBEEncryptedData)
                 .findFirst();
-        assertTrue("Password Based Encrypted Data not found", encryptedData.isPresent());
+        assertTrue(encryptedData.isPresent(), "Password Based Encrypted Data not found");

         final PGPPBEEncryptedData passwordBasedEncryptedData = (PGPPBEEncryptedData) encryptedData.get();
         final String decryptedData = getDecryptedData(passwordBasedEncryptedData, passphrase);
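The two assertTrue changes above show another JUnit 5 difference: Assert put the failure message first, Assertions puts it last. Sketch:

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

class AssertionMessageDemo {
    // Hypothetical check; only the argument order is the point here.
    void requirePresent(boolean encryptedDataPresent) {
        // JUnit 4: Assert.assertTrue("Encrypted Data not found", encryptedDataPresent);
        // JUnit 5: condition first, message last.
        assertTrue(encryptedDataPresent, "Encrypted Data not found");
    }
}
```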
@@ -27,17 +27,17 @@ import org.bouncycastle.openpgp.PGPPrivateKey;
 import org.bouncycastle.openpgp.PGPSecretKey;

 import org.bouncycastle.openpgp.PGPSecretKeyRing;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.UUID;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;

 public class StandardPGPPrivateKeyServiceTest {
@ -61,7 +61,7 @@ public class StandardPGPPrivateKeyServiceTest {
|
|||
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setKey() throws Exception {
|
||||
rsaSecretKey = PGPSecretKeyGenerator.generateRsaSecretKey(KEY_ENCRYPTION_PASSWORD.toCharArray());
|
||||
|
||||
|
@ -75,7 +75,7 @@ public class StandardPGPPrivateKeyServiceTest {
|
|||
elGamalKeyringAscii = PGPFileUtils.getArmored(elGamalKeyring.getEncoded());
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void setService() {
|
||||
service = new StandardPGPPrivateKeyService();
|
||||
final Processor processor = mock(Processor.class);
|
||||
|
|
|
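The same renames recur in this service test and the one that follows: @BeforeClass becomes @BeforeAll and @Before becomes @BeforeEach, with unchanged semantics (once per class versus before every test). A minimal sketch of the lifecycle mapping, with illustrative names rather than the commit's own:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class LifecycleSketchTest {

    private static String sharedKey;    // expensive setup, built once per class
    private StringBuilder perTestState; // rebuilt before every test method

    @BeforeAll // JUnit 4: @BeforeClass; still required to be static by default
    static void setKey() {
        sharedKey = "key-material";
    }

    @BeforeEach // JUnit 4: @Before
    void setService() {
        perTestState = new StringBuilder(sharedKey);
    }

    @Test
    void freshStateForEachTest() {
        assertEquals("key-material", perTestState.toString());
    }
}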
@@ -25,18 +25,17 @@ import org.apache.nifi.util.TestRunners;

 import org.bouncycastle.openpgp.PGPPublicKey;
 import org.bouncycastle.openpgp.PGPSecretKey;

-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.UUID;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;

 public class StandardPGPPublicKeyServiceTest {

@@ -56,7 +55,7 @@ public class StandardPGPPublicKeyServiceTest {

     private TestRunner runner;

-    @BeforeClass
+    @BeforeAll
     public static void setKey() throws Exception {
         rsaSecretKey = PGPSecretKeyGenerator.generateRsaSecretKey(KEY_ENCRYPTION_PASSWORD.toCharArray());
         final PGPPublicKey publicKey = rsaSecretKey.getPublicKey();

@@ -67,7 +66,7 @@ public class StandardPGPPublicKeyServiceTest {
         keyringFileBinary = PGPFileUtils.getKeyFile(publicKeyEncoded);
     }

-    @Before
+    @BeforeEach
     public void setService() {
         service = new StandardPGPPublicKeyService();
         final Processor processor = mock(Processor.class);
@@ -16,10 +16,9 @@
  */
 package org.apache.nifi.processors.poi;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;

 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.text.DecimalFormatSymbols;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;

@@ -27,40 +26,38 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

-import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.LogMessage;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Assume;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;

 public class ConvertExcelToCSVProcessorTest {

     private TestRunner testRunner;

-    @BeforeClass
-    public static void setupClass() {
-        Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
-    }
-
-    @Before
+    @BeforeEach
     public void init() {
         testRunner = TestRunners.newTestRunner(ConvertExcelToCSVProcessor.class);
     }

     @Test
-    public void testMultipleSheetsGeneratesMultipleFlowFiles() throws Exception {
+    public void testMultipleSheetsGeneratesMultipleFlowFiles() throws IOException {

-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("test", "attribute");

-        testRunner.enqueue(new File("src/test/resources/TwoSheets.xlsx").toPath(), attributes);
+        final URL resourceUrl = getClass().getResource("/TwoSheets.xlsx");
+        assertNotNull(resourceUrl);
+
+        testRunner.enqueue(new File(resourceUrl.getPath()).toPath(), attributes);
         testRunner.run();

         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.SUCCESS, 2);

@@ -68,30 +65,30 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ffSheetA = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheetA = new Long(ffSheetA.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheetA == 4l);
+        long rowsSheetA = Long.parseLong(ffSheetA.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(4, rowsSheetA);
         assertTrue(ffSheetA.getAttribute(ConvertExcelToCSVProcessor.SHEET_NAME).equalsIgnoreCase("TestSheetA"));
-        assertTrue(ffSheetA.getAttribute(ConvertExcelToCSVProcessor.SOURCE_FILE_NAME).equals("TwoSheets.xlsx"));
+        assertEquals("TwoSheets.xlsx", ffSheetA.getAttribute(ConvertExcelToCSVProcessor.SOURCE_FILE_NAME));

         //Since TestRunner.run() will create a random filename even if the attribute is set in enqueue manually we just check that "_{SHEETNAME}.csv is present
         assertTrue(ffSheetA.getAttribute(CoreAttributes.FILENAME.key()).endsWith("_TestSheetA.csv"));
-        assertTrue(ffSheetA.getAttribute("test").equals("attribute"));
+        assertEquals("attribute", ffSheetA.getAttribute("test"));

         MockFlowFile ffSheetB = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(1);
-        Long rowsSheetB = new Long(ffSheetB.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheetB == 3l);
+        long rowsSheetB = Long.parseLong(ffSheetB.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(3, rowsSheetB);
         assertTrue(ffSheetB.getAttribute(ConvertExcelToCSVProcessor.SHEET_NAME).equalsIgnoreCase("TestSheetB"));
-        assertTrue(ffSheetB.getAttribute(ConvertExcelToCSVProcessor.SOURCE_FILE_NAME).equals("TwoSheets.xlsx"));
+        assertEquals("TwoSheets.xlsx", ffSheetB.getAttribute(ConvertExcelToCSVProcessor.SOURCE_FILE_NAME));

         //Since TestRunner.run() will create a random filename even if the attribute is set in enqueue manually we just check that "_{SHEETNAME}.csv is present
         assertTrue(ffSheetB.getAttribute(CoreAttributes.FILENAME.key()).endsWith("_TestSheetB.csv"));
-        assertTrue(ffSheetB.getAttribute("test").equals("attribute"));
+        assertEquals("attribute", ffSheetB.getAttribute("test"));

     }

     @Test
-    public void testDataFormatting() throws Exception {
-        testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
+    public void testDataFormatting() {
+        testRunner.enqueue(getClass().getResourceAsStream("/dataformatting.xlsx"));

         testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "false");

@@ -102,8 +99,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         ff.assertContentEquals("Numbers,Timestamps,Money\n" +
                 "1234.4559999999999,42736.5,123.45\n" +

@@ -117,8 +114,8 @@ public class ConvertExcelToCSVProcessorTest {
     }

     @Test
-    public void testQuoting() throws Exception {
-        testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
+    public void testQuoting() {
+        testRunner.enqueue(getClass().getResourceAsStream("/dataformatting.xlsx"));

         testRunner.setProperty(CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL);
         testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");

@@ -130,8 +127,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -163,8 +160,8 @@ public class ConvertExcelToCSVProcessorTest {
     }

     @Test
-    public void testSkipRows() throws Exception {
-        testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath());
+    public void testSkipRows() {
+        testRunner.enqueue(getClass().getResourceAsStream("/dataformatting.xlsx"));

         testRunner.setProperty(ConvertExcelToCSVProcessor.ROWS_TO_SKIP, "2");
         testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");

@@ -176,8 +173,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertEquals("Row count does match expected value.", "7", rowsSheet.toString());
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(7, rowsSheet, "Row count does match expected value.");

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -193,10 +190,10 @@ public class ConvertExcelToCSVProcessorTest {
     }

     @Test
-    public void testSkipRowsWithEL() throws Exception {
-        Map<String, String> attributes = new HashMap<String, String>();
+    public void testSkipRowsWithEL() {
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("rowsToSkip", "2");
-        testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);
+        testRunner.enqueue(getClass().getResourceAsStream("/dataformatting.xlsx"), attributes);

         testRunner.setProperty(ConvertExcelToCSVProcessor.ROWS_TO_SKIP, "${rowsToSkip}");
         testRunner.setProperty(ConvertExcelToCSVProcessor.FORMAT_VALUES, "true");

@@ -208,8 +205,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertEquals("Row count does match expected value.", "7", rowsSheet.toString());
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(7, rowsSheet, "Row count does match expected value.");

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -238,8 +235,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();
         String decimalSeparator = decimalFormatSymbols.getDecimalSeparator() == ',' ? "\\," : String.valueOf(decimalFormatSymbols.getDecimalSeparator());

@@ -257,7 +254,7 @@ public class ConvertExcelToCSVProcessorTest {

     @Test
     public void testSkipColumnsWithEL() throws Exception {
-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("columnsToSkip", "2");
         testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

@@ -271,8 +268,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();
         String decimalSeparator = decimalFormatSymbols.getDecimalSeparator() == ',' ? "\\," : String.valueOf(decimalFormatSymbols.getDecimalSeparator());

@@ -303,8 +300,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -326,7 +323,7 @@ public class ConvertExcelToCSVProcessorTest {

     @Test
     public void testCustomValueSeparatorWithEL() throws Exception {
-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("csv.delimiter", "|");
         testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

@@ -340,8 +337,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -363,7 +360,7 @@ public class ConvertExcelToCSVProcessorTest {

     @Test
     public void testCustomQuoteCharWithEL() throws Exception {
-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("csv.quote", "'");
         testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

@@ -378,8 +375,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         String quoteCharValue = testRunner.getProcessContext().getProperty(CSVUtils.QUOTE_CHAR).evaluateAttributeExpressions(ff).getValue();

@@ -387,31 +384,31 @@ public class ConvertExcelToCSVProcessorTest {
         char decimalSeparator = decimalFormatSymbols.getDecimalSeparator();
         char groupingSeparator = decimalFormatSymbols.getGroupingSeparator();
         ff.assertContentEquals(("'Numbers','Timestamps','Money'\n" +
-                addQuotingIfNeeded(String.format("1234%1$s456", decimalSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("1234%1$s456", decimalSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("d/M/yy").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("$ 123%1$s45", decimalSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("1234%1$s46", decimalSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("$ 123%1$s45", decimalSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("1234%1$s46", decimalSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("hh:mm:ss a").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("£ 123%1$s45", decimalSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("1234%1$s5", decimalSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("£ 123%1$s45", decimalSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("1234%1$s5", decimalSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("EEEE, MMMM dd, yyyy").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("¥ 123%1$s45", decimalSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("1%2$s234%1$s46", decimalSeparator, groupingSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("¥ 123%1$s45", decimalSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("1%2$s234%1$s46", decimalSeparator, groupingSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("d/M/yy HH:mm").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("$ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("1%2$s234%1$s4560", decimalSeparator, groupingSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("$ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("1%2$s234%1$s4560", decimalSeparator, groupingSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("hh:mm a").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("£ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("9%1$s88E+08", decimalSeparator), ",", quoteCharValue, true) + "," + quoteCharValue +
+                addQuotingIfNeeded(String.format("£ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("9%1$s88E+08", decimalSeparator), quoteCharValue, true) + "," + quoteCharValue +
                 DateTimeFormatter.ofPattern("yyyy/MM/dd/ HH:mm").format(localDt) + quoteCharValue + "," +
-                addQuotingIfNeeded(String.format("¥ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), ",", quoteCharValue, true) + "\n" +
-                addQuotingIfNeeded(String.format("9%1$s877E+08", decimalSeparator), ",", quoteCharValue, true) + ",,\n" +
-                addQuotingIfNeeded(String.format("9%1$s8765E+08", decimalSeparator), ",", quoteCharValue, true) + ",,\n").replace("E+", getExponentSeparator(decimalFormatSymbols)));
+                addQuotingIfNeeded(String.format("¥ 1%2$s023%1$s45", decimalSeparator, groupingSeparator), quoteCharValue, true) + "\n" +
+                addQuotingIfNeeded(String.format("9%1$s877E+08", decimalSeparator), quoteCharValue, true) + ",,\n" +
+                addQuotingIfNeeded(String.format("9%1$s8765E+08", decimalSeparator), quoteCharValue, true) + ",,\n").replace("E+", getExponentSeparator(decimalFormatSymbols)));
     }

     @Test
     public void testCustomEscapeCharWithEL() throws Exception {
-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put("csv.escape", "^");
         testRunner.enqueue(new File("src/test/resources/dataformatting.xlsx").toPath(), attributes);

@@ -425,8 +422,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long rowsSheet = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(rowsSheet == 9);
+        long rowsSheet = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(9, rowsSheet);

         LocalDateTime localDt = LocalDateTime.of(2017, 1, 1, 12, 0, 0);
         DecimalFormatSymbols decimalFormatSymbols = DecimalFormatSymbols.getInstance();

@@ -463,8 +460,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(l == 10l);
+        long l = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(10, l);

         testRunner.clearProvenanceEvents();
         testRunner.clearTransferState();

@@ -477,12 +474,12 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(l == 4l);
+        l = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(4, l);

         ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(1);
-        l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(l == 3l);
+        l = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(3, l);
     }

     /**

@@ -503,8 +500,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(l == 10l);
+        long l = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(10, l);
     }

     /**

@@ -544,8 +541,8 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 0);

         MockFlowFile ff = testRunner.getFlowFilesForRelationship(ConvertExcelToCSVProcessor.SUCCESS).get(0);
-        Long l = new Long(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
-        assertTrue(l == 8l);
+        long l = Long.parseLong(ff.getAttribute(ConvertExcelToCSVProcessor.ROW_NUM));
+        assertEquals(8, l);

         ff.assertContentEquals(new File("src/test/resources/with-blank-cells.csv"));
     }

@@ -564,24 +561,16 @@ public class ConvertExcelToCSVProcessorTest {
         testRunner.assertTransferCount(ConvertExcelToCSVProcessor.FAILURE, 1);

         List<LogMessage> errorMessages = testRunner.getLogger().getErrorMessages();
-        Assert.assertEquals(1, errorMessages.size());
+        assertEquals(1, errorMessages.size());
         String messageText = errorMessages.get(0).getMsg();
-        Assert.assertTrue(messageText.contains("Excel") && messageText.contains("OLE2"));
+        assertTrue(messageText.contains("Excel") && messageText.contains("OLE2"));
     }

     private String addQuotingIfNeeded(String csvField) {
-        return addQuotingIfNeeded(csvField, ",");
+        return addQuotingIfNeeded(csvField, "\"", false);
     }

-    private String addQuotingIfNeeded(String csvField, String csvSeparator) {
-        return addQuotingIfNeeded(csvField, csvSeparator, "\"");
-    }
-
-    private String addQuotingIfNeeded(String csvField, String csvSeparator, String csvQuote) {
-        return addQuotingIfNeeded(csvField, csvSeparator, csvQuote, false);
-    }
-
-    private String addQuotingIfNeeded(String csvField, String csvSeparator, String csvQuote, boolean force) {
-        return csvField.contains(csvSeparator) || force ? String.format("%2$s%1$s%2$s", csvField, csvQuote) : csvField;
+    private String addQuotingIfNeeded(String csvField, String csvQuote, boolean force) {
+        return csvField.contains(",") || force ? String.format("%2$s%1$s%2$s", csvField, csvQuote) : csvField;
     }
 }
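Two incidental cleanups run through the Excel test above: spreadsheets are loaded from the test classpath instead of hard-coded relative paths (presumably what allowed the *nix-only Assume guard to be dropped), and the deprecated new Long(String) constructor is replaced with Long.parseLong. A minimal sketch of both, assuming a /TwoSheets.xlsx resource on the test classpath; the class name is illustrative:

import org.junit.jupiter.api.Test;

import java.io.InputStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class ResourceLoadingSketchTest {

    @Test
    void loadsTestDataFromTheClasspath() {
        // Resolved against the classpath, so the test no longer depends on the
        // working directory or on OS-specific path handling
        InputStream stream = getClass().getResourceAsStream("/TwoSheets.xlsx");
        assertNotNull(stream, "Test resource should be on the classpath");
    }

    @Test
    void parsesRowCountsToPrimitives() {
        // new Long("4") is deprecated; parse straight to a primitive and
        // compare with assertEquals rather than == on a boxed Long
        long rows = Long.parseLong("4");
        assertEquals(4, rows);
    }
}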
@@ -34,9 +34,8 @@ import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockReportingContext;
 import org.apache.nifi.util.MockReportingInitializationContext;
 import org.apache.nifi.util.MockVariableRegistry;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;

 import java.io.IOException;
 import java.net.HttpURLConnection;

@@ -45,7 +44,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;

-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;

 public class PrometheusReportingTaskIT {
     private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id";

@@ -57,7 +59,7 @@ public class PrometheusReportingTaskIT {
     private PrometheusReportingTask testedReportingTask;
     private ProcessGroupStatus rootGroupStatus;

-    @Before
+    @BeforeEach
     public void setup() {
         testedReportingTask = new PrometheusReportingTask();
         rootGroupStatus = new ProcessGroupStatus();

@@ -123,30 +125,30 @@ public class PrometheusReportingTaskIT {
     }

     @Test
-    public void testOnTrigger() throws IOException, InterruptedException, InitializationException {
+    public void testOnTrigger() throws IOException, InitializationException {
         testedReportingTask.initialize(reportingInitContextStub);
         testedReportingTask.onScheduled(configurationContextStub);
         reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus);
         testedReportingTask.onTrigger(reportingContextStub);

         String content = getMetrics();
-        Assert.assertTrue(content.contains(
+        assertTrue(content.contains(
                 "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
-        Assert.assertTrue(content.contains(
+        assertTrue(content.contains(
                 "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
-        Assert.assertTrue(content.contains(
+        assertTrue(content.contains(
                 "nifi_amount_threads_active{instance=\"localhost\",component_type=\"ProcessGroup\",component_name=\"nestedPG\",component_id=\"3378\",parent_id=\"1234\",} 2.0"));

         // Rename the component
         rootGroupStatus.setName("rootroot");
         content = getMetrics();
-        Assert.assertFalse(content.contains(
+        assertFalse(content.contains(
                 "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
-        Assert.assertFalse(content.contains(
+        assertFalse(content.contains(
                 "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"root\",component_id=\"1234\",parent_id=\"\",} 5.0"));
-        Assert.assertTrue(content.contains(
+        assertTrue(content.contains(
                 "nifi_amount_flowfiles_received{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
-        Assert.assertTrue(content.contains(
+        assertTrue(content.contains(
                 "nifi_amount_threads_active{instance=\"localhost\",component_type=\"RootProcessGroup\",component_name=\"rootroot\",component_id=\"1234\",parent_id=\"\",} 5.0"));
         try {
             testedReportingTask.OnStopped();

@@ -160,7 +162,7 @@ public class PrometheusReportingTaskIT {
         HttpURLConnection con = (HttpURLConnection) url.openConnection();
         con.setRequestMethod("GET");
         int status = con.getResponseCode();
-        Assert.assertEquals(HttpURLConnection.HTTP_OK, status);
+        assertEquals(HttpURLConnection.HTTP_OK, status);

         HttpClient client = HttpClientBuilder.create().build();
         HttpGet request = new HttpGet("http://localhost:9092/metrics");

@@ -178,7 +180,7 @@ public class PrometheusReportingTaskIT {
         testedReportingTask.onTrigger(reportingContextStub);

         String content = getMetrics();
-        Assert.assertTrue(content.contains("parent_id=\"\""));
+        assertTrue(content.contains("parent_id=\"\""));

         try {
             testedReportingTask.OnStopped();

@@ -188,7 +190,7 @@ public class PrometheusReportingTaskIT {
     }

     @Test
-    public void testTwoInstances() throws IOException, InterruptedException, InitializationException {
+    public void testTwoInstances() throws InitializationException {
         testedReportingTask.initialize(reportingInitContextStub);
         testedReportingTask.onScheduled(configurationContextStub);
         PrometheusReportingTask testedReportingTask2 = new PrometheusReportingTask();
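The reporting-task hunks above are almost entirely mechanical: qualified org.junit.Assert calls become statically imported org.junit.jupiter.api.Assertions methods with the same argument order for the two-argument forms. A minimal sketch; the metric string and names are illustrative, not taken from the commit:

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class AssertionsImportSketchTest {

    @Test
    void staticImportsReplaceTheAssertQualifier() {
        String content = "nifi_amount_threads_active 5.0";
        // JUnit 4: Assert.assertTrue(...) / Assert.assertFalse(...)
        assertTrue(content.contains("threads_active"));
        assertFalse(content.contains("flowfiles_received"));
    }
}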
@@ -46,8 +46,7 @@ import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
 import org.apache.nifi.util.MockPropertyValue;
 import org.apache.nifi.util.MockVariableRegistry;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;

 import java.io.IOException;
 import java.math.BigDecimal;

@@ -59,10 +58,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -116,19 +115,10 @@ public class TestPrometheusRecordSink {
     }

     @Test
-    public void testTwoInstances() throws InitializationException {
+    public void testTwoInstances() throws Exception {
         PrometheusRecordSink sink1 = initTask();
-        try {
-            PrometheusRecordSink sink2 = initTask();
-            fail("Should have reported Address In Use");
-        } catch (ProcessException pe) {
-            // Do nothing, this is the expected behavior
-        }
-        try {
-            sink1.onStopped();
-        } catch (Exception e) {
-            // Do nothing, just need to shut down the server before the next run
-        }
+        assertThrows(ProcessException.class, this::initTask);
+        sink1.onStopped();
     }

     private String getMetrics() throws IOException {

@@ -136,7 +126,7 @@ public class TestPrometheusRecordSink {
         HttpURLConnection con = (HttpURLConnection) url.openConnection();
         con.setRequestMethod("GET");
         int status = con.getResponseCode();
-        Assert.assertEquals(HttpURLConnection.HTTP_OK, status);
+        assertEquals(HttpURLConnection.HTTP_OK, status);

         HttpClient client = HttpClientBuilder.create().build();
         HttpGet request = new HttpGet("http://localhost:" + portString + "/metrics");
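The testTwoInstances rewrite above is the one structural change in this file: the try/fail/catch idiom collapses into assertThrows, which also returns the thrown exception for further inspection. A minimal sketch of the pattern against a plain JDK exception (not the commit's ProcessException); names are illustrative:

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class AssertThrowsSketchTest {

    @Test
    void replacesTryFailCatch() {
        // JUnit 4 idiom:
        //   try { Long.parseLong("x"); fail("expected"); } catch (NumberFormatException expected) { }
        NumberFormatException e = assertThrows(NumberFormatException.class,
                () -> Long.parseLong("not-a-number"));
        assertTrue(e.getMessage().contains("not-a-number"));
    }
}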
pom.xml

@@ -285,6 +285,11 @@
             <artifactId>mockito-core</artifactId>
             <version>2.28.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>2.28.2</version>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>

@@ -482,6 +487,11 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>