NIFI-7681 - Add update-bucket-policy command, add option to specify timeout and fix documentation to include previously implemented commands (#4450)

* NIFI-7681 - Add update-bucket-policy command, add option to specify
timeout and fix documentation to include previously implemented commands

* Fix return type of UpdateBucketPolicy and add missing registry commands to the documentation

Co-authored-by: Jaya Aditya <jchandra@yahoo-corp.jp>
This commit is contained in:
Jaya Aditya 2020-08-07 22:06:56 +09:00 committed by GitHub
parent 6488db1376
commit 339e09a6e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 289 additions and 8 deletions

View File

@ -94,6 +94,37 @@ The following are available commands:
nifi pg-get-services
nifi pg-enable-services
nifi pg-disable-services
nifi pg-create-service
nifi create-user
nifi list-users
nifi create-user-group
nifi list-user-groups
nifi update-user-group
nifi get-policy
nifi update-policy
nifi create-service
nifi get-services
nifi get-service
nifi disable-services
nifi enable-services
nifi get-reporting-task
nifi get-reporting-tasks
nifi create-reporting-task
nifi set-param
nifi delete-param
nifi list-param-contexts
nifi get-param-context
nifi create-param-context
nifi delete-param-context
nifi merge-param-context
nifi import-param-context
nifi pg-get-param-context
nifi pg-set-param-context
nifi list-templates
nifi download-template
nifi upload-template
nifi start-reporting-tasks
nifi stop-reporting-tasks
registry current-user
registry list-buckets
registry create-bucket
@ -106,12 +137,25 @@ The following are available commands:
registry import-flow-version
registry sync-flow-versions
registry transfer-flow-version
registry diff-flow-versions
registry upload-bundle
registry upload-bundles
registry list-bundle-groups
registry list-bundle-artifacts
registry list-bundle-versions
registry download-bundle
registry get-bundle-checksum
registry list-extension-tags
registry list-extensions
registry list-users
registry create-user
registry update-user
registry list-user-groups
registry create-user-group
registry update-user-group
registry get-policy
registry update-policy
registry update-bucket-policy
session keys
session show
session get
@ -236,13 +280,21 @@ For example, typing tab at an empty prompt should display possible commands for
Typing "nifi " and then a tab will show the sub-commands for NiFi:
#> nifi
cluster-summary get-nodes pg-enable-services pg-set-var
connect-node get-reg-client-id pg-get-all-versions pg-start
create-reg-client get-root-id pg-get-services pg-status
current-user list-reg-clients pg-get-vars pg-stop
delete-node offload-node pg-get-version update-reg-client
disconnect-node pg-change-version pg-import
get-node pg-disable-services pg-list
cluster-summary enable-services list-templates pg-list
connect-node export-param-context list-user-groups pg-set-param-context
create-param-context get-node list-users pg-set-var
create-reg-client get-nodes merge-param-context pg-start
create-reporting-task get-param-context offload-node pg-status
create-service get-policy pg-change-version pg-stop
create-user get-reg-client-id pg-create-service set-param
create-user-group get-reporting-task pg-disable-services start-reporting-tasks
current-user get-reporting-tasks pg-enable-services stop-reporting-tasks
delete-node get-root-id pg-get-all-versions update-policy
delete-param get-service pg-get-param-context update-reg-client
delete-param-context get-services pg-get-services update-user-group
disable-services import-param-context pg-get-vars upload-template
disconnect-node list-param-contexts pg-get-version
download-template list-reg-clients pg-import
Arguments that represent a path to a file, such as `-p` or when setting a properties file in the session, will auto-complete the path being typed:

View File

@ -57,6 +57,9 @@ public class NiFiClientFactory implements ClientFactory<NiFiClient> {
throw new MissingOptionException("Missing required option '" + CommandOption.URL.getLongName() + "'");
}
final String connectionTimeout = properties.getProperty(CommandOption.CONNECTION_TIMEOUT.getLongName());
final String readTimeout = properties.getProperty(CommandOption.READ_TIMEOUT.getLongName());
final String keystore = properties.getProperty(CommandOption.KEYSTORE.getLongName());
final String keystoreType = properties.getProperty(CommandOption.KEYSTORE_TYPE.getLongName());
final String keystorePasswd = properties.getProperty(CommandOption.KEYSTORE_PASSWORD.getLongName());
@ -109,6 +112,25 @@ public class NiFiClientFactory implements ClientFactory<NiFiClient> {
}
}
if (!StringUtils.isBlank(connectionTimeout)) {
try {
Integer timeout = Integer.valueOf(connectionTimeout);
clientConfigBuilder.connectTimeout(timeout);
} catch(Exception e) {
throw new MissingOptionException("connectionTimeout has to be an integer");
}
}
if (!StringUtils.isBlank(readTimeout)) {
try {
Integer timeout = Integer.valueOf(readTimeout);
clientConfigBuilder.readTimeout(timeout);
} catch(Exception e) {
throw new MissingOptionException("readTimeout has to be an integer");
}
}
final NiFiClient client = new JerseyNiFiClient.Builder().config(clientConfigBuilder.build()).build();
// if a proxied entity was specified then return a wrapped client, otherwise return the regular client

View File

@ -51,6 +51,9 @@ public class NiFiRegistryClientFactory implements ClientFactory<NiFiRegistryClie
throw new MissingOptionException("Missing required option '" + CommandOption.URL.getLongName() + "'");
}
final String connectionTimeout = properties.getProperty(CommandOption.CONNECTION_TIMEOUT.getLongName());
final String readTimeout = properties.getProperty(CommandOption.READ_TIMEOUT.getLongName());
final String keystore = properties.getProperty(CommandOption.KEYSTORE.getLongName());
final String keystoreType = properties.getProperty(CommandOption.KEYSTORE_TYPE.getLongName());
final String keystorePasswd = properties.getProperty(CommandOption.KEYSTORE_PASSWORD.getLongName());
@ -99,6 +102,24 @@ public class NiFiRegistryClientFactory implements ClientFactory<NiFiRegistryClie
}
}
if (!StringUtils.isBlank(connectionTimeout)) {
try {
Integer timeout = Integer.valueOf(connectionTimeout);
clientConfigBuilder.connectTimeout(timeout);
} catch(Exception e) {
throw new MissingOptionException("connectionTimeout has to be an integer");
}
}
if (!StringUtils.isBlank(readTimeout)) {
try {
Integer timeout = Integer.valueOf(readTimeout);
clientConfigBuilder.readTimeout(timeout);
} catch(Exception e) {
throw new MissingOptionException("readTimeout has to be an integer");
}
}
final NiFiRegistryClientConfig clientConfig = clientConfigBuilder.build();
final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder().config(clientConfig).build();
final ExtendedNiFiRegistryClient extendedClient = new JerseyExtendedNiFiRegistryClient(client, clientConfig);

View File

@ -24,6 +24,8 @@ import org.apache.commons.cli.Option;
public enum CommandOption {
// General
CONNECTION_TIMEOUT("cto", "connectionTimeout", "Timeout parameter for creating a connection to NiFi/Registry, specified in milliseconds", true),
READ_TIMEOUT("rto", "readTimeout", "Timeout parameter for reading from NiFi/Registry, specified in milliseconds", true),
URL("u", "baseUrl", "The URL to execute the command against", true),
INPUT_SOURCE("i", "input", "A local file to read as input contents, or a public URL to fetch", true, true),
OUTPUT_FILE("o", "outputFile", "A file to write output to, must contain full path and filename", true, true),

View File

@ -21,6 +21,7 @@ import org.apache.nifi.toolkit.cli.impl.command.AbstractCommandGroup;
import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.CreateBucket;
import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.DeleteBucket;
import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.UpdateBucketPolicy;
import org.apache.nifi.toolkit.cli.impl.command.registry.extension.DownloadBundle;
import org.apache.nifi.toolkit.cli.impl.command.registry.extension.GetBundleChecksum;
import org.apache.nifi.toolkit.cli.impl.command.registry.extension.ListBundleArtifacts;
@ -96,6 +97,7 @@ public class NiFiRegistryCommandGroup extends AbstractCommandGroup {
commandList.add(new UpdateUserGroup());
commandList.add(new GetAccessPolicy());
commandList.add(new CreateOrUpdateAccessPolicy());
commandList.add(new UpdateBucketPolicy());
return new ArrayList<>(commandList);
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.toolkit.cli.impl.command.registry.bucket;
import org.apache.commons.cli.ParseException;
import org.apache.nifi.registry.authorization.AccessPolicy;
import org.apache.nifi.registry.authorization.Tenant;
import org.apache.nifi.registry.bucket.Bucket;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.toolkit.cli.api.Context;
import org.apache.nifi.toolkit.cli.impl.client.ExtendedNiFiRegistryClient;
import org.apache.nifi.toolkit.cli.impl.client.registry.PoliciesClient;
import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
import org.apache.nifi.toolkit.cli.impl.command.registry.tenant.TenantHelper;
import org.apache.nifi.toolkit.cli.impl.result.StringResult;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
public class UpdateBucketPolicy extends AbstractNiFiRegistryCommand<StringResult> {
public UpdateBucketPolicy() {
super("update-bucket-policy", StringResult.class);
}
@Override
public String getDescription() {
return "Updates access policy of bucket, NOTE: Overwrites the users/user-groups in the specified policy";
}
@Override
public void doInitialize(final Context context) {
addOption(CommandOption.BUCKET_NAME.createOption());
addOption(CommandOption.BUCKET_ID.createOption());
addOption(CommandOption.USER_NAME_LIST.createOption());
addOption(CommandOption.USER_ID_LIST.createOption());
addOption(CommandOption.GROUP_NAME_LIST.createOption());
addOption(CommandOption.GROUP_ID_LIST.createOption());
addOption(CommandOption.POLICY_ACTION.createOption());
}
@Override
public StringResult doExecute(NiFiRegistryClient client, Properties properties) throws IOException, NiFiRegistryException, ParseException {
if (!(client instanceof ExtendedNiFiRegistryClient)) {
throw new IllegalArgumentException("This command needs extended registry client!");
}
final ExtendedNiFiRegistryClient extendedClient = (ExtendedNiFiRegistryClient) client;
final PoliciesClient policiesClient = extendedClient.getPoliciesClient();
final String bucketName = getArg(properties, CommandOption.BUCKET_NAME);
String bucketId = getArg(properties, CommandOption.BUCKET_ID);
final String userNames = getArg(properties, CommandOption.USER_NAME_LIST);
final String userIds = getArg(properties, CommandOption.USER_ID_LIST);
final String groupNames = getArg(properties, CommandOption.GROUP_NAME_LIST);
final String groupIds = getArg(properties, CommandOption.GROUP_ID_LIST);
final String policyAction = getRequiredArg(properties, CommandOption.POLICY_ACTION);
final HashSet<String> permittedActions = new HashSet<>(Arrays.asList("read", "write", "delete"));
if (!permittedActions.contains(policyAction)) {
throw new IllegalArgumentException("Only read, write, delete actions permitted");
}
if (StringUtils.isBlank(bucketName) == StringUtils.isBlank(bucketId)) {
throw new IllegalArgumentException("Specify either bucket name or bucket id");
}
if (StringUtils.isBlank(bucketId)) {
final Optional<Bucket> optionalBucket = client.getBucketClient().getAll()
.stream().filter(b -> bucketName.equals(b.getName())).findAny();
if (!optionalBucket.isPresent()) {
throw new IllegalArgumentException("Specified bucket does not exist");
}
bucketId = optionalBucket.get().getIdentifier();
} else {
try {
extendedClient.getBucketClient().get(bucketId);
} catch (NiFiRegistryException e) {
throw new IllegalArgumentException("Specified bucket does not exist");
}
}
AccessPolicy accessPolicy;
String resource = "/buckets/" + bucketId;
try {
accessPolicy = policiesClient.getAccessPolicy(policyAction, resource);
} catch (NiFiRegistryException e) {
accessPolicy = new AccessPolicy();
accessPolicy.setResource(resource);
accessPolicy.setAction(policyAction);
}
if (!StringUtils.isBlank(userNames) || !StringUtils.isBlank(userIds)) {
Set<Tenant> users = TenantHelper.selectExistingTenants(userNames,
userIds, extendedClient.getTenantsClient().getUsers());
//Overwrite users, similar to CreateOrUpdateAccessPolicy of Registry
accessPolicy.setUsers(users);
}
if (!StringUtils.isBlank(groupNames) || !StringUtils.isBlank(groupIds)) {
Set<Tenant> groups = TenantHelper.selectExistingTenants(groupNames,
groupIds, extendedClient.getTenantsClient().getUserGroups());
//Overwrite user-groups, similar to CreateOrUpdateAccessPolicy of Registry
accessPolicy.setUserGroups(groups);
}
AccessPolicy updatedPolicy = StringUtils.isBlank(accessPolicy.getIdentifier())
? policiesClient.createAccessPolicy(accessPolicy)
: policiesClient.updateAccessPolicy(accessPolicy);
return new StringResult(updatedPolicy.getIdentifier(), getContext().isInteractive());
}
}

View File

@ -55,7 +55,8 @@ public class RegistryManualIT {
private static final String TEST_USER_GROUP_NAME = "testUserGroup";
private String testUserId;
private String testUserGroupId;
private static final String TEST_BUCKET_NAME = "testBucket";
private String testBucketId;
private PrintStream originalStdOut;
private ByteArrayOutputStream out;
@ -98,6 +99,13 @@ public class RegistryManualIT {
testListUserGroup(expectedUserGroup);
}
@Ignore("Run first and only once")
@Test
public void testCreateBucket() throws Exception {
runRegistryCommand("create-bucket", "--bucketName " + TEST_BUCKET_NAME);
testListBuckets(TEST_BUCKET_NAME);
}
@Test
public void testListUsers() throws Exception {
testListUsers(TEST_USER_NAME);
@ -187,6 +195,36 @@ public class RegistryManualIT {
testGetAccessPolicy(action, resource);
}
@Test
public void testUpdateBucketPolicyByName() throws Exception {
String action = "/read";
runRegistryCommand("update-bucket-policy",
"--bucketName " + TEST_BUCKET_NAME +
" --accessPolicyAction " + action +
" --userNameList " + TEST_USER_NAME +
" --userIdList " + testUserId
);
testGetAccessPolicy(action, testBucketId);
}
@Test public void testUpdateBucketPolicyById() throws Exception {
String action = "/write";
runRegistryCommand("update-bucket-policy",
"--bucketId " + testBucketId +
" --accessPolicyAction " + action +
" --groupNameList " + TEST_USER_GROUP_NAME +
" --groupIdList " + testUserGroupId
);
testGetAccessPolicy(action, testBucketId);
}
@Test
public void testListBuckets() throws Exception {
testListBuckets(TEST_BUCKET_NAME);
}
private void testListUsers(String expectedUserName) throws IOException {
runCommand(
"\\s{3,}",
@ -219,6 +257,20 @@ public class RegistryManualIT {
);
}
private void testListBuckets(String expectedBucketName) throws IOException {
runCommand("\\s{3,}",
() -> runRegistryCommand("list-buckets",""),
words -> {
if (words.length > 2 && words[1].equals(expectedBucketName)) {
testBucketId = words[2];
}
},
() -> {
assertNotNull(testBucketId);
assertTrue("Bucket ID should not be blank!", !StringUtils.isBlank(testBucketId));
});
}
private void testGetAccessPolicy(String action, String resource) throws IOException {
AtomicReference<String> resourceR = new AtomicReference<>();
AtomicReference<String> actionR = new AtomicReference<>();