NIFI-10234 Added PutIoTDBRecord Processor

This closes #6416

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Co-authored-by: David Handermann <exceptionfactory@apache.org>
Co-authored-by: Xuan Ronaldo <xuanronaldo@qq.com>
Co-authored-by: Zhizhou Li <lizhizhou1983@gmail.com>
Author: lizhizhou (2023-01-06 14:33:58 +08:00)
Committed by: exceptionfactory
Parent: 0c0f7e87be
Commit: 40ccb71f85
13 changed files with 1565 additions and 0 deletions

@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-iotdb-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-iotdb-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-iotdb-processors</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${project.version}</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-iotdb-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-iotdb-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>${iotdb.sdk.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${iotdb.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${iotdb.sdk.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,359 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processors.model.DatabaseField;
import org.apache.nifi.processors.model.DatabaseSchema;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.model.ValidationResult;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public abstract class AbstractIoTDB extends AbstractProcessor {
private static final int DEFAULT_IOTDB_PORT = 6667;
protected static final ObjectMapper mapper = new ObjectMapper();
private static final String FIELDS = "fields";
private static final Map<RecordFieldType, TSDataType> typeMap =
new HashMap<>();
static final Set<RecordFieldType> supportedType =
new HashSet<>();
static final PropertyDescriptor IOTDB_HOST = new PropertyDescriptor.Builder()
.name("Host")
.description("IoTDB server host address")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor IOTDB_PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("IoTDB server port number")
.defaultValue(String.valueOf(DEFAULT_IOTDB_PORT))
.addValidator(StandardValidators.PORT_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username for access to IoTDB")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password for access to IoTDB")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true)
.sensitive(true)
.build();
protected final static Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Processing succeeded")
.build();
protected final static Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Processing failed")
.build();
private static final List<PropertyDescriptor> descriptors = new ArrayList<>();
private static final Set<Relationship> relationships = new LinkedHashSet<>();
static {
descriptors.add(IOTDB_HOST);
descriptors.add(IOTDB_PORT);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
typeMap.put(RecordFieldType.STRING, TSDataType.TEXT);
typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN);
typeMap.put(RecordFieldType.INT, TSDataType.INT32);
typeMap.put(RecordFieldType.LONG, TSDataType.INT64);
typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT);
typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE);
supportedType.add(RecordFieldType.BOOLEAN);
supportedType.add(RecordFieldType.STRING);
supportedType.add(RecordFieldType.INT);
supportedType.add(RecordFieldType.LONG);
supportedType.add(RecordFieldType.FLOAT);
supportedType.add(RecordFieldType.DOUBLE);
supportedType.add(RecordFieldType.TIMESTAMP);
supportedType.add(RecordFieldType.TIME);
supportedType.add(RecordFieldType.DATE);
}
protected final AtomicReference<Session> session = new AtomicReference<>(null);
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void onScheduled(ProcessContext context) throws IoTDBConnectionException {
if (session.get() == null) {
final String host = context.getProperty(IOTDB_HOST).getValue();
final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue());
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
session.set(new Session.Builder()
.host(host)
.port(port)
.username(username)
.password(password)
.build());
session.get().open();
}
}
@OnStopped
public void stop() {
if (session.get() != null) {
try {
session.get().close();
} catch (final IoTDBConnectionException e) {
getLogger().error("IoTDB disconnection failed", e);
}
session.set(null);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.unmodifiableList(descriptors);
}
protected TSDataType getType(RecordFieldType type) {
return typeMap.get(type);
}
protected ValidationResult validateSchemaAttribute(String schemaAttribute) {
JsonNode schema;
try {
schema = mapper.readTree(schemaAttribute);
} catch (JsonProcessingException e) {
return new ValidationResult(false, e.getMessage());
}
Set<String> keySet = new HashSet<>();
schema.fieldNames().forEachRemaining(keySet::add);
if (!keySet.contains(FIELDS)) {
String msg = "The JSON of schema must contain `fields`";
return new ValidationResult(false, msg);
}
for (int i = 0; i < schema.get(FIELDS).size(); i++) {
JsonNode field = schema.get(FIELDS).get(i);
Set<String> fieldKeySet = new HashSet<>();
field.fieldNames().forEachRemaining(fieldKeySet::add);
if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) {
String msg = "`tsName` or `dataType` has not been set";
return new ValidationResult(false, msg);
}
if (!DatabaseField.getSupportedDataType().contains(field.get("dataType").asText())) {
String msg =
String.format(
"Unknown `dataType`: %s. The supported dataTypes are %s",
field.get("dataType").asText(), DatabaseField.getSupportedDataType());
return new ValidationResult(false, msg);
}
Set<String> supportedKeySet = new HashSet<>();
supportedKeySet.add("tsName");
supportedKeySet.add("dataType");
supportedKeySet.add("encoding");
supportedKeySet.add("compressionType");
Set<String> tmpKetSet = new HashSet<>();
tmpKetSet.addAll(supportedKeySet);
tmpKetSet.addAll(fieldKeySet);
tmpKetSet.removeAll(supportedKeySet);
if (!tmpKetSet.isEmpty()) {
String msg = "Unknown property or properties: " + tmpKetSet;
return new ValidationResult(false, msg);
}
if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) {
String msg =
"The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect";
return new ValidationResult(true, msg);
}
if (field.get("encoding") != null
&& !DatabaseField.getSupportedEncoding().contains(field.get("encoding").asText())) {
String msg =
String.format(
"Unknown `encoding`: %s, The supported encoding types are %s",
field.get("encoding").asText(), DatabaseField.getSupportedEncoding());
return new ValidationResult(false, msg);
}
if (field.get("compressionType") != null
&& !DatabaseField.getSupportedCompressionType().contains(field.get("compressionType").asText())) {
String msg =
String.format(
"Unknown `compressionType`: %s, The supported compressionType are %s",
field.get("compressionType").asText(), DatabaseField.getSupportedCompressionType());
return new ValidationResult(false, msg);
}
}
return new ValidationResult(true, null);
}
protected ValidationResult validateSchema(String timeField, RecordSchema recordSchema) {
List<String> fieldNames = recordSchema.getFieldNames();
List<DataType> dataTypes = recordSchema.getDataTypes();
if (!fieldNames.contains(timeField)) {
return new ValidationResult(false, "The fields must contain "+ timeField);
}
fieldNames.remove(timeField);
for (DataType type : dataTypes) {
RecordFieldType dataType = type.getFieldType();
if (!supportedType.contains(dataType)) {
String msg =
String.format(
"Unknown `dataType`: %s. The supported dataTypes are %s",
dataType.toString(), supportedType);
return new ValidationResult(false, msg);
}
}
return new ValidationResult(true, null);
}
protected Map<String, List<String>> parseSchema(final List<String> fieldNames) {
final Map<String, List<String>> deviceMeasurementMap = new LinkedHashMap<>();
fieldNames.forEach(
field -> {
final List<String> paths = new ArrayList<>(Arrays.asList(field.split("\\.")));
final int lastIndex = paths.size() - 1;
final String lastPath = paths.remove(lastIndex);
final String device = String.join(".", paths);
if (!deviceMeasurementMap.containsKey(device)) {
deviceMeasurementMap.put(device, new ArrayList<>());
}
deviceMeasurementMap.get(device).add(lastPath);
});
return deviceMeasurementMap;
}
protected Map<String, Tablet> generateTablets(DatabaseSchema schema, String prefix, int maxRowNumber) {
final Map<String, List<String>> deviceMeasurementMap = parseSchema(schema.getFieldNames(prefix));
final Map<String, Tablet> tablets = new LinkedHashMap<>();
deviceMeasurementMap.forEach(
(device, measurements) -> {
ArrayList<MeasurementSchema> schemas = new ArrayList<>();
for (String measurement : measurements) {
TSDataType dataType = schema.getDataType(measurement);
TSEncoding encoding = schema.getEncodingType(measurement);
CompressionType compressionType = schema.getCompressionType(measurement);
if (encoding == null) {
schemas.add(new MeasurementSchema(measurement, dataType));
} else if (compressionType == null) {
schemas.add(new MeasurementSchema(measurement, dataType, encoding));
} else {
schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType));
}
}
Tablet tablet = new Tablet(device, schemas, maxRowNumber);
tablets.put(device, tablet);
});
return tablets;
}
protected Object convertType(Object value, TSDataType type) {
switch (type) {
case TEXT:
return Binary.valueOf(String.valueOf(value));
case INT32:
return Integer.parseInt(value.toString());
case INT64:
return Long.parseLong(value.toString());
case FLOAT:
return Float.parseFloat(value.toString());
case DOUBLE:
return Double.parseDouble(value.toString());
case BOOLEAN:
return Boolean.parseBoolean(value.toString());
default:
return null;
}
}
protected DatabaseSchema convertSchema(final String timeField, final RecordSchema recordSchema) {
final List<String> fieldNames = recordSchema.getFieldNames();
fieldNames.remove(timeField);
final List<DatabaseField> fields = new ArrayList<>();
fieldNames.forEach(fieldName -> {
final Optional<DataType> dataTypeFound = recordSchema.getDataType(fieldName);
final DataType dataType = dataTypeFound.orElseThrow(() -> new IllegalArgumentException(String.format("Field [%s] Data Type not found", fieldName)));
final RecordFieldType recordFieldType = dataType.getFieldType();
final TSDataType timeSeriesDataType = getType(recordFieldType);
final DatabaseField field = new DatabaseField(fieldName, timeSeriesDataType);
fields.add(field);
});
return new DatabaseSchema(fields);
}
}
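As a quick illustration of the helper methods above (a standalone sketch, not part of the commit; the field names are hypothetical), parseSchema splits each fully qualified timeseries name into a device path and a measurement name, grouping measurements by device:

Map<String, List<String>> devices = parseSchema(
        Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d2.s1"));
// devices => {root.sg1.d1=[s1, s2], root.sg1.d2=[s1]}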

@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.processors.model.DatabaseSchema;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.model.ValidationResult;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.sql.Timestamp;
import java.sql.Time;
import java.sql.Date;
@Tags({"IoT", "Timeseries"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Read input FlowFile Records and write to Apache IoTDB")
public class PutIoTDBRecord extends AbstractIoTDB {
static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
.name("Prefix")
.description("The Timeseries prefix where records will be stored. The prefix must begin with [root] and end with [.]")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor TIME_FIELD = new PropertyDescriptor.Builder()
.name("Time Field")
.description("The name of field containing the timestamp in FlowFile Records")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("Time")
.required(true)
.build();
static final PropertyDescriptor ALIGNED = new PropertyDescriptor.Builder()
.name("Aligned")
.description("Whether to use the Apache IoTDB Aligned Timeseries interface")
.allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString())
.defaultValue(Boolean.FALSE.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor MAX_ROW_NUMBER = new PropertyDescriptor.Builder()
.name("Max Row Number")
.description("Maximum row number of each Apache IoTDB Tablet")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.defaultValue("1024")
.build();
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("Record Reader")
.description("Record Reader used for parsing the incoming FlowFiles and determining the schema")
.identifiesControllerService(RecordReaderFactory.class)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor SCHEMA_TEMPLATE = new PropertyDescriptor.Builder()
.name("Schema Template")
.description("Apache IoTDB Schema Template defined using JSON. " +
"The Processor will infer the IoTDB Schema when this property is not configured. " +
"See the Apache IoTDB Documentation for more details: https://iotdb.apache.org/UserGuide/Master/Ecosystem-Integration/NiFi-IoTDB.html")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
private static final String ROOT_PREFIX = "root.";
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
propertyDescriptors.add(PREFIX);
propertyDescriptors.add(TIME_FIELD);
propertyDescriptors.add(ALIGNED);
propertyDescriptors.add(MAX_ROW_NUMBER);
propertyDescriptors.add(RECORD_READER_FACTORY);
propertyDescriptors.add(SCHEMA_TEMPLATE);
return Collections.unmodifiableList(propertyDescriptors);
}
@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile == null) {
return;
}
final String prefix = processContext.getProperty(PREFIX).evaluateAttributeExpressions(flowFile).getValue();
if (!prefix.startsWith(ROOT_PREFIX) || !prefix.endsWith(".")) {
getLogger().error("Prefix does not begin with [root] and end with [.] {}", flowFile);
processSession.transfer(flowFile, REL_FAILURE);
return;
}
final String schemaProperty = processContext.getProperty(SCHEMA_TEMPLATE).evaluateAttributeExpressions(flowFile).getValue();
final boolean aligned = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).asBoolean();
final int maxRowNumber = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).asInteger();
final String timeField = processContext.getProperty(TIME_FIELD).evaluateAttributeExpressions(flowFile).getValue();
final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
try (final InputStream inputStream = processSession.read(flowFile);
final RecordReader recordReader = recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) {
final DatabaseSchema schema = getSchema(timeField, schemaProperty, recordReader);
final Map<String, Tablet> tablets = generateTablets(schema, prefix, maxRowNumber);
Record record;
while ((record = recordReader.nextRecord()) != null) {
long timestamp = getTimestamp(timeField, record);
boolean filled = false;
for (final Map.Entry<String, Tablet> entry : tablets.entrySet()) {
Tablet tablet = entry.getValue();
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, timestamp);
List<MeasurementSchema> measurements = tablet.getSchemas();
for (MeasurementSchema measurement : measurements) {
String id = measurement.getMeasurementId();
TSDataType type = measurement.getType();
Object value = getTypedValue(record.getValue(id), type);
tablet.addValue(id, rowIndex, value);
}
filled = tablet.rowSize == tablet.getMaxRowNumber();
}
if (filled) {
if (aligned) {
session.get().insertAlignedTablets(tablets);
} else {
session.get().insertTablets(tablets);
}
tablets.values().forEach(Tablet::reset);
}
}
final AtomicBoolean remaining = new AtomicBoolean(false);
tablets.forEach(
(device, tablet) -> {
if (!remaining.get() && tablet.rowSize != 0) {
remaining.set(true);
}
});
if (remaining.get()) {
if (aligned) {
session.get().insertAlignedTablets(tablets);
} else {
session.get().insertTablets(tablets);
}
}
} catch (final Exception e) {
getLogger().error("Processing failed {}", flowFile, e);
processSession.transfer(flowFile, REL_FAILURE);
return;
}
processSession.transfer(flowFile, REL_SUCCESS);
}
private DatabaseSchema getSchema(String timeField, String property, RecordReader recordReader) throws MalformedRecordException, IOException {
final ValidationResult result = property == null
? validateSchema(timeField, recordReader.getSchema())
: validateSchemaAttribute(property);
if (result.isValid()) {
return property == null
? convertSchema(timeField, recordReader.getSchema())
: mapper.readValue(property, DatabaseSchema.class);
} else {
final String message = String.format("Schema validation failed: %s", result.getMessage());
throw new IllegalArgumentException(message);
}
}
private long getTimestamp(final String timeField, final Record record) {
final long timestamp;
final Object time = record.getValue(timeField);
if (time instanceof Timestamp) {
Timestamp temp = (Timestamp) time;
timestamp = temp.getTime();
} else if (time instanceof Time) {
Time temp = (Time) time;
timestamp = temp.getTime();
} else if (time instanceof Date) {
Date temp = (Date) time;
timestamp = temp.getTime();
} else if (time instanceof Long) {
timestamp = (Long) time;
} else {
throw new IllegalArgumentException(String.format("Unexpected Time Field Type: %s", time));
}
return timestamp;
}
private Object getTypedValue(Object value, TSDataType type) {
final Object typedValue;
if (value == null) {
typedValue = null;
} else {
try {
typedValue = convertType(value, type);
} catch (final Exception e) {
final String message = String.format("Value [%s] cannot be converted to the type [%s]", value, type);
throw new IllegalArgumentException(message, e);
}
}
return typedValue;
}
}
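For reference, a minimal Schema Template accepted by the Schema Template property has the following JSON shape (the same shape exercised in the unit tests below; encoding and compressionType are optional per field):

{
  "fields": [
    {"tsName": "s1", "dataType": "INT32", "encoding": "RLE", "compressionType": "SNAPPY"},
    {"tsName": "s2", "dataType": "DOUBLE", "encoding": "PLAIN"}
  ]
}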

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.model;
import java.util.HashMap;
import java.util.Set;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
public class DatabaseField {
private String tsName;
private TSDataType dataType;
private TSEncoding encoding;
private CompressionType compressionType;
private static final HashMap<String, TSDataType> typeMap = new HashMap<>();
private static final HashMap<String, TSEncoding> encodingMap = new HashMap<>();
private static final HashMap<String, CompressionType> compressionMap = new HashMap<>();
static {
typeMap.put("INT32", TSDataType.INT32);
typeMap.put("INT64", TSDataType.INT64);
typeMap.put("FLOAT", TSDataType.FLOAT);
typeMap.put("DOUBLE", TSDataType.DOUBLE);
typeMap.put("BOOLEAN", TSDataType.BOOLEAN);
typeMap.put("TEXT", TSDataType.TEXT);
encodingMap.put("PLAIN", TSEncoding.PLAIN);
encodingMap.put("DICTIONARY", TSEncoding.DICTIONARY);
encodingMap.put("RLE", TSEncoding.RLE);
encodingMap.put("DIFF", TSEncoding.DIFF);
encodingMap.put("TS_2DIFF", TSEncoding.TS_2DIFF);
encodingMap.put("BITMAP", TSEncoding.BITMAP);
encodingMap.put("GORILLA_V1", TSEncoding.GORILLA_V1);
encodingMap.put("REGULAR", TSEncoding.REGULAR);
encodingMap.put("GORILLA", TSEncoding.GORILLA);
compressionMap.put("UNCOMPRESSED", CompressionType.UNCOMPRESSED);
compressionMap.put("SNAPPY", CompressionType.SNAPPY);
compressionMap.put("GZIP", CompressionType.GZIP);
compressionMap.put("LZ4", CompressionType.LZ4);
}
public DatabaseField() {
}
public DatabaseField(String tsName, TSDataType dataType) {
this.tsName = tsName;
this.dataType = dataType;
}
public String getTsName() {
return tsName;
}
public void setTsName(String tsName) {
this.tsName = tsName;
}
public TSDataType getDataType() {
return dataType;
}
public void setDataType(TSDataType dataType) {
this.dataType = dataType;
}
public TSEncoding getEncoding() {
return encoding;
}
public void setEncoding(TSEncoding encoding) {
this.encoding = encoding;
}
public CompressionType getCompressionType() {
return compressionType;
}
public void setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
}
public static Set<String> getSupportedDataType() {
return typeMap.keySet();
}
public static Set<String> getSupportedEncoding() {
return encodingMap.keySet();
}
public static Set<String> getSupportedCompressionType() {
return compressionMap.keySet();
}
}

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.model;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
public class DatabaseSchema {
private final Map<String, DatabaseField> fieldMap;
private final List<String> fieldNames;
@JsonCreator
public DatabaseSchema(@JsonProperty("fields") List<DatabaseField> fields) {
this.fieldMap = new LinkedHashMap<>();
this.fieldNames = new ArrayList<>();
fields.forEach(
field -> {
fieldMap.put(field.getTsName(), field);
fieldNames.add(field.getTsName());
});
}
public List<String> getFieldNames(String prefix) {
return fieldNames.stream()
.map(field -> prefix + field)
.collect(Collectors.toList());
}
public List<TSDataType> getDataTypes() {
return fieldMap.values().stream()
.map(DatabaseField::getDataType)
.collect(Collectors.toList());
}
public List<TSEncoding> getEncodingTypes() {
return fieldMap.values().stream()
.map(DatabaseField::getEncoding)
.collect(Collectors.toList());
}
public List<CompressionType> getCompressionTypes() {
return fieldMap.values().stream()
.map(DatabaseField::getCompressionType)
.collect(Collectors.toList());
}
public TSDataType getDataType(String fieldName) {
return fieldMap.get(fieldName).getDataType();
}
public TSEncoding getEncodingType(String fieldName) {
return fieldMap.get(fieldName).getEncoding();
}
public CompressionType getCompressionType(String fieldName) {
return fieldMap.get(fieldName).getCompressionType();
}
}
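Because of the @JsonCreator constructor above, a schema attribute can be materialized directly with Jackson, which is what PutIoTDBRecord.getSchema does when the Schema Template property is set. A minimal sketch, using a one-field template:

DatabaseSchema schema = new ObjectMapper().readValue(
        "{\"fields\": [{\"tsName\": \"s1\", \"dataType\": \"INT32\"}]}",
        DatabaseSchema.class);
// schema.getFieldNames("root.sg.d1.") returns [root.sg.d1.s1]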

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.model;
public class ValidationResult {
private final boolean valid;
private final String message;
public ValidationResult(final boolean valid, final String message) {
this.valid = valid;
this.message = message;
}
public boolean isValid() {
return valid;
}
public String getMessage() {
return message;
}
}

@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.PutIoTDBRecord

@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.processors.model.DatabaseSchema;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.model.ValidationResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AbstractIoTDBTest {
private static TestAbstractIoTDBProcessor processor;
@BeforeEach
public void init() {
processor = new TestAbstractIoTDBProcessor();
}
@Test
public void testValidateSchemaAttribute() {
// normal schema
String schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
ValidationResult result = processor.validateSchemaAttribute(schemaAttribute);
assertTrue(result.isValid());
assertNull(result.getMessage());
// schema with wrong field
schemaAttribute =
"{\n"
+ "\t\"field\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
String expectedMsg = "The schema JSON must contain `fields`";
assertFalse(result.isValid());
assertEquals(expectedMsg, result.getMessage());
// schema without tsName
schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
exceptedMsg = "`tsName` or `dataType` has not been set";
assertFalse(result.isValid());
assertEquals(exceptedMsg, result.getMessage());
// schema without data type
schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
expectedMsg = "`tsName` or `dataType` has not been set";
assertFalse(result.isValid());
assertEquals(expectedMsg, result.getMessage());
// schema with wrong data type
schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
expectedMsg =
"Unknown `dataType`: INT. The supported dataTypes are [FLOAT, INT64, INT32, TEXT, DOUBLE, BOOLEAN]";
assertFalse(result.isValid());
assertEquals(expectedMsg, result.getMessage());
// schema with wrong key
schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encode\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
exceptedMsg = "Unknown property or properties: [encode]";
assertFalse(result.isValid());
assertEquals(exceptedMsg, result.getMessage());
// schema with wrong compression type
schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encoding\": \"RLE\",\n"
+ "\t\t\"compressionType\": \"ZIP\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\",\n"
+ "\t\t\"compressionType\": \"GZIP\"\n"
+ "\t}]\n"
+ "}";
result = processor.validateSchemaAttribute(schemaAttribute);
expectedMsg =
"Unknown `compressionType`: ZIP. The supported compression types are [UNCOMPRESSED, LZ4, GZIP, SNAPPY]";
assertFalse(result.isValid());
assertEquals(expectedMsg, result.getMessage());
}
@Test
public void testParseSchema() {
List<String> filedNames = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d2.s1");
Map<String, List<String>> deviceMeasurementMap = processor.parseSchema(filedNames);
Map<String, List<String>> exceptedMap = new LinkedHashMap<>();
exceptedMap.put("root.sg1.d1", Arrays.asList("s1","s2"));
exceptedMap.put("root.sg1.d2", Collections.singletonList("s1"));
assertEquals(exceptedMap, deviceMeasurementMap);
}
@Test
public void testGenerateTablet() throws JsonProcessingException {
String schemaAttribute =
"{\n"
+ "\t\"fields\": [{\n"
+ "\t\t\"tsName\": \"s1\",\n"
+ "\t\t\"dataType\": \"INT32\",\n"
+ "\t\t\"encoding\": \"RLE\"\n"
+ "\t}, {\n"
+ "\t\t\"tsName\": \"s2\",\n"
+ "\t\t\"dataType\": \"DOUBLE\",\n"
+ "\t\t\"encoding\": \"PLAIN\"\n"
+ "\t}]\n"
+ "}";
DatabaseSchema schema = new ObjectMapper().readValue(schemaAttribute, DatabaseSchema.class);
Map<String, Tablet> tablets = processor.generateTablets(schema, "root.test_sg.test_d1." ,1);
Map<String, Tablet> exceptedTablets = new HashMap<>();
List<MeasurementSchema> schemas = Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.PLAIN));
exceptedTablets.put("root.test_sg.test_d1", new Tablet("root.test_sg.test_d1", schemas, 1));
assertEquals("root.test_sg.test_d1", tablets.keySet().toArray()[0]);
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getSchemas(), tablets.get("root.test_sg.test_d1").getSchemas());
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getMaxRowNumber(), tablets.get("root.test_sg.test_d1").getMaxRowNumber());
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTimeBytesSize(), tablets.get("root.test_sg.test_d1").getTimeBytesSize());
assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTotalValueOccupation(), tablets.get("root.test_sg.test_d1").getTotalValueOccupation());
assertEquals(exceptedTablets.get("root.test_sg.test_d1").deviceId, tablets.get("root.test_sg.test_d1").deviceId);
assertEquals(exceptedTablets.get("root.test_sg.test_d1").rowSize, tablets.get("root.test_sg.test_d1").rowSize);
}
public static class TestAbstractIoTDBProcessor extends AbstractIoTDB {
@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession)
throws ProcessException {
}
}
}

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.sql.Timestamp;
public class PutIoTDBRecordIT {
private TestRunner testRunner;
private MockRecordParser recordReader;
@BeforeEach
public void setRunner() {
testRunner = TestRunners.newTestRunner(PutIoTDBRecord.class);
recordReader = new MockRecordParser();
testRunner.setProperty(PutIoTDBRecord.RECORD_READER_FACTORY, "reader");
testRunner.setProperty(PutIoTDBRecord.IOTDB_HOST, "127.0.0.1");
testRunner.setProperty(PutIoTDBRecord.USERNAME, "root");
testRunner.setProperty(PutIoTDBRecord.PASSWORD, "root");
testRunner.setProperty(PutIoTDBRecord.MAX_ROW_NUMBER, "1024");
EnvironmentUtils.envSetUp();
}
@AfterEach
public void shutdown() throws Exception {
testRunner.shutdown();
recordReader.disabled();
EnvironmentUtils.cleanEnv();
EnvironmentUtils.shutdownDaemon();
}
private void setUpStandardTestConfig() throws InitializationException {
testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader);
}
@Test
public void testInsertByNativeSchemaWithSingleDevice() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("TIME", RecordFieldType.LONG);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addSchemaField("s2", RecordFieldType.LONG);
recordReader.addSchemaField("s3", RecordFieldType.FLOAT);
recordReader.addSchemaField("s4", RecordFieldType.DOUBLE);
recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN);
recordReader.addSchemaField("s6", RecordFieldType.STRING);
recordReader.addRecord(1L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(2L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(3L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(4L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(5L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(6L, 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(7L, 1, 2L, 3.0F, 4.0D, true, "abc");
testRunner.setProperty(PutIoTDBRecord.TIME_FIELD, "TIME");
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg0.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
}
@Test
public void testInsertByNativeSchemaWithTimeStamp() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.TIMESTAMP);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addSchemaField("s2", RecordFieldType.LONG);
recordReader.addSchemaField("s3", RecordFieldType.FLOAT);
recordReader.addSchemaField("s4", RecordFieldType.DOUBLE);
recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN);
recordReader.addSchemaField("s6", RecordFieldType.STRING);
recordReader.addRecord(new Timestamp(1L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(2L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(3L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(4L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(5L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(6L), 1, 2L, 3.0F, 4.0D, true, "abc");
recordReader.addRecord(new Timestamp(7L), 1, 2L, 3.0F, 4.0D, true, "abc");
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg1.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
}
@Test
public void testInsertByNativeSchemaWithNullValue() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.LONG);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addSchemaField("s2", RecordFieldType.LONG);
recordReader.addRecord(1L, 1, 2L);
recordReader.addRecord(2L, 1, 2L);
recordReader.addRecord(3L, 1, null);
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg2.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
}
@Test
public void testInsertByNativeSchemaWithEmptyValue() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.LONG);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addSchemaField("s2", RecordFieldType.LONG);
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg3.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1);
}
@Test
public void testInsertByNativeSchemaWithUnsupportedDataType() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.LONG);
recordReader.addSchemaField("s1", RecordFieldType.ARRAY);
recordReader.addRecord(1L, new String[]{"1"});
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg4.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
}
@Test
public void testInsertByNativeSchemaWithoutTimeField() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addSchemaField("s2", RecordFieldType.INT);
recordReader.addRecord(1, 1);
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1.");
testRunner.enqueue("");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
}
@Test
public void testInsertByNativeSchemaWithWrongTimeType() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.INT);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addRecord(1, 1);
testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1.");
testRunner.enqueue(new byte[]{});
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
}
@Test
public void testInsertByNativeSchemaNotStartWithRoot() throws InitializationException {
setUpStandardTestConfig();
recordReader.addSchemaField("Time", RecordFieldType.LONG);
recordReader.addSchemaField("s1", RecordFieldType.INT);
recordReader.addRecord(1L, 1);
testRunner.setProperty(PutIoTDBRecord.PREFIX, "sg6.d1.");
testRunner.enqueue(new byte[]{});
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1);
}
}

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.processors.model.DatabaseSchema;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PutIoTDBRecordTest {
@Test
public void testParseSchemaByAttribute() throws JsonProcessingException {
String schemaAttribute =
"{"
+ "\"fields\": ["
+ "{\"tsName\": \"s1\",\"dataType\": \"INT32\", \"encoding\": \"PLAIN\", \"compressionType\": \"SNAPPY\"},"
+ "{\"tsName\": \"s2\",\"dataType\": \"BOOLEAN\", \"encoding\": \"PLAIN\", \"compressionType\": \"GZIP\"},"
+ "{\"tsName\": \"s3\",\"dataType\": \"TEXT\", \"encoding\": \"DICTIONARY\"}"
+ "]"
+ "}";
List<String> exceptedFieldNames = Arrays.asList("root.sg.d1.s1","root.sg.d1.s2","root.sg.d1.s3");
List<TSDataType> exceptedDataTypes = Arrays.asList(TSDataType.INT32, TSDataType.BOOLEAN, TSDataType.TEXT);
List<TSEncoding> exceptedEncodings = Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.DICTIONARY);
List<CompressionType> exceptedCompressionTypes = Arrays.asList(CompressionType.SNAPPY, CompressionType.GZIP, null);
DatabaseSchema schema = new ObjectMapper().readValue(schemaAttribute, DatabaseSchema.class);
assertEquals(exceptedFieldNames, schema.getFieldNames("root.sg.d1."));
assertArrayEquals(exceptedDataTypes.stream().sorted().toArray(), schema.getDataTypes().stream().sorted().toArray());
assertArrayEquals(exceptedEncodings.stream().sorted().toArray(), schema.getEncodingTypes().stream().sorted().toArray());
assertArrayEquals(exceptedCompressionTypes.stream().filter(Objects::nonNull).sorted().toArray(), schema.getCompressionTypes().stream().filter(Objects::nonNull).sorted().toArray());
}
}

@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-iotdb-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-iotdb-processors</module>
<module>nifi-iotdb-nar</module>
</modules>
<properties>
<iotdb.sdk.version>1.0.0</iotdb.sdk.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-iotdb-processors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@@ -122,6 +122,7 @@
<module>nifi-shopify-bundle</module>
<module>nifi-iceberg-bundle</module>
<module>nifi-jslt-bundle</module>
<module>nifi-iotdb-bundle</module>
</modules>
<build>