mirror of https://github.com/apache/nifi.git
NIFI-12644 Exported json flows does not match minifi json schema resolved by adding new method transform-nifi to convert Nifi Flow json to MiNifi flow json #8393
Signed-off-by: Ferenc Kis <briansolo1985@gmail.com> This closes #8393.
This commit is contained in:
parent
44e20873b8
commit
fcbd7c690b
|
@ -18,11 +18,15 @@
|
|||
package org.apache.nifi.minifi.toolkit.configuration;
|
||||
|
||||
import static org.apache.nifi.minifi.toolkit.configuration.json.TransformYamlCommandFactory.TRANSFORM_YML;
|
||||
import static org.apache.nifi.minifi.toolkit.configuration.json.TransformNifiCommandFactory.TRANSFORM_NIFI;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.nifi.minifi.toolkit.configuration.json.TransformNifiCommandFactory;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.json.TransformYamlCommandFactory;
|
||||
|
||||
public class ConfigMain {
|
||||
|
@ -41,9 +45,7 @@ public class ConfigMain {
|
|||
public ConfigMain(PathInputStreamFactory pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) {
|
||||
this.pathInputStreamFactory = pathInputStreamFactory;
|
||||
this.pathOutputStreamFactory = pathOutputStreamFactory;
|
||||
this.commandMap = Map.of(
|
||||
TRANSFORM_YML, new TransformYamlCommandFactory(pathInputStreamFactory, pathOutputStreamFactory).create()
|
||||
);
|
||||
this.commandMap = createCommandMap();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
@ -65,7 +67,6 @@ public class ConfigMain {
|
|||
commandMap.forEach((s, command) -> System.out.println(s + ": " + command.description));
|
||||
}
|
||||
|
||||
|
||||
public static class Command {
|
||||
private final Function<String[], Integer> function;
|
||||
private final String description;
|
||||
|
@ -75,4 +76,13 @@ public class ConfigMain {
|
|||
this.description = description;
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Command> createCommandMap() {
|
||||
Map<String, Command> result = new TreeMap<>();
|
||||
result.put(TRANSFORM_YML,
|
||||
new TransformYamlCommandFactory(pathInputStreamFactory, pathOutputStreamFactory).create());
|
||||
result.put(TRANSFORM_NIFI,
|
||||
new TransformNifiCommandFactory(pathInputStreamFactory, pathOutputStreamFactory).create());
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.minifi.toolkit.configuration.json;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
|
||||
|
||||
public class ObjectMapperUtils {
|
||||
|
||||
public static ObjectMapper createObjectMapper() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.setDefaultPropertyInclusion(
|
||||
JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
|
||||
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
||||
objectMapper.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory()));
|
||||
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
return objectMapper;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.minifi.toolkit.configuration.json;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.nifi.controller.flow.VersionedDataflow;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
|
||||
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class TransformNifiCommandFactory {
|
||||
|
||||
public static final String TRANSFORM_NIFI = "transform-nifi";
|
||||
|
||||
private static final String COMMAND_DESCRIPTION = "Transform NiFi flow JSON format into MiNifi flow JSON format";
|
||||
|
||||
private final PathInputStreamFactory pathInputStreamFactory;
|
||||
private final PathOutputStreamFactory pathOutputStreamFactory;
|
||||
|
||||
public TransformNifiCommandFactory(PathInputStreamFactory pathInputStreamFactory,
|
||||
PathOutputStreamFactory pathOutputStreamFactory) {
|
||||
this.pathInputStreamFactory = pathInputStreamFactory;
|
||||
this.pathOutputStreamFactory = pathOutputStreamFactory;
|
||||
}
|
||||
|
||||
public ConfigMain.Command create() {
|
||||
return new ConfigMain.Command(this::transformNifiToJson, COMMAND_DESCRIPTION);
|
||||
}
|
||||
|
||||
private int transformNifiToJson(String[] args) {
|
||||
if (args.length != 3) {
|
||||
printTransformUsage();
|
||||
return ConfigMain.ERR_INVALID_ARGS;
|
||||
}
|
||||
|
||||
String sourceNiFiJsonPath = args[1];
|
||||
String targetMiNiFiJsonPath = args[2];
|
||||
|
||||
try {
|
||||
RegisteredFlowSnapshot registeredFlowSnapshot = readNifiFlow(sourceNiFiJsonPath);
|
||||
VersionedDataflow versionedDataflow = new VersionedDataflow();
|
||||
versionedDataflow.setRootGroup(registeredFlowSnapshot.getFlowContents());
|
||||
versionedDataflow
|
||||
.setParameterContexts(new ArrayList<>(registeredFlowSnapshot.getParameterContexts().values()));
|
||||
persistFlowJson(versionedDataflow, targetMiNiFiJsonPath);
|
||||
} catch (ConfigTransformException e) {
|
||||
System.out.println("Unable to convert NiFi JSON to MiNiFi flow JSON: " + e);
|
||||
return e.getErrorCode();
|
||||
}
|
||||
return ConfigMain.SUCCESS;
|
||||
}
|
||||
|
||||
private void printTransformUsage() {
|
||||
System.out.println("transform Nifi Usage:");
|
||||
System.out.println();
|
||||
System.out.println(" transform-nifi SOURCE_NIFI_JSON_FLOW_FILE TARGET_MINIFI_JSON_FLOW_FILE");
|
||||
System.out.println();
|
||||
}
|
||||
|
||||
private RegisteredFlowSnapshot readNifiFlow(String sourceNiFiJsonPath) throws ConfigTransformException {
|
||||
try (InputStream inputStream = pathInputStreamFactory.create(sourceNiFiJsonPath)) {
|
||||
ObjectMapper objectMapper = ObjectMapperUtils.createObjectMapper();
|
||||
return objectMapper.readValue(inputStream, RegisteredFlowSnapshot.class);
|
||||
} catch (IOException e) {
|
||||
throw new ConfigTransformException("Error when reading NiFi flow json file",
|
||||
ConfigMain.ERR_UNABLE_TO_OPEN_INPUT, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void persistFlowJson(VersionedDataflow flow, String flowJsonPath) throws ConfigTransformException {
|
||||
try (OutputStream outputStream = pathOutputStreamFactory.create(flowJsonPath)) {
|
||||
ObjectMapper objectMapper = ObjectMapperUtils.createObjectMapper();
|
||||
objectMapper.writeValue(outputStream, flow);
|
||||
} catch (IOException e) {
|
||||
throw new ConfigTransformException("Error when persisting flow JSON file: " + flowJsonPath,
|
||||
ConfigMain.ERR_UNABLE_TO_SAVE_CONFIG, e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,16 +23,13 @@ import static java.util.stream.Collectors.joining;
|
|||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.apache.commons.io.IOUtils.write;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.nifi.controller.flow.VersionedDataflow;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
|
||||
import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
|
||||
|
@ -45,6 +42,8 @@ import org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonExcepti
|
|||
import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
|
||||
import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class TransformYamlCommandFactory {
|
||||
|
||||
public static final String TRANSFORM_YML = "transform-yml";
|
||||
|
@ -136,22 +135,13 @@ public class TransformYamlCommandFactory {
|
|||
|
||||
private void persistFlowJson(VersionedDataflow flow, String flowJsonPath) throws ConfigTransformException {
|
||||
try (OutputStream outputStream = pathOutputStreamFactory.create(flowJsonPath)) {
|
||||
ObjectMapper objectMapper = createObjectMapper();
|
||||
ObjectMapper objectMapper = ObjectMapperUtils.createObjectMapper();
|
||||
objectMapper.writeValue(outputStream, flow);
|
||||
} catch (IOException e) {
|
||||
throw new ConfigTransformException("Error when persisting flow JSON file: " + flowJsonPath, ConfigMain.ERR_UNABLE_TO_SAVE_CONFIG, e);
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectMapper createObjectMapper() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL));
|
||||
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
||||
objectMapper.setAnnotationIntrospector(new JakartaXmlBindAnnotationIntrospector(objectMapper.getTypeFactory()));
|
||||
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
return objectMapper;
|
||||
}
|
||||
|
||||
private void persistProperties(Properties properties, String bootstrapPropertiesPath) throws ConfigTransformException {
|
||||
try (OutputStream outputStream = pathOutputStreamFactory.create(bootstrapPropertiesPath)) {
|
||||
write(
|
||||
|
|
Loading…
Reference in New Issue