From 68a90ad7b7dbcb62ecdf42fdef4c7e03bd1a776a Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Thu, 4 Jan 2024 23:16:17 -0600 Subject: [PATCH] NIFI-12570 Upgraded Apache IoTDB from 1.2.2 to 1.3.0 - Removed iotdb-server dependencies and integration tests based on banned SNAPSHOT dependency versions Signed-off-by: Matt Burgess This closes #8207 --- .../nifi-iotdb-processors/pom.xml | 40 ---- .../apache/nifi/processors/AbstractIoTDB.java | 3 +- .../nifi/processors/PutIoTDBRecordIT.java | 212 ------------------ .../apache/nifi/processors/QueryIoTDBIT.java | 101 --------- nifi-nar-bundles/nifi-iotdb-bundle/pom.xml | 2 +- 5 files changed, 3 insertions(+), 355 deletions(-) delete mode 100755 nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java delete mode 100755 nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml index 026625342d..df40336eb9 100644 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml @@ -72,46 +72,6 @@ com.fasterxml.jackson.core jackson-databind - - org.apache.iotdb - iotdb-server - ${iotdb.sdk.version} - test - - - ch.qos.logback - logback-classic - - - - org.apache.iotdb - iotdb-consensus - - - - - org.apache.iotdb - iotdb-server - ${iotdb.sdk.version} - test-jar - test - - - ch.qos.logback - logback-classic - - - - org.apache.iotdb - iotdb-consensus - - - - - org.glassfish.jersey.inject - jersey-hk2 - test - org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java index 70fa43cd81..44f7dd2185 100755 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -335,7 +336,7 @@ public abstract class AbstractIoTDB extends AbstractProcessor { protected Object convertType(Object value, TSDataType type) { switch (type) { case TEXT: - return Binary.valueOf(String.valueOf(value)); + return new Binary(String.valueOf(value), StandardCharsets.UTF_8); case INT32: return Integer.parseInt(value.toString()); case INT64: diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java deleted file mode 100755 index f1b4507fd5..0000000000 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors; - -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.record.MockRecordParser; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - -import java.sql.Timestamp; - -public class PutIoTDBRecordIT { - private TestRunner testRunner; - private MockRecordParser recordReader; - - @BeforeEach - public void setRunner() { - testRunner = TestRunners.newTestRunner(PutIoTDBRecord.class); - recordReader = new MockRecordParser(); - testRunner.setProperty(PutIoTDBRecord.RECORD_READER_FACTORY, "reader"); - testRunner.setProperty(PutIoTDBRecord.IOTDB_HOST, "127.0.0.1"); - testRunner.setProperty(PutIoTDBRecord.USERNAME, "root"); - testRunner.setProperty(PutIoTDBRecord.PASSWORD, "root"); - testRunner.setProperty(PutIoTDBRecord.MAX_ROW_NUMBER, "1024"); - EnvironmentUtils.envSetUp(); - } - - @AfterEach - public void shutdown() throws Exception { - testRunner.shutdown(); - recordReader.disabled(); - EnvironmentUtils.cleanEnv(); - EnvironmentUtils.shutdownDaemon(); - } - - private void setUpStandardTestConfig() throws InitializationException { - testRunner.addControllerService("reader", recordReader); - testRunner.enableControllerService(recordReader); - } - - @Test - public void testInsertByNativeSchemaWithSingleDevice() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("TIME", RecordFieldType.LONG); - recordReader.addSchemaField("s1", RecordFieldType.INT); - recordReader.addSchemaField("s2", RecordFieldType.LONG); - recordReader.addSchemaField("s3", RecordFieldType.FLOAT); - recordReader.addSchemaField("s4", RecordFieldType.DOUBLE); - recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN); - recordReader.addSchemaField("s6", RecordFieldType.STRING); - - recordReader.addRecord(1L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(2L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(3L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(4L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(5L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(6L, 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(7L, 1, 2L, 3.0F, 4.0D, true, "abc"); - - testRunner.setProperty(PutIoTDBRecord.TIME_FIELD, "TIME"); - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg0.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); - } - - @Test - public void testInsertByNativeSchemaWithTimeStamp() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.TIMESTAMP); - recordReader.addSchemaField("s1", RecordFieldType.INT); - recordReader.addSchemaField("s2", RecordFieldType.LONG); - recordReader.addSchemaField("s3", RecordFieldType.FLOAT); - recordReader.addSchemaField("s4", RecordFieldType.DOUBLE); - recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN); - recordReader.addSchemaField("s6", RecordFieldType.STRING); - - recordReader.addRecord(new Timestamp(1L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(2L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(3L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(4L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(5L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(6L), 1, 2L, 3.0F, 4.0D, true, "abc"); - recordReader.addRecord(new Timestamp(7L), 1, 2L, 3.0F, 4.0D, true, "abc"); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg1.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); - } - - @Test - public void testInsertByNativeSchemaWithNullValue() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.LONG); - recordReader.addSchemaField("s1", RecordFieldType.INT); - recordReader.addSchemaField("s2", RecordFieldType.LONG); - - recordReader.addRecord(1L, 1, 2L); - recordReader.addRecord(2L, 1, 2L); - recordReader.addRecord(3L, 1, null); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg2.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); - } - - @Test - public void testInsertByNativeSchemaWithEmptyValue() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.LONG); - recordReader.addSchemaField("s1", RecordFieldType.INT); - recordReader.addSchemaField("s2", RecordFieldType.LONG); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg3.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); - } - - @Test - public void testInsertByNativeSchemaWithUnsupportedDataType() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.LONG); - recordReader.addSchemaField("s1", RecordFieldType.ARRAY); - - recordReader.addRecord(1L, new String[]{"1"}); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg4.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); - } - - @Test - public void testInsertByNativeSchemaWithoutTimeField() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("s1", RecordFieldType.INT); - recordReader.addSchemaField("s2", RecordFieldType.INT); - recordReader.addRecord(1, 1); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1."); - testRunner.enqueue(""); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); - } - - @Test - public void testInsertByNativeSchemaWithWrongTimeType() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.INT); - recordReader.addSchemaField("s1", RecordFieldType.INT); - - recordReader.addRecord(1, 1); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1."); - testRunner.enqueue(new byte[]{}); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); - } - - @Test - public void testInsertByNativeSchemaNotStartWithRoot() throws InitializationException { - setUpStandardTestConfig(); - - recordReader.addSchemaField("Time", RecordFieldType.LONG); - recordReader.addSchemaField("s1", RecordFieldType.INT); - - recordReader.addRecord(1L, 1); - - testRunner.setProperty(PutIoTDBRecord.PREFIX, "sg6.d1."); - testRunner.enqueue(new byte[]{}); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); - } -} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java deleted file mode 100755 index 6e641b268b..0000000000 --- a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/QueryIoTDBIT.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors; - -import org.apache.iotdb.db.utils.EnvironmentUtils; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.record.MockRecordWriter; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.apache.nifi.util.MockFlowFile; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -public class QueryIoTDBIT { - private static final String WRITER_SERVICE_ID = "writer"; - - private static final String DEVICE_ID = "root.sg7.d1"; - - private static final String FIRST_MEASUREMENT = "s0"; - - private static final String SECOND_MEASUREMENT = "s1"; - - private static final long TIMESTAMP = 1; - - private TestRunner testRunner; - private MockRecordWriter recordWriter; - private Session session; - - @BeforeEach - public void setRunner() throws IoTDBConnectionException, StatementExecutionException { - testRunner = TestRunners.newTestRunner(QueryIoTDBRecord.class); - recordWriter = new MockRecordWriter("header", true); - testRunner.setProperty(QueryIoTDBRecord.RECORD_WRITER_FACTORY, WRITER_SERVICE_ID); - testRunner.setProperty(QueryIoTDBRecord.IOTDB_HOST, "127.0.0.1"); - testRunner.setProperty(QueryIoTDBRecord.USERNAME, "root"); - testRunner.setProperty(QueryIoTDBRecord.PASSWORD, "root"); - session = new Session.Builder().build(); - session.open(); - - List measurements = new ArrayList<>(2); - measurements.add(FIRST_MEASUREMENT); - measurements.add(SECOND_MEASUREMENT); - - List values = new ArrayList<>(2); - values.add("5.0"); - values.add("6.0"); - session.insertRecord(DEVICE_ID, TIMESTAMP, measurements, values); - } - - @AfterEach - public void shutdown() throws Exception { - testRunner.shutdown(); - recordWriter.disabled(); - session.close(); - EnvironmentUtils.cleanEnv(); - EnvironmentUtils.shutdownDaemon(); - } - - @Test - public void testQueryIoTDBbyProperty() throws InitializationException { - setUpStandardTestConfig(); - - final String query = String.format("SELECT %s, %s FROM %s", FIRST_MEASUREMENT, SECOND_MEASUREMENT, DEVICE_ID); - testRunner.setProperty(QueryIoTDBRecord.QUERY, query); - testRunner.enqueue(new byte[]{}); - testRunner.run(); - - testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); - - final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutIoTDBRecord.REL_SUCCESS).get(0); - flowFile.assertContentEquals("header\n\"1\",\"5.0\",\"6.0\"\n"); - flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); - } - - private void setUpStandardTestConfig() throws InitializationException { - testRunner.addControllerService(WRITER_SERVICE_ID, recordWriter); - testRunner.enableControllerService(recordWriter); - } -} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml index 254adf83eb..8ad7482855 100644 --- a/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml @@ -31,7 +31,7 @@ - 1.2.2 + 1.3.0