diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-nar/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-nar/pom.xml new file mode 100644 index 0000000000..501f2b4216 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-nar/pom.xml @@ -0,0 +1,43 @@ + + + + + nifi-iotdb-bundle + org.apache.nifi + 1.20.0-SNAPSHOT + + 4.0.0 + + org.apache.nifi + nifi-iotdb-nar + nar + + + + org.apache.nifi + nifi-iotdb-processors + + + org.apache.nifi + nifi-standard-services-api-nar + ${project.version} + nar + + + + diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml new file mode 100644 index 0000000000..86cff9b8c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/pom.xml @@ -0,0 +1,108 @@ + + + + + nifi-iotdb-bundle + org.apache.nifi + 1.20.0-SNAPSHOT + + 4.0.0 + + nifi-iotdb-processors + jar + + + + org.apache.iotdb + iotdb-session + ${iotdb.sdk.version} + + + commons-logging + commons-logging + + + ch.qos.logback + logback-classic + + + + + org.apache.nifi + nifi-api + ${project.version} + + + org.apache.nifi + nifi-record-serialization-service-api + ${project.version} + + + org.apache.nifi + nifi-record + ${project.version} + + + org.apache.nifi + nifi-utils + ${project.version} + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + + org.apache.iotdb + iotdb-server + ${iotdb.sdk.version} + test + + + org.apache.iotdb + iotdb-server + ${iotdb.sdk.version} + test-jar + test + + + org.glassfish.jersey.inject + jersey-hk2 + test + + + org.apache.nifi + nifi-mock + ${project.version} + test + + + org.apache.nifi + nifi-mock-record-utils + ${project.version} + test + + + diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java new file mode 100755 index 0000000000..f45275a22d --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/AbstractIoTDB.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.processors.model.DatabaseField; +import org.apache.nifi.processors.model.DatabaseSchema; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.model.ValidationResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public abstract class AbstractIoTDB extends AbstractProcessor { + private static final int DEFAULT_IOTDB_PORT = 6667; + + protected static ObjectMapper mapper = new ObjectMapper(); + + private static final String FIELDS = "fields"; + + private static final Map typeMap = + new HashMap<>(); + + static final Set supportedType = + new HashSet<>(); + + static final PropertyDescriptor IOTDB_HOST = new PropertyDescriptor.Builder() + .name("Host") + .description("IoTDB server host address") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor IOTDB_PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("IoTDB server port number") + .defaultValue(String.valueOf(DEFAULT_IOTDB_PORT)) + .addValidator(StandardValidators.PORT_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username for access to IoTDB") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password for access to IoTDB") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .sensitive(true) + .build(); + + protected final static Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Processing succeeded") + .build(); + + protected final static Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Processing failed") + .build(); + + private static final List descriptors = new ArrayList<>(); + + private static final Set relationships = new LinkedHashSet<>(); + + static { + descriptors.add(IOTDB_HOST); + descriptors.add(IOTDB_PORT); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + + typeMap.put(RecordFieldType.STRING, TSDataType.TEXT); + typeMap.put(RecordFieldType.BOOLEAN, TSDataType.BOOLEAN); + typeMap.put(RecordFieldType.INT, TSDataType.INT32); + typeMap.put(RecordFieldType.LONG, TSDataType.INT64); + typeMap.put(RecordFieldType.FLOAT, TSDataType.FLOAT); + typeMap.put(RecordFieldType.DOUBLE, TSDataType.DOUBLE); + + supportedType.add(RecordFieldType.BOOLEAN); + supportedType.add(RecordFieldType.STRING); + supportedType.add(RecordFieldType.INT); + supportedType.add(RecordFieldType.LONG); + supportedType.add(RecordFieldType.FLOAT); + supportedType.add(RecordFieldType.DOUBLE); + supportedType.add(RecordFieldType.TIMESTAMP); + supportedType.add(RecordFieldType.TIME); + supportedType.add(RecordFieldType.DATE); + } + + protected final AtomicReference session = new AtomicReference<>(null); + + @Override + public Set getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws IoTDBConnectionException { + if (session.get() == null) { + final String host = context.getProperty(IOTDB_HOST).getValue(); + final int port = Integer.parseInt(context.getProperty(IOTDB_PORT).getValue()); + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + session.set(new Session.Builder() + .host(host) + .port(port) + .username(username) + .password(password) + .build()); + session.get().open(); + } + } + + @OnStopped + public void stop() { + if (session.get() != null) { + try { + session.get().close(); + } catch (final IoTDBConnectionException e) { + getLogger().error("IoTDB disconnection failed", e); + } + session.set(null); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.unmodifiableList(descriptors); + } + + protected TSDataType getType(RecordFieldType type) { + return typeMap.get(type); + } + + protected ValidationResult validateSchemaAttribute(String schemaAttribute) { + JsonNode schema; + try { + schema = mapper.readTree(schemaAttribute); + } catch (JsonProcessingException e) { + return new ValidationResult(false, e.getMessage()); + } + Set keySet = new HashSet<>(); + schema.fieldNames().forEachRemaining(keySet::add); + + if (!keySet.contains(FIELDS)) { + String msg = "The JSON of schema must contain `fields`"; + return new ValidationResult(false, msg); + } + + for (int i = 0; i < schema.get(FIELDS).size(); i++) { + JsonNode field = schema.get(FIELDS).get(i); + Set fieldKeySet = new HashSet<>(); + + field.fieldNames().forEachRemaining(fieldKeySet::add); + if (!fieldKeySet.contains("tsName") || !fieldKeySet.contains("dataType")) { + String msg = "`tsName` or `dataType` has not been set"; + return new ValidationResult(false, msg); + } + + if (!DatabaseField.getSupportedDataType().contains(field.get("dataType").asText())) { + String msg = + String.format( + "Unknown `dataType`: %s. The supported dataTypes are %s", + field.get("dataType").asText(), DatabaseField.getSupportedDataType()); + return new ValidationResult(false, msg); + } + + Set supportedKeySet = new HashSet<>(); + supportedKeySet.add("tsName"); + supportedKeySet.add("dataType"); + supportedKeySet.add("encoding"); + supportedKeySet.add("compressionType"); + + Set tmpKetSet = new HashSet<>(); + tmpKetSet.addAll(supportedKeySet); + tmpKetSet.addAll(fieldKeySet); + tmpKetSet.removeAll(supportedKeySet); + if (!tmpKetSet.isEmpty()) { + String msg = "Unknown property or properties: " + tmpKetSet; + return new ValidationResult(false, msg); + } + + if (fieldKeySet.contains("compressionType") && !fieldKeySet.contains("encoding")) { + String msg = + "The `compressionType` has been set, but the `encoding` has not. The property `compressionType` will not take effect"; + return new ValidationResult(true, msg); + } + + if (field.get("encoding") != null + && !DatabaseField.getSupportedEncoding().contains(field.get("encoding").asText())) { + String msg = + String.format( + "Unknown `encoding`: %s, The supported encoding types are %s", + field.get("encoding").asText(), DatabaseField.getSupportedEncoding()); + return new ValidationResult(false, msg); + } + + if (field.get("compressionType") != null + && !DatabaseField.getSupportedCompressionType().contains(field.get("compressionType").asText())) { + String msg = + String.format( + "Unknown `compressionType`: %s, The supported compressionType are %s", + field.get("compressionType").asText(), DatabaseField.getSupportedCompressionType()); + return new ValidationResult(false, msg); + } + } + + return new ValidationResult(true, null); + } + + protected ValidationResult validateSchema(String timeField, RecordSchema recordSchema) { + List fieldNames = recordSchema.getFieldNames(); + List dataTypes = recordSchema.getDataTypes(); + if (!fieldNames.contains(timeField)) { + return new ValidationResult(false, "The fields must contain "+ timeField); + } + fieldNames.remove(timeField); + for (DataType type : dataTypes) { + RecordFieldType dataType = type.getFieldType(); + if (!supportedType.contains(dataType)) { + String msg = + String.format( + "Unknown `dataType`: %s. The supported dataTypes are %s", + dataType.toString(), supportedType); + return new ValidationResult(false, msg); + } + } + + return new ValidationResult(true, null); + } + + protected Map> parseSchema(final List fieldNames) { + final Map> deviceMeasurementMap = new LinkedHashMap<>(); + fieldNames.forEach( + field -> { + final List paths = new ArrayList<>(Arrays.asList(field.split("\\."))); + final int lastIndex = paths.size() - 1; + final String lastPath = paths.remove(lastIndex); + final String device = String.join(".", paths); + if (!deviceMeasurementMap.containsKey(device)) { + deviceMeasurementMap.put(device, new ArrayList<>()); + } + deviceMeasurementMap.get(device).add(lastPath); + }); + + return deviceMeasurementMap; + } + + protected Map generateTablets(DatabaseSchema schema, String prefix, int maxRowNumber) { + final Map> deviceMeasurementMap = parseSchema(schema.getFieldNames(prefix)); + final Map tablets = new LinkedHashMap<>(); + deviceMeasurementMap.forEach( + (device, measurements) -> { + ArrayList schemas = new ArrayList<>(); + for (String measurement : measurements) { + TSDataType dataType = schema.getDataType(measurement); + TSEncoding encoding = schema.getEncodingType(measurement); + CompressionType compressionType = schema.getCompressionType(measurement); + if (encoding == null) { + schemas.add(new MeasurementSchema(measurement, dataType)); + } else if (compressionType == null) { + schemas.add(new MeasurementSchema(measurement, dataType, encoding)); + } else { + schemas.add(new MeasurementSchema(measurement, dataType, encoding, compressionType)); + } + } + Tablet tablet = new Tablet(device, schemas, maxRowNumber); + tablets.put(device, tablet); + }); + return tablets; + } + + protected Object convertType(Object value, TSDataType type) { + switch (type) { + case TEXT: + return Binary.valueOf(String.valueOf(value)); + case INT32: + return Integer.parseInt(value.toString()); + case INT64: + return Long.parseLong(value.toString()); + case FLOAT: + return Float.parseFloat(value.toString()); + case DOUBLE: + return Double.parseDouble(value.toString()); + case BOOLEAN: + return Boolean.parseBoolean(value.toString()); + default: + return null; + } + } + + protected DatabaseSchema convertSchema(final String timeField, final RecordSchema recordSchema) { + final List fieldNames = recordSchema.getFieldNames(); + fieldNames.remove(timeField); + + final List fields = new ArrayList<>(); + fieldNames.forEach(fieldName -> { + final Optional dataTypeFound = recordSchema.getDataType(fieldName); + final DataType dataType = dataTypeFound.orElseThrow(() -> new IllegalArgumentException(String.format("Field [%s] Data Type not found", fieldName))); + final RecordFieldType recordFieldType = dataType.getFieldType(); + final TSDataType timeSeriesDataType = getType(recordFieldType); + final DatabaseField field = new DatabaseField(fieldName, timeSeriesDataType); + fields.add(field); + }); + return new DatabaseSchema(fields); + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java new file mode 100755 index 0000000000..a677840a22 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/PutIoTDBRecord.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.nifi.processors.model.DatabaseSchema; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.model.ValidationResult; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import java.sql.Timestamp; +import java.sql.Time; +import java.sql.Date; + +@Tags({"IoT", "Timeseries"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Read input FlowFile Records and write to Apache IoTDB") +public class PutIoTDBRecord extends AbstractIoTDB { + + static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder() + .name("Prefix") + .description("The Timeseries prefix where records will be stored. The prefix must begin with [root] and end with [.]") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor TIME_FIELD = new PropertyDescriptor.Builder() + .name("Time Field") + .description("The name of field containing the timestamp in FlowFile Records") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("Time") + .required(true) + .build(); + + static final PropertyDescriptor ALIGNED = new PropertyDescriptor.Builder() + .name("Aligned") + .description("Whether to use the Apache IoTDB Aligned Timeseries interface") + .allowableValues(Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .defaultValue(Boolean.FALSE.toString()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor MAX_ROW_NUMBER = new PropertyDescriptor.Builder() + .name("Max Row Number") + .description("Maximum row number of each Apache IoTDB Tablet") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .defaultValue("1024") + .build(); + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("Record Reader used for parsing the incoming FlowFiles and determining the schema") + .identifiesControllerService(RecordReaderFactory.class) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor SCHEMA_TEMPLATE = new PropertyDescriptor.Builder() + .name("Schema Template") + .description("Apache IoTDB Schema Template defined using JSON. " + + "The Processor will infer the IoTDB Schema when this property is not configured. " + + "See the Apache IoTDB Documentation for more details: https://iotdb.apache.org/UserGuide/Master/Ecosystem-Integration/NiFi-IoTDB.html") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + private static final String ROOT_PREFIX = "root."; + + @Override + protected List getSupportedPropertyDescriptors() { + final List propertyDescriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); + propertyDescriptors.add(PREFIX); + propertyDescriptors.add(TIME_FIELD); + propertyDescriptors.add(ALIGNED); + propertyDescriptors.add(MAX_ROW_NUMBER); + propertyDescriptors.add(RECORD_READER_FACTORY); + propertyDescriptors.add(SCHEMA_TEMPLATE); + return Collections.unmodifiableList(propertyDescriptors); + } + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { + FlowFile flowFile = processSession.get(); + if (flowFile == null) { + return; + } + + final String prefix = processContext.getProperty(PREFIX).evaluateAttributeExpressions(flowFile).getValue(); + if (!prefix.startsWith(ROOT_PREFIX) || !prefix.endsWith(".")) { + getLogger().error("Prefix does not begin with [root] and end with [.] {}", flowFile); + processSession.transfer(flowFile, REL_FAILURE); + return; + } + + final String schemaProperty = processContext.getProperty(SCHEMA_TEMPLATE).evaluateAttributeExpressions(flowFile).getValue(); + final boolean aligned = processContext.getProperty(ALIGNED).evaluateAttributeExpressions(flowFile).asBoolean(); + final int maxRowNumber = processContext.getProperty(MAX_ROW_NUMBER).evaluateAttributeExpressions(flowFile).asInteger(); + final String timeField = processContext.getProperty(TIME_FIELD).evaluateAttributeExpressions(flowFile).getValue(); + final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + + try (final InputStream inputStream = processSession.read(flowFile); + final RecordReader recordReader = recordParserFactory.createRecordReader(flowFile, inputStream, getLogger())) { + final DatabaseSchema schema = getSchema(timeField, schemaProperty, recordReader); + final Map tablets = generateTablets(schema, prefix, maxRowNumber); + + Record record; + while ((record = recordReader.nextRecord()) != null) { + long timestamp = getTimestamp(timeField, record); + boolean filled = false; + + for (final Map.Entry entry : tablets.entrySet()) { + Tablet tablet = entry.getValue(); + int rowIndex = tablet.rowSize++; + + tablet.addTimestamp(rowIndex, timestamp); + List measurements = tablet.getSchemas(); + for (MeasurementSchema measurement : measurements) { + String id = measurement.getMeasurementId(); + TSDataType type = measurement.getType(); + Object value = getTypedValue(record.getValue(id), type); + tablet.addValue(id, rowIndex, value); + } + filled = tablet.rowSize == tablet.getMaxRowNumber(); + } + if (filled) { + if (aligned) { + session.get().insertAlignedTablets(tablets); + } else { + session.get().insertTablets(tablets); + } + tablets.values().forEach(Tablet::reset); + } + } + + final AtomicBoolean remaining = new AtomicBoolean(false); + tablets.forEach( + (device, tablet) -> { + if (!remaining.get() && tablet.rowSize != 0) { + remaining.set(true); + } + }); + if (remaining.get()) { + if (aligned) { + session.get().insertAlignedTablets(tablets); + } else { + session.get().insertTablets(tablets); + } + } + } catch (final Exception e) { + getLogger().error("Processing failed {}", flowFile, e); + processSession.transfer(flowFile, REL_FAILURE); + return; + } + processSession.transfer(flowFile, REL_SUCCESS); + } + + private DatabaseSchema getSchema(String timeField, String property, RecordReader recordReader) throws MalformedRecordException, IOException { + final ValidationResult result = property == null + ? validateSchema(timeField, recordReader.getSchema()) + : validateSchemaAttribute(property); + + if (result.isValid()) { + return property == null + ? convertSchema(timeField, recordReader.getSchema()) + : mapper.readValue(property, DatabaseSchema.class); + } else { + final String message = String.format("Schema validation failed: %s", result.getMessage()); + throw new IllegalArgumentException(message); + } + } + + private long getTimestamp(final String timeField, final Record record) { + final long timestamp; + final Object time = record.getValue(timeField); + if (time instanceof Timestamp) { + Timestamp temp = (Timestamp) time; + timestamp = temp.getTime(); + } else if (time instanceof Time) { + Time temp = (Time) time; + timestamp = temp.getTime(); + } else if (time instanceof Date) { + Date temp = (Date) time; + timestamp = temp.getTime(); + } else if (time instanceof Long) { + timestamp = (Long) time; + } else { + throw new IllegalArgumentException(String.format("Unexpected Time Field Type: %s", time)); + } + return timestamp; + } + + private Object getTypedValue(Object value, TSDataType type) { + final Object typedValue; + if (value == null) { + typedValue = null; + } else { + try { + typedValue = convertType(value, type); + } catch (final Exception e) { + final String message = String.format("Value [%s] cannot be converted to the type [%s]", value, type); + throw new IllegalArgumentException(message, e); + } + } + return typedValue; + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java new file mode 100755 index 0000000000..4c944cdf4c --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseField.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.model; + +import java.util.HashMap; +import java.util.Set; + +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +public class DatabaseField { + private String tsName; + private TSDataType dataType; + private TSEncoding encoding; + private CompressionType compressionType; + + private static final HashMap typeMap = new HashMap<>(); + private static final HashMap encodingMap = new HashMap<>(); + + private static final HashMap compressionMap = new HashMap<>(); + + static { + typeMap.put("INT32", TSDataType.INT32); + typeMap.put("INT64", TSDataType.INT64); + typeMap.put("FLOAT", TSDataType.FLOAT); + typeMap.put("DOUBLE", TSDataType.DOUBLE); + typeMap.put("BOOLEAN", TSDataType.BOOLEAN); + typeMap.put("TEXT", TSDataType.TEXT); + + encodingMap.put("PLAIN", TSEncoding.PLAIN); + encodingMap.put("DICTIONARY", TSEncoding.DICTIONARY); + encodingMap.put("RLE", TSEncoding.RLE); + encodingMap.put("DIFF", TSEncoding.DIFF); + encodingMap.put("TS_2DIFF", TSEncoding.TS_2DIFF); + encodingMap.put("BITMAP", TSEncoding.BITMAP); + encodingMap.put("GORILLA_V1", TSEncoding.GORILLA_V1); + encodingMap.put("REGULAR", TSEncoding.REGULAR); + encodingMap.put("GORILLA", TSEncoding.GORILLA); + + compressionMap.put("UNCOMPRESSED", CompressionType.UNCOMPRESSED); + compressionMap.put("SNAPPY", CompressionType.SNAPPY); + compressionMap.put("GZIP", CompressionType.GZIP); + compressionMap.put("LZ4", CompressionType.LZ4); + } + + public DatabaseField() { + + } + + public DatabaseField(String tsName, TSDataType dataType) { + this.tsName = tsName; + this.dataType = dataType; + } + + public String getTsName() { + return tsName; + } + + public void setTsName(String tsName) { + this.tsName = tsName; + } + + public TSDataType getDataType() { + return dataType; + } + + public void setDataType(TSDataType dataType) { + this.dataType = dataType; + } + + public TSEncoding getEncoding() { + return encoding; + } + + public void setEncoding(TSEncoding encoding) { + this.encoding = encoding; + } + + public CompressionType getCompressionType() { + return compressionType; + } + + public void setCompressionType(CompressionType compressionType) { + this.compressionType = compressionType; + } + + public static Set getSupportedDataType() { + return typeMap.keySet(); + } + + public static Set getSupportedEncoding() { + return encodingMap.keySet(); + } + + public static Set getSupportedCompressionType() { + return compressionMap.keySet(); + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java new file mode 100755 index 0000000000..954e8ef80e --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/DatabaseSchema.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.model; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +public class DatabaseSchema { + private final Map fieldMap; + private final List fieldNames; + + @JsonCreator + public DatabaseSchema(@JsonProperty("fields") List fields) { + this.fieldMap = new LinkedHashMap<>(); + this.fieldNames = new ArrayList<>(); + fields.forEach( + field -> { + fieldMap.put(field.getTsName(), field); + fieldNames.add(field.getTsName()); + }); + } + + public List getFieldNames(String prefix) { + return fieldNames.stream() + .map(field -> prefix+field) + .collect(Collectors.toList()); + } + + public List getDataTypes() { + return fieldMap.values().stream() + .map(DatabaseField::getDataType) + .collect(Collectors.toList()); + } + + public List getEncodingTypes() { + return fieldMap.values().stream() + .map(DatabaseField::getEncoding) + .collect(Collectors.toList()); + } + + public List getCompressionTypes() { + return fieldMap.values().stream() + .map(DatabaseField::getCompressionType) + .collect(Collectors.toList()); + } + + public TSDataType getDataType(String fieldName) { + return fieldMap.get(fieldName).getDataType(); + } + + public TSEncoding getEncodingType(String fieldName) { + return fieldMap.get(fieldName).getEncoding(); + } + + public CompressionType getCompressionType(String fieldName) { + return fieldMap.get(fieldName).getCompressionType(); + } + +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/ValidationResult.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/ValidationResult.java new file mode 100755 index 0000000000..862b9a2646 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/java/org/apache/nifi/processors/model/ValidationResult.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.model; + +public class ValidationResult { + private final boolean valid; + private final String message; + + public ValidationResult(final boolean valid, final String message) { + this.valid = valid; + this.message = message; + } + + public boolean isValid() { + return valid; + } + + public String getMessage() { + return message; + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100755 index 0000000000..4a19720633 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.PutIoTDBRecord diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java new file mode 100755 index 0000000000..974b77f719 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/AbstractIoTDBTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.processors.model.DatabaseSchema; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.model.ValidationResult; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AbstractIoTDBTest { + private static TestAbstractIoTDBProcessor processor; + + @BeforeEach + public void init() { + processor = new TestAbstractIoTDBProcessor(); + } + + @Test + public void testValidateSchemaAttribute() { + // normal schema + String schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + + ValidationResult result = processor.validateSchemaAttribute(schemaAttribute); + assertTrue(result.isValid()); + assertNull(result.getMessage()); + + // schema with wrong field + schemaAttribute = + "{\n" + + "\t\"field\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + result = processor.validateSchemaAttribute(schemaAttribute); + String exceptedMsg = "The JSON of schema must contain `fields`"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + + // schema without tsName + schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + result = processor.validateSchemaAttribute(schemaAttribute); + exceptedMsg = "`tsName` or `dataType` has not been set"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + + // schema without data type + schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + result = processor.validateSchemaAttribute(schemaAttribute); + exceptedMsg = "`tsName` or `dataType` has not been set"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + + // schema with wrong data type + schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + + result = processor.validateSchemaAttribute(schemaAttribute); + exceptedMsg = + "Unknown `dataType`: INT. The supported dataTypes are [FLOAT, INT64, INT32, TEXT, DOUBLE, BOOLEAN]"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + + // schema with wrong key + schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encode\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + + result = processor.validateSchemaAttribute(schemaAttribute); + exceptedMsg = "Unknown property or properties: [encode]"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + + // schema with wrong compression type + schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encoding\": \"RLE\",\n" + + "\t\t\"compressionType\": \"ZIP\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\",\n" + + "\t\t\"compressionType\": \"GZIP\"\n" + + "\t}]\n" + + "}"; + + result = processor.validateSchemaAttribute(schemaAttribute); + exceptedMsg = + "Unknown `compressionType`: ZIP, The supported compressionType are [UNCOMPRESSED, LZ4, GZIP, SNAPPY]"; + + assertFalse(result.isValid()); + assertEquals(exceptedMsg, result.getMessage()); + } + + @Test + public void testParseSchema() { + List filedNames = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d2.s1"); + Map> deviceMeasurementMap = processor.parseSchema(filedNames); + Map> exceptedMap = new LinkedHashMap<>(); + exceptedMap.put("root.sg1.d1", Arrays.asList("s1","s2")); + exceptedMap.put("root.sg1.d2", Collections.singletonList("s1")); + assertEquals(exceptedMap, deviceMeasurementMap); + } + + @Test + public void testGenerateTablet() throws JsonProcessingException { + String schemaAttribute = + "{\n" + + "\t\"fields\": [{\n" + + "\t\t\"tsName\": \"s1\",\n" + + "\t\t\"dataType\": \"INT32\",\n" + + "\t\t\"encoding\": \"RLE\"\n" + + "\t}, {\n" + + "\t\t\"tsName\": \"s2\",\n" + + "\t\t\"dataType\": \"DOUBLE\",\n" + + "\t\t\"encoding\": \"PLAIN\"\n" + + "\t}]\n" + + "}"; + DatabaseSchema schema = new ObjectMapper().readValue(schemaAttribute, DatabaseSchema.class); + Map tablets = processor.generateTablets(schema, "root.test_sg.test_d1." ,1); + + Map exceptedTablets = new HashMap<>(); + List schemas = Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE), + new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.PLAIN)); + exceptedTablets.put("root.test_sg.test_d1", new Tablet("root.test_sg.test_d1", schemas, 1)); + + assertEquals("root.test_sg.test_d1", tablets.keySet().toArray()[0]); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").getSchemas(), tablets.get("root.test_sg.test_d1").getSchemas()); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").getMaxRowNumber(), tablets.get("root.test_sg.test_d1").getMaxRowNumber()); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTimeBytesSize(), tablets.get("root.test_sg.test_d1").getTimeBytesSize()); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").getTotalValueOccupation(), tablets.get("root.test_sg.test_d1").getTotalValueOccupation()); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").deviceId, tablets.get("root.test_sg.test_d1").deviceId); + assertEquals(exceptedTablets.get("root.test_sg.test_d1").rowSize, tablets.get("root.test_sg.test_d1").rowSize); + } + + public static class TestAbstractIoTDBProcessor extends AbstractIoTDB { + + @Override + public void onTrigger(ProcessContext processContext, ProcessSession processSession) + throws ProcessException { + } + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java new file mode 100755 index 0000000000..f1b4507fd5 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordIT.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +import java.sql.Timestamp; + +public class PutIoTDBRecordIT { + private TestRunner testRunner; + private MockRecordParser recordReader; + + @BeforeEach + public void setRunner() { + testRunner = TestRunners.newTestRunner(PutIoTDBRecord.class); + recordReader = new MockRecordParser(); + testRunner.setProperty(PutIoTDBRecord.RECORD_READER_FACTORY, "reader"); + testRunner.setProperty(PutIoTDBRecord.IOTDB_HOST, "127.0.0.1"); + testRunner.setProperty(PutIoTDBRecord.USERNAME, "root"); + testRunner.setProperty(PutIoTDBRecord.PASSWORD, "root"); + testRunner.setProperty(PutIoTDBRecord.MAX_ROW_NUMBER, "1024"); + EnvironmentUtils.envSetUp(); + } + + @AfterEach + public void shutdown() throws Exception { + testRunner.shutdown(); + recordReader.disabled(); + EnvironmentUtils.cleanEnv(); + EnvironmentUtils.shutdownDaemon(); + } + + private void setUpStandardTestConfig() throws InitializationException { + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + } + + @Test + public void testInsertByNativeSchemaWithSingleDevice() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("TIME", RecordFieldType.LONG); + recordReader.addSchemaField("s1", RecordFieldType.INT); + recordReader.addSchemaField("s2", RecordFieldType.LONG); + recordReader.addSchemaField("s3", RecordFieldType.FLOAT); + recordReader.addSchemaField("s4", RecordFieldType.DOUBLE); + recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN); + recordReader.addSchemaField("s6", RecordFieldType.STRING); + + recordReader.addRecord(1L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(2L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(3L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(4L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(5L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(6L, 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(7L, 1, 2L, 3.0F, 4.0D, true, "abc"); + + testRunner.setProperty(PutIoTDBRecord.TIME_FIELD, "TIME"); + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg0.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); + } + + @Test + public void testInsertByNativeSchemaWithTimeStamp() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.TIMESTAMP); + recordReader.addSchemaField("s1", RecordFieldType.INT); + recordReader.addSchemaField("s2", RecordFieldType.LONG); + recordReader.addSchemaField("s3", RecordFieldType.FLOAT); + recordReader.addSchemaField("s4", RecordFieldType.DOUBLE); + recordReader.addSchemaField("s5", RecordFieldType.BOOLEAN); + recordReader.addSchemaField("s6", RecordFieldType.STRING); + + recordReader.addRecord(new Timestamp(1L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(2L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(3L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(4L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(5L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(6L), 1, 2L, 3.0F, 4.0D, true, "abc"); + recordReader.addRecord(new Timestamp(7L), 1, 2L, 3.0F, 4.0D, true, "abc"); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg1.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); + } + + @Test + public void testInsertByNativeSchemaWithNullValue() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.LONG); + recordReader.addSchemaField("s1", RecordFieldType.INT); + recordReader.addSchemaField("s2", RecordFieldType.LONG); + + recordReader.addRecord(1L, 1, 2L); + recordReader.addRecord(2L, 1, 2L); + recordReader.addRecord(3L, 1, null); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg2.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); + } + + @Test + public void testInsertByNativeSchemaWithEmptyValue() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.LONG); + recordReader.addSchemaField("s1", RecordFieldType.INT); + recordReader.addSchemaField("s2", RecordFieldType.LONG); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg3.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_SUCCESS, 1); + } + + @Test + public void testInsertByNativeSchemaWithUnsupportedDataType() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.LONG); + recordReader.addSchemaField("s1", RecordFieldType.ARRAY); + + recordReader.addRecord(1L, new String[]{"1"}); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg4.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); + } + + @Test + public void testInsertByNativeSchemaWithoutTimeField() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("s1", RecordFieldType.INT); + recordReader.addSchemaField("s2", RecordFieldType.INT); + recordReader.addRecord(1, 1); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1."); + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); + } + + @Test + public void testInsertByNativeSchemaWithWrongTimeType() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.INT); + recordReader.addSchemaField("s1", RecordFieldType.INT); + + recordReader.addRecord(1, 1); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "root.sg5.d1."); + testRunner.enqueue(new byte[]{}); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); + } + + @Test + public void testInsertByNativeSchemaNotStartWithRoot() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("Time", RecordFieldType.LONG); + recordReader.addSchemaField("s1", RecordFieldType.INT); + + recordReader.addRecord(1L, 1); + + testRunner.setProperty(PutIoTDBRecord.PREFIX, "sg6.d1."); + testRunner.enqueue(new byte[]{}); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutIoTDBRecord.REL_FAILURE, 1); + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java new file mode 100755 index 0000000000..0e77256e57 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/nifi-iotdb-processors/src/test/java/org/apache/nifi/processors/PutIoTDBRecordTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.processors.model.DatabaseSchema; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PutIoTDBRecordTest { + + @Test + public void testParseSchemaByAttribute() throws JsonProcessingException { + String schemaAttribute = + "{" + + "\"fields\": [" + + "{\"tsName\": \"s1\",\"dataType\": \"INT32\", \"encoding\": \"PLAIN\", \"compressionType\": \"SNAPPY\"}," + + "{\"tsName\": \"s2\",\"dataType\": \"BOOLEAN\", \"encoding\": \"PLAIN\", \"compressionType\": \"GZIP\"}," + + "{\"tsName\": \"s3\",\"dataType\": \"TEXT\", \"encoding\": \"DICTIONARY\"}" + + "]" + + "}"; + List exceptedFieldNames = Arrays.asList("root.sg.d1.s1","root.sg.d1.s2","root.sg.d1.s3"); + List exceptedDataTypes = Arrays.asList(TSDataType.INT32, TSDataType.BOOLEAN, TSDataType.TEXT); + List exceptedEncodings = Arrays.asList(TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.DICTIONARY); + + List exceptedCompressionTypes = Arrays.asList(CompressionType.SNAPPY, CompressionType.GZIP, null); + + DatabaseSchema schema = new ObjectMapper().readValue(schemaAttribute, DatabaseSchema.class); + assertEquals(exceptedFieldNames, schema.getFieldNames("root.sg.d1.")); + assertArrayEquals(exceptedDataTypes.stream().sorted().toArray(), schema.getDataTypes().stream().sorted().toArray()); + assertArrayEquals(exceptedEncodings.stream().sorted().toArray(), schema.getEncodingTypes().stream().sorted().toArray()); + assertArrayEquals(exceptedCompressionTypes.stream().filter(Objects::nonNull).sorted().toArray(), schema.getCompressionTypes().stream().filter(Objects::nonNull).sorted().toArray()); + } +} diff --git a/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml b/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml new file mode 100644 index 0000000000..77949401e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-iotdb-bundle/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.20.0-SNAPSHOT + + + nifi-iotdb-bundle + pom + + + nifi-iotdb-processors + nifi-iotdb-nar + + + + 1.0.0 + + + + + + org.apache.nifi + nifi-iotdb-processors + ${project.version} + + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 034c68ccab..c8a53d5b6b 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -122,6 +122,7 @@ nifi-shopify-bundle nifi-iceberg-bundle nifi-jslt-bundle + nifi-iotdb-bundle