From ba3225fe92258a6aca3cb706412ab62955914dc8 Mon Sep 17 00:00:00 2001
From: Toivo Adams
Date: Thu, 1 Oct 2015 17:22:08 +0300
Subject: [PATCH] NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1

Signed-off-by: Toivo Adams
Signed-off-by: Mark Payne
---
 .../nifi-standard-processors/pom.xml          |   9 ++
 .../processors/standard/util/JdbcCommon.java  |  77 +++++++--
 .../standard/util/TestJdbcTypesDerby.java     | 137 ++++++++++++++++
 .../standard/util/TestJdbcTypesH2.java        | 149 ++++++++++++++++++
 4 files changed, 357 insertions(+), 15 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d949810a8..b0b3afabfc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
             <artifactId>derby</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.187</version>
+            <scope>test</scope>
+        </dependency>
+
+
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 6fc69ff4bc..de3d5d1924 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -16,15 +16,20 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import static java.sql.Types.ARRAY;
 import static java.sql.Types.BIGINT;
+import static java.sql.Types.BINARY;
+import static java.sql.Types.BLOB;
 import static java.sql.Types.BOOLEAN;
 import static java.sql.Types.CHAR;
+import static java.sql.Types.CLOB;
 import static java.sql.Types.DATE;
 import static java.sql.Types.DECIMAL;
 import static java.sql.Types.DOUBLE;
 import static java.sql.Types.FLOAT;
 import static java.sql.Types.INTEGER;
 import static java.sql.Types.LONGNVARCHAR;
+import static java.sql.Types.LONGVARBINARY;
 import static java.sql.Types.LONGVARCHAR;
 import static java.sql.Types.NCHAR;
 import static java.sql.Types.NUMERIC;
@@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
 import static java.sql.Types.TIME;
 import static java.sql.Types.TIMESTAMP;
 import static java.sql.Types.TINYINT;
+import static java.sql.Types.VARBINARY;
 import static java.sql.Types.VARCHAR;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -70,17 +77,34 @@ public class JdbcCommon {
             long nrOfRows = 0;
             while (rs.next()) {
                 for (int i = 1; i <= nrOfColumns; i++) {
+                    final int javaSqlType = meta.getColumnType(i);
                     final Object value = rs.getObject(i);
-                    // The different types that we support are numbers (int, long, double, float),
-                    // as well as boolean values and Strings. Since Avro doesn't provide
-                    // timestamp types, we want to convert those to Strings. So we will cast anything other
-                    // than numbers or booleans to strings by using to toString() method.
                     if (value == null) {
                         rec.put(i - 1, null);
+
+                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY || javaSqlType == BLOB || javaSqlType == CLOB) {
+                        // byte-based types require slightly different handling
+                        byte[] bytes = rs.getBytes(i);
+                        ByteBuffer bb = ByteBuffer.wrap(bytes);
+                        rec.put(i - 1, bb);
+
+                    } else if (value instanceof Byte) {
+                        // tinyint(1) columns are reported by the JDBC driver as java.sql.Types.TINYINT,
+                        // but the value is returned by JDBC as java.lang.Byte
+                        // (at least the H2 JDBC driver works this way).
+                        // Putting the Byte directly into the Avro record results in:
+                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+                        rec.put(i - 1, ((Byte) value).intValue());
+
                     } else if (value instanceof Number || value instanceof Boolean) {
                         rec.put(i - 1, value);
+
                     } else {
+                        // The different types that we support are numbers (int, long, double, float),
+                        // as well as boolean values and Strings. Since Avro doesn't provide
+                        // timestamp types, we want to convert those to Strings. So we will cast anything other
+                        // than numbers or booleans to strings by using the toString() method.
                         rec.put(i - 1, value.toString());
                     }
                 }
@@ -110,53 +134,76 @@ public class JdbcCommon {
                 case NCHAR:
                 case NVARCHAR:
                 case VARCHAR:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+
+
                     break;
                 case BOOLEAN:
-                    builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
-                    break;
+//                    builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+                    break;
                 case INTEGER:
                 case SMALLINT:
                 case TINYINT:
-                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().intType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
                     break;
                 case BIGINT:
-                    builder.name(meta.getColumnName(i)).type().longType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
                     break;
                 // java.sql.RowId is interface, is seems to be database
                 // implementation specific, let's convert to String
                 case ROWID:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
                 case FLOAT:
                 case REAL:
-                    builder.name(meta.getColumnName(i)).type().floatType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
                     break;
                 case DOUBLE:
-                    builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
                     break;
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DECIMAL:
                 case NUMERIC:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
                 // Did not find direct suitable type, need to be clarified!!!!
                 case DATE:
                 case TIME:
                 case TIMESTAMP:
-                    builder.name(meta.getColumnName(i)).type().stringType().noDefault();
+//                    builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
                     break;
-                default:
+                case BINARY:
+                case VARBINARY:
+                case LONGVARBINARY:
+                case ARRAY:
+                case BLOB:
+                case CLOB:
+                    builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
                     break;
+
+
+                default:
+                    throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
             }
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
new file mode 100644
index 0000000000..cf3d0c6b22
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * A test of limited use: Derby is so different from MySQL
+ * that it is impossible to reproduce the MySQL problems with it.
+ * + * + */ +@Ignore +public class TestJdbcTypesDerby { + + final static String DB_LOCATION = "target/db"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + String createTable = "create table users (" + + " id int NOT NULL GENERATED ALWAYS AS IDENTITY, " + + " email varchar(255) NOT NULL UNIQUE, " + + " password varchar(255) DEFAULT NULL, " + + " activation_code varchar(255) DEFAULT NULL, " + + " forgotten_password_code varchar(255) DEFAULT NULL, " + + " forgotten_password_time datetime DEFAULT NULL, " + + " created datetime NOT NULL, " + + " active tinyint NOT NULL DEFAULT 0, " + + " home_module_id int DEFAULT NULL, " + + " PRIMARY KEY (id) ) " ; +// + " UNIQUE email ) " ; +// + " KEY home_module_id (home_module_id) ) " ; +// + " CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES " +// + " modules (id) ON DELETE SET NULL " ; + + String dropTable = "drop table users"; + + @Test + public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + final Connection con = createConnection(); + final Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (final Exception e) { + // table may not exist, this is not serious problem. + } + + st.executeUpdate(createTable); + + st.executeUpdate("insert into users (email, password, activation_code, created, active) " + + " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')"); + + final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + st.close(); + con.close(); + + // Deserialize bytes to records + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. 
+                record = dataFileReader.next(record);
+                System.out.println(record);
+            }
+        }
+    }
+
+    // many tests use Derby as the database, so ensure the driver is available
+    @Test
+    public void testDriverLoad() throws ClassNotFoundException {
+        final Class clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        assertNotNull(clazz);
+    }
+
+    private Connection createConnection() throws ClassNotFoundException, SQLException {
+
+        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+        final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+        return con;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
new file mode 100644
index 0000000000..e3041b6e9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.processors.standard.util; + +import static org.junit.Assert.assertNotNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestJdbcTypesH2 { + + final static String DB_LOCATION = "~/var/test/h2"; + + @BeforeClass + public static void setup() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + String createTable = " CREATE TABLE `users` ( " + + " `id` int(11) NOT NULL AUTO_INCREMENT, " + + " `email` varchar(255) NOT NULL, " + + " `password` varchar(255) DEFAULT NULL, " + + " `activation_code` varchar(255) DEFAULT NULL, " + + " `forgotten_password_code` varchar(255) DEFAULT NULL, " + + " `forgotten_password_time` datetime DEFAULT NULL, " + + " `created` datetime NOT NULL, " + + " `active` tinyint(1) NOT NULL DEFAULT '0', " + + " `home_module_id` int(11) DEFAULT NULL, " + + + " somebinary BINARY default null, " + + " somebinary2 VARBINARY default null, " + + " somebinary3 LONGVARBINARY default null, " + + " somearray ARRAY default null, " + + " someblob BLOB default null, " + + " someclob CLOB default null, " + + + " PRIMARY KEY (`id`), " + + " UNIQUE KEY `email` (`email`) ) " ; +// + " KEY `home_module_id` (`home_module_id`) )" ; +/* + " CONSTRAINT `users_ibfk_1` FOREIGN KEY (`home_module_id`) REFERENCES " + + "`modules` (`id`) ON DELETE SET NULL " + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 " ; + */ + + String dropTable = "drop table users"; + + @Test + public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + final Connection con = createConnection(); + final Statement st = con.createStatement(); + + try { + st.executeUpdate(dropTable); + } catch (final Exception e) { + // table may not exist, this is not serious problem. 
+ } + + st.executeUpdate(createTable); + +// st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) " +// + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)"); + + st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) " + + " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')"); + + final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U"); +// final ResultSet resultSet = st.executeQuery("select U.active from users U"); +// final ResultSet resultSet = st.executeQuery("select U.somebinary from users U"); + + final ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + JdbcCommon.convertToAvroStream(resultSet, outStream); + + final byte[] serializedBytes = outStream.toByteArray(); + assertNotNull(serializedBytes); + System.out.println("Avro serialized result size in bytes: " + serializedBytes.length); + + st.close(); + con.close(); + + // Deserialize bytes to records + + final InputStream instream = new ByteArrayInputStream(serializedBytes); + + final DatumReader datumReader = new GenericDatumReader(); + try (final DataFileStream dataFileReader = new DataFileStream(instream, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + System.out.println(record); + } + } + } + + // verify H2 driver loading and get Connections works + @Test + public void testDriverLoad() throws ClassNotFoundException, SQLException { +// final Class clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + + Connection con = createConnection(); + + assertNotNull(con); + con.close(); + } + + private Connection createConnection() throws ClassNotFoundException, SQLException { + +// Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7"; + final Connection con = DriverManager.getConnection(connectionString, "SA", ""); + return con; + } + +}