NIFI-13995 Add start, run once and clear state to CLI for Processors (#9509)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2024-11-19 17:01:43 +01:00 committed by GitHub
parent 4e869e661d
commit a2c0fa5cd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 229 additions and 4 deletions

View File

@ -51,7 +51,8 @@ public enum CommandOption {
FLOW_VERSION_1("fv1", "flowVersion1", "A version of a flow", true),
FLOW_VERSION_2("fv2", "flowVersion2", "A version of a flow", true),
// Registry - Source options for when there are two registries involved and one is a source
// Registry - Source options for when there are two registries involved and one
// is a source
SRC_PROPS("sp", "sourceProps", "A properties file to load for the source", true, true),
SRC_FLOW_ID("sf", "sourceFlowIdentifier", "A flow identifier from the source registry", true),
SRC_FLOW_VERSION("sfv", "sourceFlowVersion", "A version of a flow from the source registry", true),
@ -99,6 +100,9 @@ public enum CommandOption {
SOURCE_OUTPUT_PORT("sourceOutput", "source-output-port", "The name of the output port in the source process group", true),
DESTINATION_INPUT_PORT("destInput", "destination-input-port", "The name of the input port in the destination process group", true),
// NiFi - Processors
PROC_ID("procid", "processorId", "The id of a processor", true),
// NiFi - Controller Services
CS_ID("cs", "controllerServiceId", "The id of a controller service", true),

View File

@ -61,20 +61,20 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateParamProvider;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.DeleteAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.GetAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ListAssets;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.RemoveAssetReference;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.DeleteParam;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.DeleteParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.DeleteParamProvider;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ExportParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.FetchParams;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.GetAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.GetParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.GetParamProvider;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ImportParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ListAssets;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ListParamContexts;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ListParamProviders;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.MergeParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.RemoveAssetReference;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.RemoveInheritedParamContexts;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetInheritedParamContexts;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParam;
@ -104,6 +104,9 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStopVersionControl;
import org.apache.nifi.toolkit.cli.impl.command.nifi.policies.GetAccessPolicy;
import org.apache.nifi.toolkit.cli.impl.command.nifi.policies.UpdateAccessPolicy;
import org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ChangeVersionProcessor;
import org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ProcessorClearState;
import org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ProcessorRunOnce;
import org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ProcessorStart;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.CreateRegistryClient;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.GetRegistryClientId;
import org.apache.nifi.toolkit.cli.impl.command.nifi.registry.ListRegistryClients;
@ -217,6 +220,9 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
commands.add(new GetControllerConfiguration());
commands.add(new UpdateControllerConfiguration());
commands.add(new ChangeVersionProcessor());
commands.add(new ProcessorStart());
commands.add(new ProcessorRunOnce());
commands.add(new ProcessorClearState());
commands.add(new UploadNar());
commands.add(new DownloadNar());
commands.add(new ListNars());

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.processors;
import org.apache.commons.cli.MissingOptionException;
import org.apache.nifi.toolkit.cli.api.CommandException;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessorResult;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.ProcessorClient;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import java.io.IOException;
import java.util.Properties;
/**
* Command to clear the state of a processor
*/
public class ProcessorClearState extends AbstractNiFiCommand<ProcessorResult> {
public ProcessorClearState() {
super("processor-clear-state", ProcessorResult.class);
}
@Override
public String getDescription() {
return "Clears the state of a processor.";
}
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PROC_ID.createOption());
}
@Override
public ProcessorResult doExecute(NiFiClient client, Properties properties)
throws NiFiClientException, IOException, MissingOptionException, CommandException {
final String procId = getRequiredArg(properties, CommandOption.PROC_ID);
final ProcessorClient processorClient = client.getProcessorClient();
final ProcessorEntity processor = processorClient.getProcessor(procId);
if (!processor.getComponent().getState().equals("STOPPED")) {
throw new NiFiClientException("The processor should be STOPPED before clearing its cache.");
}
processorClient.clearProcessorState(procId);
return new ProcessorResult(getResultType(properties), processor);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.processors;
import org.apache.commons.cli.MissingOptionException;
import org.apache.nifi.toolkit.cli.api.CommandException;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessorResult;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.ProcessorClient;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import java.io.IOException;
import java.util.Properties;
/**
* Command to execute run once on a processor
*/
public class ProcessorRunOnce extends AbstractNiFiCommand<ProcessorResult> {
public ProcessorRunOnce() {
super("processor-run-once", ProcessorResult.class);
}
@Override
public String getDescription() {
return "Executes Run Once on a processor.";
}
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PROC_ID.createOption());
}
@Override
public ProcessorResult doExecute(NiFiClient client, Properties properties)
throws NiFiClientException, IOException, MissingOptionException, CommandException {
final String procId = getRequiredArg(properties, CommandOption.PROC_ID);
final ProcessorClient processorClient = client.getProcessorClient();
final ProcessorEntity processor = processorClient.getProcessor(procId);
final ProcessorEntity result = processorClient.runProcessorOnce(processor);
return new ProcessorResult(getResultType(properties), result);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.processors;
import org.apache.commons.cli.MissingOptionException;
import org.apache.nifi.toolkit.cli.api.CommandException;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessorResult;
import org.apache.nifi.toolkit.client.NiFiClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.ProcessorClient;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import java.io.IOException;
import java.util.Properties;
/**
* Command to start a processor
*/
public class ProcessorStart extends AbstractNiFiCommand<ProcessorResult> {
public ProcessorStart() {
super("processor-start", ProcessorResult.class);
}
@Override
public String getDescription() {
return "Starts a processor.";
}
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PROC_ID.createOption());
}
@Override
public ProcessorResult doExecute(NiFiClient client, Properties properties)
throws NiFiClientException, IOException, MissingOptionException, CommandException {
final String procId = getRequiredArg(properties, CommandOption.PROC_ID);
final ProcessorClient processorClient = client.getProcessorClient();
final ProcessorEntity processor = processorClient.getProcessor(procId);
final ProcessorEntity result = processorClient.startProcessor(processor);
return new ProcessorResult(getResultType(properties), result);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.toolkit.client;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.entity.VerifyConfigRequestEntity;
@ -59,6 +60,8 @@ public interface ProcessorClient {
ProcessorEntity terminateProcessor(String processorId) throws NiFiClientException, IOException;
ComponentStateEntity clearProcessorState(String processorId) throws NiFiClientException, IOException;
/**
* Indicates that mutable requests should indicate that the client has
* acknowledged that the node is disconnected.

View File

@ -24,6 +24,7 @@ import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.toolkit.client.ProcessorClient;
import org.apache.nifi.toolkit.client.RequestConfig;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
@ -284,4 +285,17 @@ public class JerseyProcessorClient extends AbstractJerseyClient implements Proce
return getRequestBuilder(target).delete(ProcessorEntity.class);
});
}
@Override
public ComponentStateEntity clearProcessorState(String processorId) throws NiFiClientException, IOException {
Objects.requireNonNull(processorId, "Processor ID required");
return executeAction("Error clearing state of the Processor", () -> {
final WebTarget target = processorTarget
.path("/state/clear-requests")
.resolveTemplate("id", processorId);
return getRequestBuilder(target).post(null, ComponentStateEntity.class);
});
}
}