NIFI-11857 - CLI - recursively change version of Processors

This closes #7528

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-07-25 19:12:47 +02:00 committed by exceptionfactory
parent ad753318e7
commit cb03d6de74
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 296 additions and 0 deletions

View File

@ -60,6 +60,8 @@ public enum CommandOption {
EXT_BUNDLE_GROUP("gr", "group", "The group id of a bundle", true),
EXT_BUNDLE_ARTIFACT("ar", "artifact", "The artifact id of a bundle", true),
EXT_BUNDLE_VERSION("ver", "version", "The version of the bundle", true),
EXT_BUNDLE_CURRENT_VERSION("cver", "current-version", "The current version of the bundle", true),
EXT_QUALIFIED_NAME("extname", "extension-name", "The qualified name of the extension", true),
EXT_TYPE("et", "extensionType", "The type of extension, one of 'PROCESSOR', 'CONTROLLER_SERVICE', or 'REPORTING_TASK'.", true),
EXT_BUNDLE_TYPE("ebt", "extensionBundleType", "The type of extension bundle, either nifi-nar or minifi-cpp", true),

View File

@ -80,6 +80,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStop;
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStopVersionControl;
import org.apache.nifi.toolkit.cli.impl.command.nifi.policies.GetAccessPolicy;
import org.apache.nifi.toolkit.cli.impl.command.nifi.policies.UpdateAccessPolicy;
import org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ChangeVersionProcessor;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.CreateRegistryClient;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.GetRegistryClientId;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.ListRegistryClients;
@ -180,6 +181,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
commands.add(new LogoutAccessToken());
commands.add(new GetControllerConfiguration());
commands.add(new UpdateControllerConfiguration());
commands.add(new ChangeVersionProcessor());
return new ArrayList<>(commands);
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.processors;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.api.CommandException;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessorsResult;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import java.io.IOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
/**
* Command to update the version of processors
*/
public class ChangeVersionProcessor extends AbstractNiFiCommand<ProcessorsResult> {
public ChangeVersionProcessor() {
super("change-version-processor", ProcessorsResult.class);
}
@Override
public String getDescription() {
return "Recursively changes the version of the instances of the specified processor. If the process group is specified, the changes "
+ "will be scoped to that process group and its childs ; if not specified the changes will recursively apply to the root process "
+ "group. If the source version is specified, only instances with this version will be updated to the new version.";
}
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PG_ID.createOption());
addOption(CommandOption.EXT_BUNDLE_GROUP.createOption());
addOption(CommandOption.EXT_BUNDLE_ARTIFACT.createOption());
addOption(CommandOption.EXT_BUNDLE_VERSION.createOption());
addOption(CommandOption.EXT_QUALIFIED_NAME.createOption());
addOption(CommandOption.EXT_BUNDLE_CURRENT_VERSION.createOption());
}
@Override
public ProcessorsResult doExecute(NiFiClient client, Properties properties)
throws NiFiClientException, IOException, MissingOptionException, CommandException {
final String bundleGroup = getRequiredArg(properties, CommandOption.EXT_BUNDLE_GROUP);
final String bundleArtifact = getRequiredArg(properties, CommandOption.EXT_BUNDLE_ARTIFACT);
final String bundleVersion = getRequiredArg(properties, CommandOption.EXT_BUNDLE_VERSION);
final String qualifiedName = getRequiredArg(properties, CommandOption.EXT_QUALIFIED_NAME);
final String sourceVersion = getArg(properties, CommandOption.EXT_BUNDLE_CURRENT_VERSION);
final FlowClient flowClient = client.getFlowClient();
final ProcessorClient processorClient = client.getProcessorClient();
String pgId = getArg(properties, CommandOption.PG_ID);
if(StringUtils.isBlank(pgId)) {
pgId = flowClient.getRootGroupId();
}
Set<ProcessorEntity> updatedComponents = recursivelyChangeVersionProcessor(flowClient, processorClient, pgId, bundleGroup,
bundleArtifact, bundleVersion, sourceVersion, qualifiedName);
ProcessorsEntity processorsEntity = new ProcessorsEntity();
processorsEntity.setProcessors(updatedComponents);
return new ProcessorsResult(getResultType(properties), processorsEntity);
}
private Set<ProcessorEntity> recursivelyChangeVersionProcessor(FlowClient flowClient, ProcessorClient processorClient, String pgId, String bundleGroup,
String bundleArtifact, String bundleVersion, String sourceVersion, String qualifiedName) throws NiFiClientException, IOException {
Set<ProcessorEntity> updatedComponents = new HashSet<ProcessorEntity>();
final ProcessGroupFlowEntity sourcePgEntity = flowClient.getProcessGroup(pgId);
final ProcessGroupFlowDTO flow = sourcePgEntity.getProcessGroupFlow();
final Set<ProcessorEntity> processors = flow.getFlow().getProcessors();
for(ProcessorEntity processor : processors) {
final BundleDTO bundle = processor.getComponent().getBundle();
if(bundle.getGroup().equals(bundleGroup)
&& bundle.getArtifact().equals(bundleArtifact)
&& processor.getComponent().getType().equals(qualifiedName)
&& (StringUtils.isBlank(sourceVersion) || bundle.getVersion().equals(sourceVersion))) {
final boolean isRunning = processor.getComponent().getState().equals("RUNNING");
if(isRunning) {
// processor needs to be stopped for changing the version
processorClient.stopProcessor(processor);
// get the updated entity to have the correct revision
processor = processorClient.getProcessor(processor.getId());
}
final BundleDTO updatedBundle = new BundleDTO(bundleGroup, bundleArtifact, bundleVersion);
final ProcessorDTO processorDto = new ProcessorDTO();
processorDto.setId(processor.getId());
processorDto.setBundle(updatedBundle);
final ProcessorEntity updatedEntity = new ProcessorEntity();
updatedEntity.setRevision(processor.getRevision());
updatedEntity.setComponent(processorDto);
updatedEntity.setId(processor.getId());
processorClient.updateProcessor(updatedEntity);
if(isRunning) { // restart the component that was previously running
// get the updated entity to have the correct revision
processor = processorClient.getProcessor(processor.getId());
processorClient.startProcessor(processor);
}
// get latest version of the entity
processor = processorClient.getProcessor(processor.getId());
updatedComponents.add(processor);
}
}
final Set<ProcessGroupEntity> processGroups = flow.getFlow().getProcessGroups();
for(ProcessGroupEntity processGroup : processGroups) {
updatedComponents.addAll(recursivelyChangeVersionProcessor(flowClient, processorClient, processGroup.getId(), bundleGroup,
bundleArtifact, bundleVersion, sourceVersion, qualifiedName));
}
return updatedComponents;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.result.nifi;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.toolkit.cli.api.ResultType;
import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import java.io.IOException;
import java.io.PrintStream;
public class ProcessorResult extends AbstractWritableResult<ProcessorEntity> {
private final ProcessorEntity processorEntity;
public ProcessorResult(ResultType resultType, ProcessorEntity processorEntity) {
super(resultType);
this.processorEntity = processorEntity;
Validate.notNull(processorEntity);
}
@Override
public ProcessorEntity getResult() {
return processorEntity;
}
@Override
protected void writeSimpleResult(PrintStream output) throws IOException {
final ProcessorDTO processorDTO = processorEntity.getComponent();
final BundleDTO bundle = processorDTO.getBundle();
output.printf("Name : %s\nID : %s\nType : %s\nBundle: %s - %s %s\nState : %s\n",
processorDTO.getName(), processorDTO.getId(), processorDTO.getType(),
bundle.getGroup(), bundle.getArtifact(), bundle.getVersion(), processorDTO.getState());
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.result.nifi;
import org.apache.commons.lang3.Validate;
import org.apache.nifi.toolkit.cli.api.ResultType;
import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class ProcessorsResult extends AbstractWritableResult<ProcessorsEntity> {
private final ProcessorsEntity processorsEntity;
public ProcessorsResult(ResultType resultType, ProcessorsEntity processorsEntity) {
super(resultType);
this.processorsEntity = processorsEntity;
Validate.notNull(processorsEntity);
}
@Override
public ProcessorsEntity getResult() {
return processorsEntity;
}
@Override
protected void writeSimpleResult(PrintStream output) throws IOException {
final Set<ProcessorEntity> processorsEntities = processorsEntity.getProcessors();
if (processorsEntities == null) {
return;
}
final List<ProcessorDTO> processorDTOS = processorsEntities.stream()
.map(ProcessorEntity::getComponent)
.sorted(Comparator.comparing(ProcessorDTO::getName))
.collect(Collectors.toList());
final Table table = new Table.Builder()
.column("#", 3, 3, false)
.column("Name", 5, 40, true)
.column("ID", 36, 36, false)
.column("Type", 5, 40, true)
.column("Run Status", 10, 20, false)
.column("Version", 10, 20, false)
.build();
for (int i = 0; i < processorDTOS.size(); i++) {
final ProcessorDTO processorDTO = processorDTOS.get(i);
final String[] typeSplit = processorDTO.getType().split("\\.", -1);
table.addRow(
String.valueOf(i + 1),
processorDTO.getName(),
processorDTO.getId(),
typeSplit[typeSplit.length - 1],
processorDTO.getState(),
processorDTO.getBundle().getVersion()
);
}
final TableWriter tableWriter = new DynamicTableWriter();
tableWriter.write(table, output);
}
}