mirror of https://github.com/apache/nifi.git
NIFI-13937 Add command pg-empty-queues to NiFi CLI (#9459)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7de2e68ce9
commit
852f3a2170
|
@ -18,6 +18,7 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
|
|||
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.DropRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowEntity;
|
||||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
||||
|
@ -62,4 +63,8 @@ public interface ProcessGroupClient {
|
|||
FlowComparisonEntity getLocalModifications(String processGroupId) throws NiFiClientException, IOException;
|
||||
|
||||
File exportProcessGroup(String processGroupId, boolean includeReferencedServices, File outputFile) throws NiFiClientException, IOException;
|
||||
|
||||
DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException;
|
||||
|
||||
DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId) throws NiFiClientException, IOException;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
|
|||
import org.apache.nifi.toolkit.cli.impl.util.FileUtils;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.DropRequestEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowEntity;
|
||||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
||||
|
@ -286,4 +287,39 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public DropRequestEntity emptyQueues(String processGroupId) throws NiFiClientException, IOException {
|
||||
if (StringUtils.isBlank(processGroupId)) {
|
||||
throw new IllegalArgumentException("Process group id cannot be null or blank");
|
||||
}
|
||||
|
||||
return executeAction("Error emptying queues in Process Group", () -> {
|
||||
final WebTarget target = processGroupsTarget
|
||||
.path("{id}/empty-all-connections-requests")
|
||||
.resolveTemplate("id", processGroupId);
|
||||
|
||||
return getRequestBuilder(target).post(null, DropRequestEntity.class);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public DropRequestEntity getEmptyQueuesRequest(String processGroupId, String requestId)
|
||||
throws NiFiClientException, IOException {
|
||||
if (StringUtils.isBlank(processGroupId)) {
|
||||
throw new IllegalArgumentException("Process group id cannot be null or blank");
|
||||
}
|
||||
if (StringUtils.isBlank(requestId)) {
|
||||
throw new IllegalArgumentException("Request id cannot be null or blank");
|
||||
}
|
||||
|
||||
return executeAction("Error getting Drop Request status for Process Group", () -> {
|
||||
final WebTarget target = processGroupsTarget
|
||||
.path("{id}/empty-all-connections-requests/{drop-request-id}")
|
||||
.resolveTemplate("id", processGroupId)
|
||||
.resolveTemplate("drop-request-id", requestId);
|
||||
|
||||
return getRequestBuilder(target).get(DropRequestEntity.class);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -85,6 +85,7 @@ import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGConnect;
|
|||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreateControllerService;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEmptyQueues;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGEnableControllerServices;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGExport;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGGetAllVersions;
|
||||
|
@ -163,6 +164,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
|
|||
commands.add(new PGSetParamContext());
|
||||
commands.add(new PGReplace());
|
||||
commands.add(new PGExport());
|
||||
commands.add(new PGEmptyQueues());
|
||||
commands.add(new GetControllerServices());
|
||||
commands.add(new GetControllerService());
|
||||
commands.add(new CreateControllerService());
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
|
||||
|
||||
import org.apache.commons.cli.MissingOptionException;
|
||||
import org.apache.nifi.toolkit.cli.api.Context;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessGroupClient;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
|
||||
import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
|
||||
import org.apache.nifi.toolkit.cli.impl.result.VoidResult;
|
||||
import org.apache.nifi.web.api.entity.DropRequestEntity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Command to stop the components of a process group.
|
||||
*/
|
||||
public class PGEmptyQueues extends AbstractNiFiCommand<VoidResult> {
|
||||
|
||||
public static final int MAX_ITERATIONS = 20;
|
||||
public static final long DELAY_MS = 1000;
|
||||
|
||||
public PGEmptyQueues() {
|
||||
super("pg-empty-queues", VoidResult.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "Empty all queues, recursively, in the specified Process Group. It is recommended to first use pg-stop.";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInitialize(final Context context) {
|
||||
addOption(CommandOption.PG_ID.createOption());
|
||||
}
|
||||
|
||||
@Override
|
||||
public VoidResult doExecute(final NiFiClient client, final Properties properties)
|
||||
throws NiFiClientException, IOException, MissingOptionException {
|
||||
|
||||
final String pgId = getRequiredArg(properties, CommandOption.PG_ID);
|
||||
|
||||
final ProcessGroupClient pgClient = client.getProcessGroupClient();
|
||||
DropRequestEntity requestEntity = pgClient.emptyQueues(pgId);
|
||||
final String requestId = requestEntity.getDropRequest().getId();
|
||||
|
||||
int iterations = 1;
|
||||
while (!requestEntity.getDropRequest().isFinished() && iterations < MAX_ITERATIONS) {
|
||||
if (shouldPrint(properties)) {
|
||||
println("Emptying queues, currently at " + requestEntity.getDropRequest().getPercentCompleted() + "% ("
|
||||
+ iterations + " of " + MAX_ITERATIONS + ")...");
|
||||
}
|
||||
sleep(DELAY_MS);
|
||||
iterations++;
|
||||
requestEntity = pgClient.getEmptyQueuesRequest(pgId, requestId);
|
||||
}
|
||||
|
||||
if (shouldPrint(properties)) {
|
||||
if (requestEntity.getDropRequest().isFinished()) {
|
||||
println("Drop request completed. Deleted: " + requestEntity.getDropRequest().getDropped());
|
||||
} else {
|
||||
println("Drop request didn't complete yet. Thus far, deleted: " + requestEntity.getDropRequest().getDropped());
|
||||
}
|
||||
}
|
||||
|
||||
return VoidResult.getInstance();
|
||||
}
|
||||
|
||||
private boolean shouldPrint(final Properties properties) {
|
||||
return isInteractive() || isVerbose(properties);
|
||||
}
|
||||
|
||||
private void sleep(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue