NIFI-11566: Adding updateTimeout argument to parameter commands in CLI

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7267.
This commit is contained in:
Joe Gresock 2023-05-18 13:58:26 -04:00 committed by Pierre Villard
parent 6c70471cc6
commit 6ee4632267
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 42 additions and 9 deletions

View File

@ -135,6 +135,7 @@ public enum CommandOption {
PARAM_DESC("pd", "paramDescription", "The description of the parameter", true),
PARAM_VALUE("pv", "paramValue", "The value of a parameter", true),
PARAM_SENSITIVE("ps", "paramSensitive", "Whether or not the parameter is sensitive (true/false)", true),
UPDATE_TIMEOUT("ut", "updateTimeout", "Number of seconds after which a parameter context update will timeout (default: 60, maximum: 600)", true),
// Security related
KEYSTORE("ks", "keystore", "A keystore to use for TLS/SSL connections", true),

View File

@ -16,27 +16,36 @@
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.params;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.api.Result;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ParamContextClient;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractUpdateParamContextCommand<R extends Result> extends AbstractNiFiCommand<R> {
private static final int MAX_TIMEOUT_SECONDS = 600;
private static final int DEFAULT_TIMEOUT_SECONDS = 60;
private static final long POLL_INTERVAL_MILLIS = 2000;
public AbstractUpdateParamContextCommand(final String name, final Class<R> resultClass) {
super(name, resultClass);
}
protected ParameterContextUpdateRequestEntity performUpdate(final ParamContextClient client, final ParameterContextEntity parameterContextEntity,
final ParameterContextUpdateRequestEntity updateRequestEntity)
final ParameterContextUpdateRequestEntity updateRequestEntity, final int updateTimeoutSeconds)
throws NiFiClientException, IOException {
final int maxPollIterations = Math.min(Math.max(Long.valueOf(updateTimeoutSeconds * 1000L / POLL_INTERVAL_MILLIS).intValue(), 1), MAX_TIMEOUT_SECONDS);
final AtomicBoolean cancelled = new AtomicBoolean(false);
// poll the update request for up to 30 seconds to see if it has completed
@ -45,7 +54,7 @@ public abstract class AbstractUpdateParamContextCommand<R extends Result> extend
final String updateRequestId = updateRequestEntity.getRequest().getRequestId();
try {
boolean completed = false;
for (int i = 0; i < 30; i++) {
for (int i = 0; i < maxPollIterations; i++) {
final ParameterContextUpdateRequestEntity retrievedUpdateRequest = client.getParamContextUpdateRequest(contextId, updateRequestId);
if (retrievedUpdateRequest != null && retrievedUpdateRequest.getRequest().isComplete()) {
completed = true;
@ -55,7 +64,7 @@ public abstract class AbstractUpdateParamContextCommand<R extends Result> extend
if (getContext().isInteractive()) {
println("Waiting for update request to complete...");
}
Thread.sleep(2000);
Thread.sleep(POLL_INTERVAL_MILLIS);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -75,10 +84,19 @@ public abstract class AbstractUpdateParamContextCommand<R extends Result> extend
}
if (cancelled.get()) {
throw new NiFiClientException("Unable to update parameter context, cancelling update request");
throw new NiFiClientException("Unable to update parameter context in time, cancelling update request");
}
return deleteUpdateRequest;
}
}
protected int getUpdateTimeout(final Properties properties) {
try {
final Integer updateTimeoutSeconds = getIntArg(properties, CommandOption.UPDATE_TIMEOUT);
return updateTimeoutSeconds == null ? DEFAULT_TIMEOUT_SECONDS : updateTimeoutSeconds;
} catch (final MissingOptionException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}

View File

@ -61,6 +61,9 @@ public class DeleteParam extends AbstractUpdateParamContextCommand<VoidResult> {
final String paramContextId = getRequiredArg(properties, CommandOption.PARAM_CONTEXT_ID);
final String paramName = getRequiredArg(properties, CommandOption.PARAM_NAME);
// Optional args...
final int updateTimeout = getUpdateTimeout(properties);
// Ensure the context exists...
final ParamContextClient paramContextClient = client.getParamContextClient();
final ParameterContextEntity existingEntity = paramContextClient.getParamContext(paramContextId, false);
@ -95,7 +98,7 @@ public class DeleteParam extends AbstractUpdateParamContextCommand<VoidResult> {
// Submit the update request...
final ParameterContextUpdateRequestEntity updateRequestEntity = paramContextClient.updateParamContext(updatedParameterContextEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity, updateTimeout);
if (isInteractive()) {
println();

View File

@ -63,6 +63,9 @@ public class MergeParamContext extends AbstractUpdateParamContextCommand<VoidRes
final String existingContextId = getRequiredArg(properties, CommandOption.PARAM_CONTEXT_ID);
// Optional args...
final int updateTimeout = getUpdateTimeout(properties);
// read the content of the input source into memory
final String inputSource = getRequiredArg(properties, CommandOption.INPUT_SOURCE);
final String paramContextJson = getInputSourceContent(inputSource);
@ -117,7 +120,7 @@ public class MergeParamContext extends AbstractUpdateParamContextCommand<VoidRes
// Submit the update request...
final ParameterContextUpdateRequestEntity updateRequestEntity = paramContextClient.updateParamContext(updatedContextEntity);
performUpdate(paramContextClient, updatedContextEntity, updateRequestEntity);
performUpdate(paramContextClient, updatedContextEntity, updateRequestEntity, updateTimeout);
printlnIfInteractive("");
return VoidResult.getInstance();

View File

@ -58,6 +58,9 @@ public class RemoveInheritedParamContexts extends AbstractUpdateParamContextComm
// Required args...
final String paramContextId = getRequiredArg(properties, CommandOption.PARAM_CONTEXT_ID);
// Optional args...
final int updateTimeout = getUpdateTimeout(properties);
// Ensure the context exists...
final ParamContextClient paramContextClient = client.getParamContextClient();
final ParameterContextEntity existingParameterContextEntity = paramContextClient.getParamContext(paramContextId, false);
@ -76,7 +79,7 @@ public class RemoveInheritedParamContexts extends AbstractUpdateParamContextComm
// Submit the update request...
final ParameterContextUpdateRequestEntity updateRequestEntity = paramContextClient.updateParamContext(updatedParameterContextEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity, updateTimeout);
if (isInteractive()) {
println();

View File

@ -61,6 +61,9 @@ public class SetInheritedParamContexts extends AbstractUpdateParamContextCommand
final String paramContextId = getRequiredArg(properties, CommandOption.PARAM_CONTEXT_ID);
final String inheritedIds = getRequiredArg(properties, CommandOption.PARAM_CONTEXT_INHERITED_IDS);
// Optional args...
final int updateTimeout = getUpdateTimeout(properties);
// Ensure the context exists...
final ParamContextClient paramContextClient = client.getParamContextClient();
final ParameterContextEntity existingParameterContextEntity = paramContextClient.getParamContext(paramContextId, false);
@ -93,7 +96,7 @@ public class SetInheritedParamContexts extends AbstractUpdateParamContextCommand
// Submit the update request...
final ParameterContextUpdateRequestEntity updateRequestEntity = paramContextClient.updateParamContext(updatedParameterContextEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity, updateTimeout);
if (isInteractive()) {
println();

View File

@ -56,6 +56,7 @@ public class SetParam extends AbstractUpdateParamContextCommand<VoidResult> {
addOption(CommandOption.PARAM_DESC.createOption());
addOption(CommandOption.PARAM_VALUE.createOption());
addOption(CommandOption.PARAM_SENSITIVE.createOption());
addOption(CommandOption.UPDATE_TIMEOUT.createOption());
}
@Override
@ -73,6 +74,7 @@ public class SetParam extends AbstractUpdateParamContextCommand<VoidResult> {
if (!StringUtils.isBlank(paramSensitive) && !"true".equals(paramSensitive) && !"false".equals(paramSensitive)) {
throw new IllegalArgumentException("Parameter sensitive flag must be one of 'true' or 'false'");
}
final int updateTimeout = getUpdateTimeout(properties);
// Ensure the context exists...
final ParamContextClient paramContextClient = client.getParamContextClient();
@ -126,7 +128,7 @@ public class SetParam extends AbstractUpdateParamContextCommand<VoidResult> {
// Submit the update request...
final ParameterContextUpdateRequestEntity updateRequestEntity = paramContextClient.updateParamContext(updatedParameterContextEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity);
performUpdate(paramContextClient, updatedParameterContextEntity, updateRequestEntity, updateTimeout);
if (isInteractive()) {
println();