mirror of https://github.com/apache/nifi.git
NIFI-12570 Upgraded Apache IoTDB from 1.2.2 to 1.3.0
- Removed iotdb-server dependencies and integration tests based on banned SNAPSHOT dependency versions Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #8207
This commit is contained in:
parent
2460219590
commit
68a90ad7b7
|
@ -72,46 +72,6 @@
|
|||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-server</artifactId>
|
||||
<version>${iotdb.sdk.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</exclusion>
|
||||
<!-- Exclude IoTDB Consensus to avoid SNAPSHOT versions of Apache Ratis -->
|
||||
<exclusion>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-consensus</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-server</artifactId>
|
||||
<version>${iotdb.sdk.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
</exclusion>
|
||||
<!-- Exclude IoTDB Consensus to avoid SNAPSHOT versions of Apache Ratis -->
|
||||
<exclusion>
|
||||
<groupId>org.apache.iotdb</groupId>
|
||||
<artifactId>iotdb-consensus</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.inject</groupId>
|
||||
<artifactId>jersey-hk2</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.nifi.processors;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -335,7 +336,7 @@ public abstract class AbstractIoTDB extends AbstractProcessor {
|
|||
protected Object convertType(Object value, TSDataType type) {
|
||||
switch (type) {
|
||||
case TEXT:
|
||||
return Binary.valueOf(String.valueOf(value));
|
||||
return new Binary(String.valueOf(value), StandardCharsets.UTF_8);
|
||||
case INT32:
|
||||
return Integer.parseInt(value.toString());
|
||||
case INT64:
|
||||
|
|
|
@ -1,212 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors;
|
||||
|
||||
import org.apache.iotdb.db.utils.EnvironmentUtils;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
|
||||
public class PutIoTDBRecordIT {
|
||||
private TestRunner testRunner;
|
||||
private MockRecordParser recordReader;
|
||||
|
||||
@BeforeEach
|
||||
public void setRunner() {
|
||||
testRunner = TestRunners.newTestRunner(PutIoTDBRecord.class);
|
||||
recordReader = new MockRecordParser();
|
||||
testRunner.setProperty(PutIoTDBRecord.RECORD_READER_FACTORY, "reader");
|
||||
testRunner.setProperty(PutIoTDBRecord.IOTDB_HOST, "127.0.0.1");
|
||||
testRunner.setProperty(PutIoTDBRecord.USERNAME, "root");
|
||||
testRunner.setProperty(PutIoTDBRecord.PASSWORD, "root");
|
||||
testRunner.setProperty(PutIoTDBRecord.MAX_ROW_NUMBER, "1024");
|
||||
EnvironmentUtils.envSetUp();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void shutdown() throws Exception {
|
||||
testRunner.shutdown();
|
||||
recordReader.disabled();
|
||||
EnvironmentUtils.cleanEnv();
|
||||
EnvironmentUtils.shutdownDaemon();
|
||||
}
|
||||
|
||||
private void setUpStandardTestConfig() throws InitializationException {
|
||||
testRunner.addControllerService("reader", recordReader);
|
||||
testRunner.enableControllerService(recordReader);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithSingleDevice() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("TIME", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s2", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s3", RecordFieldType.FLOAT);
|
||||
recordReader.addSchemaField("s4", RecordFieldType.DOUBLE);
|
||||
recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN);
|
||||
recordReader.addSchemaField("s6", RecordFieldType.STRING);
|
||||
|
||||
recordReader.addRecord(1L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(2L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(3L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(4L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(5L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(6L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(7L, 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.TIME_FIELD, "TIME");
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg0.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithTimeStamp() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.TIMESTAMP);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s2", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s3", RecordFieldType.FLOAT);
|
||||
recordReader.addSchemaField("s4", RecordFieldType.DOUBLE);
|
||||
recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN);
|
||||
recordReader.addSchemaField("s6", RecordFieldType.STRING);
|
||||
|
||||
recordReader.addRecord(new Timestamp(1L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(2L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(3L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(4L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(5L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(6L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
recordReader.addRecord(new Timestamp(7L), 1, 2L, 3.0F, 4.0D, true, "abc");
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg1.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithNullValue() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s2", RecordFieldType.LONG);
|
||||
|
||||
recordReader.addRecord(1L, 1, 2L);
|
||||
recordReader.addRecord(2L, 1, 2L);
|
||||
recordReader.addRecord(3L, 1, null);
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg2.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithEmptyValue() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s2", RecordFieldType.LONG);
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg3.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithUnsupportedDataType() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.ARRAY);
|
||||
|
||||
recordReader.addRecord(1L, new String[]{"1"});
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg4.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithoutTimeField() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s2", RecordFieldType.INT);
|
||||
recordReader.addRecord(1, 1);
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1.");
|
||||
testRunner.enqueue("");
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaWithWrongTimeType() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.INT);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
|
||||
recordReader.addRecord(1, 1);
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1.");
|
||||
testRunner.enqueue(new byte[]{});
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertByNativeSchemaNotStartWithRoot() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
recordReader.addSchemaField("Time", RecordFieldType.LONG);
|
||||
recordReader.addSchemaField("s1", RecordFieldType.INT);
|
||||
|
||||
recordReader.addRecord(1L, 1);
|
||||
|
||||
testRunner.setProperty(PutIoTDBRecord.PREFIX, "sg6.d1.");
|
||||
testRunner.enqueue(new byte[]{});
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
|
||||
}
|
||||
}
|
|
@ -1,101 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors;
|
||||
|
||||
import org.apache.iotdb.db.utils.EnvironmentUtils;
|
||||
import org.apache.iotdb.rpc.IoTDBConnectionException;
|
||||
import org.apache.iotdb.rpc.StatementExecutionException;
|
||||
import org.apache.iotdb.session.Session;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class QueryIoTDBIT {
|
||||
private static final String WRITER_SERVICE_ID = "writer";
|
||||
|
||||
private static final String DEVICE_ID = "root.sg7.d1";
|
||||
|
||||
private static final String FIRST_MEASUREMENT = "s0";
|
||||
|
||||
private static final String SECOND_MEASUREMENT = "s1";
|
||||
|
||||
private static final long TIMESTAMP = 1;
|
||||
|
||||
private TestRunner testRunner;
|
||||
private MockRecordWriter recordWriter;
|
||||
private Session session;
|
||||
|
||||
@BeforeEach
|
||||
public void setRunner() throws IoTDBConnectionException, StatementExecutionException {
|
||||
testRunner = TestRunners.newTestRunner(QueryIoTDBRecord.class);
|
||||
recordWriter = new MockRecordWriter("header", true);
|
||||
testRunner.setProperty(QueryIoTDBRecord.RECORD_WRITER_FACTORY, WRITER_SERVICE_ID);
|
||||
testRunner.setProperty(QueryIoTDBRecord.IOTDB_HOST, "127.0.0.1");
|
||||
testRunner.setProperty(QueryIoTDBRecord.USERNAME, "root");
|
||||
testRunner.setProperty(QueryIoTDBRecord.PASSWORD, "root");
|
||||
session = new Session.Builder().build();
|
||||
session.open();
|
||||
|
||||
List<String> measurements = new ArrayList<>(2);
|
||||
measurements.add(FIRST_MEASUREMENT);
|
||||
measurements.add(SECOND_MEASUREMENT);
|
||||
|
||||
List<String> values = new ArrayList<>(2);
|
||||
values.add("5.0");
|
||||
values.add("6.0");
|
||||
session.insertRecord(DEVICE_ID, TIMESTAMP, measurements, values);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void shutdown() throws Exception {
|
||||
testRunner.shutdown();
|
||||
recordWriter.disabled();
|
||||
session.close();
|
||||
EnvironmentUtils.cleanEnv();
|
||||
EnvironmentUtils.shutdownDaemon();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryIoTDBbyProperty() throws InitializationException {
|
||||
setUpStandardTestConfig();
|
||||
|
||||
final String query = String.format("SELECT %s, %s FROM %s", FIRST_MEASUREMENT, SECOND_MEASUREMENT, DEVICE_ID);
|
||||
testRunner.setProperty(QueryIoTDBRecord.QUERY, query);
|
||||
testRunner.enqueue(new byte[]{});
|
||||
testRunner.run();
|
||||
|
||||
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
|
||||
|
||||
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutIoTDBRecord.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals("header\n\"1\",\"5.0\",\"6.0\"\n");
|
||||
flowFile.assertAttributeExists(CoreAttributes.MIME_TYPE.key());
|
||||
}
|
||||
|
||||
private void setUpStandardTestConfig() throws InitializationException {
|
||||
testRunner.addControllerService(WRITER_SERVICE_ID, recordWriter);
|
||||
testRunner.enableControllerService(recordWriter);
|
||||
}
|
||||
}
|
|
@ -31,7 +31,7 @@
|
|||
</modules>
|
||||
|
||||
<properties>
|
||||
<iotdb.sdk.version>1.2.2</iotdb.sdk.version>
|
||||
<iotdb.sdk.version>1.3.0</iotdb.sdk.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
|
|
Loading…
Reference in New Issue