From f6c2824f74990f22da4fc1f2e73607daa3fd00e6 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 7 Jul 2020 14:50:13 -0400 Subject: [PATCH] NIFI-7608 Add CLI command to replace contents of a process group Signed-off-by: Pierre Villard This closes #4392. --- .../impl/client/nifi/ProcessGroupClient.java | 12 ++ .../nifi/impl/JerseyProcessGroupClient.java | 68 +++++++++++ .../impl/command/nifi/NiFiCommandGroup.java | 2 + .../cli/impl/command/nifi/pg/PGReplace.java | 113 ++++++++++++++++++ 4 files changed, 195 insertions(+) create mode 100644 nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGReplace.java diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java index 5843d207db..7204e3016a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProcessGroupClient.java @@ -19,6 +19,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; +import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; @@ -53,4 +55,14 @@ public interface ProcessGroupClient { throws NiFiClientException, IOException; TemplateEntity uploadTemplate(String processGroupId, TemplateDTO templateDTO) throws NiFiClientException, IOException; + + ProcessGroupReplaceRequestEntity replaceProcessGroup(String processGroupId, ProcessGroupImportEntity importEntity) + throws NiFiClientException, IOException; + + ProcessGroupReplaceRequestEntity getProcessGroupReplaceRequest(String processGroupId, String requestId) + throws NiFiClientException, IOException; + + ProcessGroupReplaceRequestEntity deleteProcessGroupReplaceRequest(String processGroupId, String requestId) + throws NiFiClientException, IOException; + } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java index 67e71cf133..e46badcdf0 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProcessGroupClient.java @@ -22,6 +22,8 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; +import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.VariableRegistryEntity; import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; @@ -242,4 +244,70 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr ); }); } + + @Override + public ProcessGroupReplaceRequestEntity replaceProcessGroup(final String processGroupId, final ProcessGroupImportEntity importEntity) + throws NiFiClientException, IOException { + + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (importEntity == null || importEntity.getVersionedFlowSnapshot() == null) { + throw new IllegalArgumentException("ProcessGroupImportEntity cannot be null and must have a non-null VersionedFlowSnapshot"); + } + + return executeAction("Error creating process group replacement request", () -> { + final WebTarget target = processGroupsTarget + .path("{processGroupId}/replace-requests") + .resolveTemplate("processGroupId", processGroupId); + + return getRequestBuilder(target).post( + Entity.entity(importEntity, MediaType.APPLICATION_JSON_TYPE), + ProcessGroupReplaceRequestEntity.class + ); + }); + } + + @Override + public ProcessGroupReplaceRequestEntity getProcessGroupReplaceRequest(final String processGroupId, final String requestId) + throws NiFiClientException, IOException { + + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (StringUtils.isBlank(requestId)) { + throw new IllegalArgumentException("Request id cannot be null or blank"); + } + + return executeAction("Error getting process group replacement request", () -> { + final WebTarget target = processGroupsTarget + .path("replace-requests/{requestId}") + .resolveTemplate("requestId", requestId); + + return getRequestBuilder(target).get(ProcessGroupReplaceRequestEntity.class); + }); + } + + @Override + public ProcessGroupReplaceRequestEntity deleteProcessGroupReplaceRequest(final String processGroupId, final String requestId) + throws NiFiClientException, IOException { + + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (StringUtils.isBlank(requestId)) { + throw new IllegalArgumentException("Request id cannot be null or blank"); + } + + return executeAction("Error deleting process group replacement request", () -> { + final WebTarget target = processGroupsTarget + .path("replace-requests/{requestId}") + .resolveTemplate("requestId", requestId); + + return getRequestBuilder(target).delete(ProcessGroupReplaceRequestEntity.class); + }); + } } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java index 5ce9024480..1c80c0647b 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java @@ -56,6 +56,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGGetVars; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGGetVersion; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGImport; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGList; +import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGReplace; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGSetParamContext; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGSetVar; import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStart; @@ -125,6 +126,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup { commands.add(new PGDisableControllerServices()); commands.add(new PGGetParamContext()); commands.add(new PGSetParamContext()); + commands.add(new PGReplace()); commands.add(new GetControllerServices()); commands.add(new GetControllerService()); commands.add(new CreateControllerService()); diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGReplace.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGReplace.java new file mode 100644 index 0000000000..feb5143119 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGReplace.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.cli.MissingOptionException; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.toolkit.cli.api.CommandException; +import org.apache.nifi.toolkit.cli.api.Context; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient; +import org.apache.nifi.toolkit.cli.impl.command.CommandOption; +import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand; +import org.apache.nifi.toolkit.cli.impl.result.VoidResult; +import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; +import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; + +import java.io.IOException; +import java.util.Properties; + +/** + * Command to replace the content of an existing process group with the content from a versioned flow snapshot. + */ +public class PGReplace extends AbstractNiFiCommand { + + public PGReplace() { + super("pg-replace", VoidResult.class); + } + + @Override + public String getDescription() { + return "Replaces the content of a process group with the content from the specified versioned flow snapshot."; + } + + @Override + protected void doInitialize(final Context context) { + addOption(CommandOption.PG_ID.createOption()); + addOption(CommandOption.INPUT_SOURCE.createOption()); + } + + @Override + public VoidResult doExecute(final NiFiClient client, final Properties properties) + throws NiFiClientException, IOException, MissingOptionException, CommandException { + + final String pgId = getRequiredArg(properties, CommandOption.PG_ID); + + final String inputFile = getRequiredArg(properties, CommandOption.INPUT_SOURCE); + final String contents = getInputSourceContent(inputFile); + + final ObjectMapper objectMapper = JacksonUtils.getObjectMapper(); + final VersionedFlowSnapshot deserializedSnapshot = objectMapper.readValue(contents, VersionedFlowSnapshot.class); + if (deserializedSnapshot == null) { + throw new IOException("Unable to deserialize flow version from " + inputFile); + } + + final ProcessGroupClient pgClient = client.getProcessGroupClient(); + final ProcessGroupEntity existingProcessGroup = pgClient.getProcessGroup(pgId); + + final ProcessGroupImportEntity importEntity = new ProcessGroupImportEntity(); + importEntity.setVersionedFlowSnapshot(deserializedSnapshot); + importEntity.setProcessGroupRevision(existingProcessGroup.getRevision()); + + final ProcessGroupReplaceRequestEntity createdReplaceRequestEntity = pgClient.replaceProcessGroup(pgId, importEntity); + final String requestId = createdReplaceRequestEntity.getRequest().getRequestId(); + try { + boolean completed = false; + for (int i = 0; i < 30; i++) { + final ProcessGroupReplaceRequestEntity replaceRequest = + pgClient.getProcessGroupReplaceRequest(pgId, requestId); + + if (replaceRequest != null && replaceRequest.getRequest().isComplete()) { + completed = true; + break; + } else { + try { + if (getContext().isInteractive()) { + println("Waiting for replacement request to complete..."); + } + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + if (!completed) { + throw new NiFiClientException("Unable to replace process group, cancelling request"); + } + + } finally { + pgClient.deleteProcessGroupReplaceRequest(pgId, requestId); + } + + return VoidResult.getInstance(); + } +}