mirror of https://github.com/apache/nifi.git
NIFI-13008 - CLI command to upgrade all instances of a versioned flow
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #8611.
This commit is contained in:
parent
160c7ae24b
commit
ba1109f1f1
|
@ -81,6 +81,7 @@ The following are available commands:
|
|||
nifi pg-get-version
|
||||
nifi pg-stop-version-control
|
||||
nifi pg-change-version
|
||||
nifi pg-change-all-versions
|
||||
nifi pg-get-all-versions
|
||||
nifi pg-list
|
||||
nifi pg-status
|
||||
|
|
|
@ -170,7 +170,7 @@ public enum CommandOption {
|
|||
KERBEROS_PASSWORD("krbPw", "kerberosPassword", "The password for a kerberos principal", true),
|
||||
|
||||
// Miscellaneous
|
||||
FORCE("force", "force", "Indicates to force a delete operation", false),
|
||||
FORCE("force", "force", "Indicates to force the operation", false),
|
||||
OUTPUT_TYPE("ot", "outputType", "The type of output to produce (json or simple)", true),
|
||||
VERBOSE("verbose", "verbose", "Indicates that verbose output should be provided", false),
|
||||
RECURSIVE("r", "recursive", "Indicates the command should perform the action recursively", false),
|
||||
|
|
|
@ -62,6 +62,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.params.RemoveInheritedParam
|
|||
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetInheritedParamContexts;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParam;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParamProviderProperty;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeAllVersions;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeVersion;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGConnect;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate;
|
||||
|
@ -133,6 +134,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
|
|||
commands.add(new PGGetVersion());
|
||||
commands.add(new PGStopVersionControl());
|
||||
commands.add(new PGChangeVersion());
|
||||
commands.add(new PGChangeAllVersions());
|
||||
commands.add(new PGGetAllVersions());
|
||||
commands.add(new PGList());
|
||||
commands.add(new PGStatus());
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
|
||||
|
||||
import org.apache.commons.cli.MissingOptionException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.toolkit.cli.api.CommandException;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessGroupsResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessGroupsVersionChangeResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.nifi.ChangeVersionResult;
|
||||
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
|
||||
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Command to change the version of a version controlled process group.
|
||||
*/
|
||||
public class PGChangeAllVersions extends AbstractNiFiCommand<ProcessGroupsVersionChangeResult> {
|
||||
|
||||
public PGChangeAllVersions() {
|
||||
super("pg-change-all-versions", ProcessGroupsVersionChangeResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Changes the version for all of the controlled process group instances for a given flow ID. "
|
||||
+ "This can be used to upgrade all the instances of a versioned flow to a new version, or "
|
||||
+ "revert to a previous version. If no version is specified, the latest version will be used. "
|
||||
+ "If no process group ID is provided, the root process group will be used to recursively "
|
||||
+ "search for all instances of the Flow ID. It is possible to force the recursive operation "
|
||||
+ "and not stop the operation in case the upgrade of a process group fails.";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize(final Context context) {
|
||||
addOption(CommandOption.FLOW_ID.createOption());
|
||||
addOption(CommandOption.FLOW_VERSION.createOption());
|
||||
addOption(CommandOption.PG_ID.createOption());
|
||||
addOption(CommandOption.FORCE.createOption());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessGroupsVersionChangeResult doExecute(final NiFiClient client, final Properties properties)
|
||||
throws NiFiClientException, IOException, MissingOptionException, CommandException {
|
||||
|
||||
final FlowClient flowClient = client.getFlowClient();
|
||||
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
|
||||
|
||||
// get the optional id of the parent PG, otherwise fallback to the root group
|
||||
String parentPgId = getArg(properties, CommandOption.PG_ID);
|
||||
if (StringUtils.isBlank(parentPgId)) {
|
||||
parentPgId = flowClient.getRootGroupId();
|
||||
}
|
||||
|
||||
final PGList doPGList = new PGList();
|
||||
final List<ProcessGroupDTO> pgList = new ArrayList<ProcessGroupDTO>();
|
||||
recursivePGList(pgList, doPGList, client, properties, parentPgId);
|
||||
|
||||
final PGChangeVersion doPGChangeVersion = new PGChangeVersion();
|
||||
|
||||
// new version, if specified in the arguments
|
||||
String newVersion = getArg(properties, CommandOption.FLOW_VERSION);
|
||||
|
||||
// force operation, if specified in the arguments
|
||||
final boolean forceOperation = properties.containsKey(CommandOption.FORCE.getLongName());
|
||||
|
||||
final List<ProcessGroupDTO> processGroups = new ArrayList<>();
|
||||
final Map<String, ChangeVersionResult> changeVersionResults = new HashMap<String, ChangeVersionResult>();
|
||||
|
||||
for (final ProcessGroupDTO pgDTO : pgList) {
|
||||
final VersionControlInformationEntity entity = client.getVersionsClient().getVersionControlInfo(pgDTO.getId());
|
||||
|
||||
if(entity.getVersionControlInformation() == null || !entity.getVersionControlInformation().getFlowId().equals(flowId)) {
|
||||
continue; // the process group is not version controlled or does not match the provided Flow ID
|
||||
}
|
||||
|
||||
if(newVersion == null) {
|
||||
newVersion = doPGChangeVersion.getLatestVersion(client, entity.getVersionControlInformation());
|
||||
}
|
||||
|
||||
processGroups.add(pgDTO);
|
||||
|
||||
final String previousVersion = pgDTO.getVersionControlInformation().getVersion();
|
||||
if (previousVersion.equals(newVersion)) {
|
||||
changeVersionResults.put(pgDTO.getId(), new ChangeVersionResult(newVersion, newVersion, "Process group already at desired version"));
|
||||
continue; // Process group already at desired version
|
||||
}
|
||||
|
||||
try {
|
||||
doPGChangeVersion.changeVersion(client, entity, newVersion, pgDTO.getId(), getContext());
|
||||
changeVersionResults.put(pgDTO.getId(), new ChangeVersionResult(previousVersion, newVersion, "SUCCESS"));
|
||||
} catch (Exception e) {
|
||||
changeVersionResults.put(pgDTO.getId(), new ChangeVersionResult(previousVersion, null, e.getMessage()));
|
||||
if (forceOperation) {
|
||||
continue;
|
||||
} else {
|
||||
e.printStackTrace();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ProcessGroupsVersionChangeResult(getResultType(properties), processGroups, changeVersionResults);
|
||||
}
|
||||
|
||||
private void recursivePGList(final List<ProcessGroupDTO> pgList, final PGList doPGList, final NiFiClient client,
|
||||
final Properties properties, final String pgId) throws NiFiClientException, IOException {
|
||||
final ProcessGroupsResult result = doPGList.getList(client, properties, pgId);
|
||||
for(ProcessGroupDTO pgDTO : result.getResult()) {
|
||||
pgList.add(pgDTO);
|
||||
recursivePGList(pgList, doPGList, client, properties, pgDTO.getId());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -75,6 +75,14 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
|
|||
// start with the version specified in the arguments
|
||||
String newVersion = getArg(properties, CommandOption.FLOW_VERSION);
|
||||
|
||||
return changeVersion(client, existingVersionControlInfo, newVersion, pgId, getContext());
|
||||
}
|
||||
|
||||
public VoidResult changeVersion(final NiFiClient client, final VersionControlInformationEntity existingVersionControlInfo,
|
||||
String newVersion, final String pgId, final Context context) throws NiFiClientException, IOException, MissingOptionException, CommandException {
|
||||
final VersionsClient versionsClient = client.getVersionsClient();
|
||||
final VersionControlInformationDTO existingVersionControlDTO = existingVersionControlInfo.getVersionControlInformation();
|
||||
|
||||
// if no version was specified, automatically determine the latest and change to that
|
||||
if (newVersion == null) {
|
||||
newVersion = getLatestVersion(client, existingVersionControlDTO);
|
||||
|
@ -99,10 +107,13 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
|
|||
final VersionedFlowUpdateRequestEntity updateRequest = versionsClient.getUpdateRequest(updateRequestId);
|
||||
if (updateRequest != null && updateRequest.getRequest().isComplete()) {
|
||||
completed = true;
|
||||
if (updateRequest.getRequest().getFailureReason() != null) {
|
||||
throw new NiFiClientException(updateRequest.getRequest().getFailureReason());
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
try {
|
||||
if (getContext().isInteractive()) {
|
||||
if (context.isInteractive()) {
|
||||
println("Waiting for update request to complete...");
|
||||
}
|
||||
Thread.sleep(2000);
|
||||
|
@ -123,7 +134,7 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
|
|||
return VoidResult.getInstance();
|
||||
}
|
||||
|
||||
private String getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO)
|
||||
String getLatestVersion(final NiFiClient client, final VersionControlInformationDTO existingVersionControlDTO)
|
||||
throws NiFiClientException, IOException {
|
||||
final FlowClient flowClient = client.getFlowClient();
|
||||
|
||||
|
@ -138,7 +149,7 @@ public class PGChangeVersion extends AbstractNiFiCommand<VoidResult> {
|
|||
return getLatestVersion(versions);
|
||||
}
|
||||
|
||||
private static String getLatestVersion(final VersionedFlowSnapshotMetadataSetEntity versions) {
|
||||
private String getLatestVersion(final VersionedFlowSnapshotMetadataSetEntity versions) {
|
||||
long latestTimestamp = 0;
|
||||
String latestVersion = null;
|
||||
for (VersionedFlowSnapshotMetadataEntity version : versions.getVersionedFlowSnapshotMetadataSet()) {
|
||||
|
|
|
@ -66,7 +66,12 @@ public class PGList extends AbstractNiFiCommand<ProcessGroupsResult> {
|
|||
parentPgId = flowClient.getRootGroupId();
|
||||
}
|
||||
|
||||
final ProcessGroupFlowEntity processGroupFlowEntity = flowClient.getProcessGroup(parentPgId);
|
||||
return getList(client, properties, parentPgId);
|
||||
}
|
||||
|
||||
public ProcessGroupsResult getList(final NiFiClient client, final Properties properties, final String pgID)
|
||||
throws NiFiClientException, IOException {
|
||||
final ProcessGroupFlowEntity processGroupFlowEntity = client.getFlowClient().getProcessGroup(pgID);
|
||||
final ProcessGroupFlowDTO processGroupFlowDTO = processGroupFlowEntity.getProcessGroupFlow();
|
||||
final FlowDTO flowDTO = processGroupFlowDTO.getFlow();
|
||||
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.result.nifi;
|
||||
|
||||
/**
|
||||
* Object to help with the result of a change version operation
|
||||
*/
|
||||
public class ChangeVersionResult {
|
||||
String previousVersion;
|
||||
String newVersion;
|
||||
String message;
|
||||
|
||||
public ChangeVersionResult(final String previousVersion, final String newVersion, final String message) {
|
||||
this.previousVersion = previousVersion;
|
||||
this.newVersion = newVersion;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public String getPreviousVersion() {
|
||||
return previousVersion;
|
||||
}
|
||||
|
||||
public String getNewVersion() {
|
||||
return newVersion;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.result.nifi;
|
||||
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.api.ReferenceResolver;
|
||||
import org.apache.nifi.toolkit.cli.api.Referenceable;
|
||||
import org.apache.nifi.toolkit.cli.api.ResolvedReference;
|
||||
import org.apache.nifi.toolkit.cli.api.ResultType;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
|
||||
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Result for a list of ProcessGroupEntities.
|
||||
*/
|
||||
public class ProcessGroupsVersionChangeResult extends AbstractWritableResult<List<ProcessGroupDTO>> implements Referenceable {
|
||||
|
||||
private final List<ProcessGroupDTO> processGroups;
|
||||
private final Map<String, ChangeVersionResult> changeVersionResults;
|
||||
|
||||
public ProcessGroupsVersionChangeResult(final ResultType resultType, final List<ProcessGroupDTO> processGroups,
|
||||
final Map<String, ChangeVersionResult> changeVersionResults) {
|
||||
super(resultType);
|
||||
this.processGroups = Objects.requireNonNull(processGroups);
|
||||
this.processGroups.sort(Comparator.comparing(ProcessGroupDTO::getName));
|
||||
this.changeVersionResults = Objects.requireNonNull(changeVersionResults);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ProcessGroupDTO> getResult() {
|
||||
return processGroups;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeSimpleResult(final PrintStream output) {
|
||||
|
||||
final Table table = new Table.Builder()
|
||||
.column("#", 3, 3, false)
|
||||
.column("Name", 20, 36, true)
|
||||
.column("Id", 36, 36, false)
|
||||
.column("Prev Version", 15, 15, false)
|
||||
.column("New Version", 15, 15, false)
|
||||
.column("Message", 100, 100, false)
|
||||
.build();
|
||||
|
||||
for (int i=0; i < processGroups.size(); i++) {
|
||||
final ProcessGroupDTO dto = processGroups.get(i);
|
||||
table.addRow(
|
||||
String.valueOf(i+1),
|
||||
dto.getName(),
|
||||
dto.getId(),
|
||||
String.valueOf(changeVersionResults.get(dto.getId()).getPreviousVersion()),
|
||||
String.valueOf(changeVersionResults.get(dto.getId()).getNewVersion()),
|
||||
String.valueOf(changeVersionResults.get(dto.getId()).getMessage())
|
||||
);
|
||||
}
|
||||
|
||||
final TableWriter tableWriter = new DynamicTableWriter();
|
||||
tableWriter.write(table, output);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceResolver createReferenceResolver(final Context context) {
|
||||
final Map<Integer, ProcessGroupDTO> backRefs = new HashMap<>();
|
||||
final AtomicInteger position = new AtomicInteger(0);
|
||||
processGroups.forEach(p -> backRefs.put(position.incrementAndGet(), p));
|
||||
|
||||
return new ReferenceResolver() {
|
||||
@Override
|
||||
public ResolvedReference resolve(final CommandOption option, final Integer position) {
|
||||
final ProcessGroupDTO pg = backRefs.get(position);
|
||||
if (pg != null) {
|
||||
return new ResolvedReference(option, position, pg.getName(), pg.getId());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return backRefs.isEmpty();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue