mirror of https://github.com/apache/nifi.git
NIFI-972 ExecuteSQL bug in createSchema() create Avro Schema 1
Signed-off-by: Toivo Adams <toivo.adams@gmail.com> Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
da28b81eec
commit
ba3225fe92
|
@ -190,6 +190,15 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>1.4.187</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -16,15 +16,20 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import static java.sql.Types.ARRAY;
|
||||
import static java.sql.Types.BIGINT;
|
||||
import static java.sql.Types.BINARY;
|
||||
import static java.sql.Types.BLOB;
|
||||
import static java.sql.Types.BOOLEAN;
|
||||
import static java.sql.Types.CHAR;
|
||||
import static java.sql.Types.CLOB;
|
||||
import static java.sql.Types.DATE;
|
||||
import static java.sql.Types.DECIMAL;
|
||||
import static java.sql.Types.DOUBLE;
|
||||
import static java.sql.Types.FLOAT;
|
||||
import static java.sql.Types.INTEGER;
|
||||
import static java.sql.Types.LONGNVARCHAR;
|
||||
import static java.sql.Types.LONGVARBINARY;
|
||||
import static java.sql.Types.LONGVARCHAR;
|
||||
import static java.sql.Types.NCHAR;
|
||||
import static java.sql.Types.NUMERIC;
|
||||
|
@ -35,10 +40,12 @@ import static java.sql.Types.SMALLINT;
|
|||
import static java.sql.Types.TIME;
|
||||
import static java.sql.Types.TIMESTAMP;
|
||||
import static java.sql.Types.TINYINT;
|
||||
import static java.sql.Types.VARBINARY;
|
||||
import static java.sql.Types.VARCHAR;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
|
@ -70,17 +77,34 @@ public class JdbcCommon {
|
|||
long nrOfRows = 0;
|
||||
while (rs.next()) {
|
||||
for (int i = 1; i <= nrOfColumns; i++) {
|
||||
final int javaSqlType = meta.getColumnType(i);
|
||||
final Object value = rs.getObject(i);
|
||||
|
||||
// The different types that we support are numbers (int, long, double, float),
|
||||
// as well as boolean values and Strings. Since Avro doesn't provide
|
||||
// timestamp types, we want to convert those to Strings. So we will cast anything other
|
||||
// than numbers or booleans to strings by using the toString() method.
|
||||
if (value == null) {
|
||||
rec.put(i - 1, null);
|
||||
|
||||
} else if (javaSqlType==BINARY || javaSqlType==VARBINARY || javaSqlType==LONGVARBINARY || javaSqlType==ARRAY || javaSqlType==BLOB || javaSqlType==CLOB) {
|
||||
// bytes require slightly different handling
|
||||
byte[] bytes = rs.getBytes(i);
|
||||
ByteBuffer bb = ByteBuffer.wrap(bytes);
|
||||
rec.put(i - 1, bb);
|
||||
|
||||
} else if (value instanceof Byte) {
|
||||
// tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
|
||||
// But value is returned by JDBC as java.lang.Byte
|
||||
// (at least H2 JDBC works this way)
|
||||
// direct put to avro record results:
|
||||
// org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
|
||||
rec.put(i - 1, ((Byte) value).intValue());
|
||||
|
||||
} else if (value instanceof Number || value instanceof Boolean) {
|
||||
rec.put(i - 1, value);
|
||||
|
||||
} else {
|
||||
// The different types that we support are numbers (int, long, double, float),
|
||||
// as well as boolean values and Strings. Since Avro doesn't provide
|
||||
// timestamp types, we want to convert those to Strings. So we will cast anything other
|
||||
// than numbers or booleans to strings by using the toString() method.
|
||||
rec.put(i - 1, value.toString());
|
||||
}
|
||||
}
|
||||
|
@ -110,53 +134,76 @@ public class JdbcCommon {
|
|||
case NCHAR:
|
||||
case NVARCHAR:
|
||||
case VARCHAR:
|
||||
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().stringType().stringDefault(null);
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
|
||||
|
||||
|
||||
break;
|
||||
|
||||
case BOOLEAN:
|
||||
builder.name(meta.getColumnName(i)).type().booleanType().noDefault();
|
||||
break;
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().booleanType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
case INTEGER:
|
||||
case SMALLINT:
|
||||
case TINYINT:
|
||||
builder.name(meta.getColumnName(i)).type().intType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().intType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
case BIGINT:
|
||||
builder.name(meta.getColumnName(i)).type().longType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().longType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
// java.sql.RowId is an interface that seems to be database
|
||||
// implementation specific, let's convert to String
|
||||
case ROWID:
|
||||
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
case FLOAT:
|
||||
case REAL:
|
||||
builder.name(meta.getColumnName(i)).type().floatType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().floatType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
case DOUBLE:
|
||||
builder.name(meta.getColumnName(i)).type().doubleType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().doubleType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
// Did not find a directly suitable type; this needs to be clarified.
|
||||
case DECIMAL:
|
||||
case NUMERIC:
|
||||
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
// Did not find a directly suitable type; this needs to be clarified.
|
||||
case DATE:
|
||||
case TIME:
|
||||
case TIMESTAMP:
|
||||
builder.name(meta.getColumnName(i)).type().stringType().noDefault();
|
||||
// builder.name(meta.getColumnName(i)).type().nullable().stringType().noDefault();
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
default:
|
||||
case BINARY:
|
||||
case VARBINARY:
|
||||
case LONGVARBINARY:
|
||||
case ARRAY:
|
||||
case BLOB:
|
||||
case CLOB:
|
||||
builder.name(meta.getColumnName(i)).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
|
||||
break;
|
||||
|
||||
|
||||
default:
|
||||
throw new IllegalArgumentException("createSchema: Unknown SQL type " + meta.getColumnType(i) + " cannot be converted to Avro type");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.avro.file.DataFileStream;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Useless test, Derby is so much different from MySQL
|
||||
* so it is impossible reproduce problems with MySQL.
|
||||
*
|
||||
*
|
||||
*/
|
||||
@Ignore
|
||||
public class TestJdbcTypesDerby {
|
||||
|
||||
final static String DB_LOCATION = "target/db";
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
System.setProperty("derby.stream.error.file", "target/derby.log");
|
||||
}
|
||||
|
||||
String createTable = "create table users ("
|
||||
+ " id int NOT NULL GENERATED ALWAYS AS IDENTITY, "
|
||||
+ " email varchar(255) NOT NULL UNIQUE, "
|
||||
+ " password varchar(255) DEFAULT NULL, "
|
||||
+ " activation_code varchar(255) DEFAULT NULL, "
|
||||
+ " forgotten_password_code varchar(255) DEFAULT NULL, "
|
||||
+ " forgotten_password_time datetime DEFAULT NULL, "
|
||||
+ " created datetime NOT NULL, "
|
||||
+ " active tinyint NOT NULL DEFAULT 0, "
|
||||
+ " home_module_id int DEFAULT NULL, "
|
||||
+ " PRIMARY KEY (id) ) " ;
|
||||
// + " UNIQUE email ) " ;
|
||||
// + " KEY home_module_id (home_module_id) ) " ;
|
||||
// + " CONSTRAINT users_ibfk_1 FOREIGN KEY (home_module_id) REFERENCES "
|
||||
// + " modules (id) ON DELETE SET NULL " ;
|
||||
|
||||
String dropTable = "drop table users";
|
||||
|
||||
@Test
|
||||
public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
|
||||
// remove previous test database, if any
|
||||
final File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
final Connection con = createConnection();
|
||||
final Statement st = con.createStatement();
|
||||
|
||||
try {
|
||||
st.executeUpdate(dropTable);
|
||||
} catch (final Exception e) {
|
||||
// table may not exist, this is not serious problem.
|
||||
}
|
||||
|
||||
st.executeUpdate(createTable);
|
||||
|
||||
st.executeUpdate("insert into users (email, password, activation_code, created, active) "
|
||||
+ " values ('robert.gates@cold.com', '******', 'CAS', '2005-12-09', 'Y')");
|
||||
|
||||
final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
|
||||
|
||||
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
|
||||
JdbcCommon.convertToAvroStream(resultSet, outStream);
|
||||
|
||||
final byte[] serializedBytes = outStream.toByteArray();
|
||||
assertNotNull(serializedBytes);
|
||||
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
|
||||
|
||||
st.close();
|
||||
con.close();
|
||||
|
||||
// Deserialize bytes to records
|
||||
|
||||
final InputStream instream = new ByteArrayInputStream(serializedBytes);
|
||||
|
||||
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
|
||||
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
|
||||
GenericRecord record = null;
|
||||
while (dataFileReader.hasNext()) {
|
||||
// Reuse record object by passing it to next(). This saves us from
|
||||
// allocating and garbage collecting many objects for files with
|
||||
// many items.
|
||||
record = dataFileReader.next(record);
|
||||
System.out.println(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// many test use Derby as database, so ensure driver is available
|
||||
@Test
|
||||
public void testDriverLoad() throws ClassNotFoundException {
|
||||
final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
assertNotNull(clazz);
|
||||
}
|
||||
|
||||
private Connection createConnection() throws ClassNotFoundException, SQLException {
|
||||
|
||||
Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
|
||||
return con;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
import org.apache.avro.file.DataFileStream;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestJdbcTypesH2 {
|
||||
|
||||
final static String DB_LOCATION = "~/var/test/h2";
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
System.setProperty("derby.stream.error.file", "target/derby.log");
|
||||
}
|
||||
|
||||
String createTable = " CREATE TABLE `users` ( "
|
||||
+ " `id` int(11) NOT NULL AUTO_INCREMENT, "
|
||||
+ " `email` varchar(255) NOT NULL, "
|
||||
+ " `password` varchar(255) DEFAULT NULL, "
|
||||
+ " `activation_code` varchar(255) DEFAULT NULL, "
|
||||
+ " `forgotten_password_code` varchar(255) DEFAULT NULL, "
|
||||
+ " `forgotten_password_time` datetime DEFAULT NULL, "
|
||||
+ " `created` datetime NOT NULL, "
|
||||
+ " `active` tinyint(1) NOT NULL DEFAULT '0', "
|
||||
+ " `home_module_id` int(11) DEFAULT NULL, "
|
||||
|
||||
+ " somebinary BINARY default null, "
|
||||
+ " somebinary2 VARBINARY default null, "
|
||||
+ " somebinary3 LONGVARBINARY default null, "
|
||||
+ " somearray ARRAY default null, "
|
||||
+ " someblob BLOB default null, "
|
||||
+ " someclob CLOB default null, "
|
||||
|
||||
+ " PRIMARY KEY (`id`), "
|
||||
+ " UNIQUE KEY `email` (`email`) ) " ;
|
||||
// + " KEY `home_module_id` (`home_module_id`) )" ;
|
||||
/* + " CONSTRAINT `users_ibfk_1` FOREIGN KEY (`home_module_id`) REFERENCES "
|
||||
+ "`modules` (`id`) ON DELETE SET NULL "
|
||||
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8 " ;
|
||||
*/
|
||||
|
||||
String dropTable = "drop table users";
|
||||
|
||||
@Test
|
||||
public void testSQLTypesMapping() throws ClassNotFoundException, SQLException, IOException {
|
||||
// remove previous test database, if any
|
||||
final File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
final Connection con = createConnection();
|
||||
final Statement st = con.createStatement();
|
||||
|
||||
try {
|
||||
st.executeUpdate(dropTable);
|
||||
} catch (final Exception e) {
|
||||
// table may not exist, this is not serious problem.
|
||||
}
|
||||
|
||||
st.executeUpdate(createTable);
|
||||
|
||||
// st.executeUpdate("insert into users (email, password, activation_code, forgotten_password_code, forgotten_password_time, created, active, home_module_id) "
|
||||
// + " values ('robert.gates@cold.com', '******', 'CAS', 'ounou', '2005-12-09', '2005-12-03', 1, 5)");
|
||||
|
||||
st.executeUpdate("insert into users (email, password, activation_code, created, active, somebinary, somebinary2, somebinary3, someblob, someclob) "
|
||||
+ " values ('mari.gates@cold.com', '******', 'CAS', '2005-12-03', 3, '66FF', 'ABDF', 'EE64', 'BB22', 'CC88')");
|
||||
|
||||
final ResultSet resultSet = st.executeQuery("select U.*, ROW_NUMBER() OVER () as rownr from users U");
|
||||
// final ResultSet resultSet = st.executeQuery("select U.active from users U");
|
||||
// final ResultSet resultSet = st.executeQuery("select U.somebinary from users U");
|
||||
|
||||
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
|
||||
JdbcCommon.convertToAvroStream(resultSet, outStream);
|
||||
|
||||
final byte[] serializedBytes = outStream.toByteArray();
|
||||
assertNotNull(serializedBytes);
|
||||
System.out.println("Avro serialized result size in bytes: " + serializedBytes.length);
|
||||
|
||||
st.close();
|
||||
con.close();
|
||||
|
||||
// Deserialize bytes to records
|
||||
|
||||
final InputStream instream = new ByteArrayInputStream(serializedBytes);
|
||||
|
||||
final DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
|
||||
try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(instream, datumReader)) {
|
||||
GenericRecord record = null;
|
||||
while (dataFileReader.hasNext()) {
|
||||
// Reuse record object by passing it to next(). This saves us from
|
||||
// allocating and garbage collecting many objects for files with
|
||||
// many items.
|
||||
record = dataFileReader.next(record);
|
||||
System.out.println(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// verify H2 driver loading and get Connections works
|
||||
@Test
|
||||
public void testDriverLoad() throws ClassNotFoundException, SQLException {
|
||||
// final Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
|
||||
Connection con = createConnection();
|
||||
|
||||
assertNotNull(con);
|
||||
con.close();
|
||||
}
|
||||
|
||||
private Connection createConnection() throws ClassNotFoundException, SQLException {
|
||||
|
||||
// Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
String connectionString = "jdbc:h2:file:" + DB_LOCATION + "/testdb7";
|
||||
final Connection con = DriverManager.getConnection(connectionString, "SA", "");
|
||||
return con;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue