NIFI-13476 Optimized Authorization Checking in Process Group Status (#9021)

- Avoided invoking Authorizer when nested component is not included in Process Group Status based on depth of recursion
This commit is contained in:
David Handermann 2024-07-01 13:58:22 -05:00 committed by GitHub
parent 903090b649
commit 3bb79f1ba6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 249 additions and 7 deletions

View File

@ -72,6 +72,9 @@ import java.util.function.Predicate;
public abstract class AbstractEventAccess implements EventAccess {
private static final Logger logger = LoggerFactory.getLogger(AbstractEventAccess.class);
private static final Predicate<Authorizable> AUTHORIZATION_APPROVED = authorizable -> true;
private static final Predicate<Authorizable> AUTHORIZATION_DENIED = authorizable -> false;
private final ProcessScheduler processScheduler;
private final StatusAnalyticsEngine statusAnalyticsEngine;
private final FlowManager flowManager;
@ -96,7 +99,7 @@ public abstract class AbstractEventAccess implements EventAccess {
public ProcessGroupStatus getGroupStatus(final String groupId) {
final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
final ProcessGroup group = flowManager.getGroup(groupId);
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true);
return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, true);
}
/**
@ -112,7 +115,7 @@ public abstract class AbstractEventAccess implements EventAccess {
final ProcessGroup group = flowManager.getGroup(groupId);
// this was invoked with no user context so the results will be unfiltered... necessary for aggregating status history
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, false);
return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, false);
}
protected RepositoryStatusReport generateRepositoryStatusReport() {
@ -127,13 +130,13 @@ public abstract class AbstractEventAccess implements EventAccess {
*
* @param group group id
* @param statusReport report
* @param isAuthorized is authorized check
* @param checkAuthorization is authorized check
* @param recursiveStatusDepth the number of levels deep we should recurse and still include the the processors' statuses, the groups' statuses, etc. in the returned ProcessGroupStatus
* @param currentDepth the current number of levels deep that we have recursed
* @param includeConnectionDetails whether or not to include the details of the connections that may be expensive to calculate and/or require locks be obtained
* @return the component status
*/
ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> isAuthorized,
ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStatusReport statusReport, final Predicate<Authorizable> checkAuthorization,
final int recursiveStatusDepth, final int currentDepth, final boolean includeConnectionDetails) {
if (group == null) {
return null;
@ -141,7 +144,7 @@ public abstract class AbstractEventAccess implements EventAccess {
final ProcessGroupStatus status = new ProcessGroupStatus();
status.setId(group.getIdentifier());
status.setName(isAuthorized.test(group) ? group.getName() : group.getIdentifier());
status.setName(checkAuthorization.test(group) ? group.getName() : group.getIdentifier());
int activeGroupThreads = 0;
int terminatedGroupThreads = 0;
long bytesRead = 0L;
@ -162,6 +165,14 @@ public abstract class AbstractEventAccess implements EventAccess {
final boolean populateChildStatuses = currentDepth <= recursiveStatusDepth;
// Set Authorization predicate based on whether to populate child component status avoiding unnecessary calls to Authorizer
final Predicate<Authorizable> isAuthorized;
if (populateChildStatuses) {
isAuthorized = checkAuthorization;
} else {
isAuthorized = AUTHORIZATION_DENIED;
}
// set status for processors
final Collection<ProcessorStatus> processorStatusCollection = new ArrayList<>();
status.setProcessorStatus(processorStatusCollection);
@ -196,7 +207,7 @@ public abstract class AbstractEventAccess implements EventAccess {
// avoid performing any sort of authorizations. Because we only care about the numbers that come back, we can just indicate
// that the user is not authorized. This allows us to avoid the expense of both performing the authorization and calculating
// things that we would otherwise need to calculate if the user were in fact authorized.
childGroupStatus = getGroupStatus(childGroup, statusReport, authorizable -> false, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
childGroupStatus = getGroupStatus(childGroup, statusReport, AUTHORIZATION_DENIED, recursiveStatusDepth, currentDepth + 1, includeConnectionDetails);
}
activeGroupThreads += childGroupStatus.getActiveThreadCount();
@ -686,7 +697,7 @@ public abstract class AbstractEventAccess implements EventAccess {
final ProcessGroup group = flowManager.getGroup(rootGroupId);
final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
return getGroupStatus(group, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true);
return getGroupStatus(group, statusReport, AUTHORIZATION_APPROVED, Integer.MAX_VALUE, 1, true);
}
@Override

View File

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.reporting;
import org.apache.nifi.action.Action;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardRepositoryStatusReport;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class AbstractEventAccessTest {
private static final boolean INCLUDE_CONNECTION_DETAILS = true;
private static final String PROCESS_GROUP_NAME = "Root Group";
private static final String PROCESS_GROUP_ID = UUID.randomUUID().toString();
private static final String PROCESSOR_NAME = "Event Processor";
private static final String PROCESSOR_ID = UUID.randomUUID().toString();
private static final int ZERO_DEPTH = 0;
private static final int SINGLE_DEPTH = 1;
@Mock
private ProcessScheduler processScheduler;
@Mock
private StatusAnalyticsEngine statusAnalyticsEngine;
@Mock
private FlowManager flowManager;
@Mock
private FlowFileEventRepository flowFileEventRepository;
@Mock
private ProcessGroup processGroup;
@Mock
private ProcessorNode processorNode;
private AbstractEventAccess eventAccess;
@BeforeEach
void setEventAccess() {
eventAccess = new ConcreteEventAccess(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository);
}
@Test
void testGetGroupStatusAuthorized() {
final List<Authorizable> authorizables = new ArrayList<>();
final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport();
final Predicate<Authorizable> checkAuthorization = authorizables::add;
when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME);
when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID);
final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS);
assertNotNull(groupStatus);
assertEquals(PROCESS_GROUP_NAME, groupStatus.getName());
assertEquals(PROCESS_GROUP_ID, groupStatus.getId());
assertEquals(1, authorizables.size());
}
@Test
void testGetGroupStatusProcessorAuthorizedNotIncluded() {
final List<Authorizable> authorizables = new ArrayList<>();
final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport();
final Predicate<Authorizable> checkAuthorization = authorizables::add;
when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID);
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processGroup.getProcessors()).thenReturn(List.of(processorNode));
when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME);
when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID);
final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, ZERO_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS);
assertNotNull(groupStatus);
assertEquals(PROCESS_GROUP_NAME, groupStatus.getName());
assertEquals(PROCESS_GROUP_ID, groupStatus.getId());
assertTrue(groupStatus.getProcessorStatus().isEmpty());
assertEquals(1, authorizables.size());
}
@Test
void testGetGroupStatusProcessorAuthorized() {
final List<Authorizable> authorizables = new ArrayList<>();
final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport();
final Predicate<Authorizable> checkAuthorization = authorizables::add;
when(processorNode.getName()).thenReturn(PROCESSOR_NAME);
when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID);
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processGroup.getProcessors()).thenReturn(List.of(processorNode));
when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME);
when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID);
final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS);
assertNotNull(groupStatus);
assertEquals(PROCESS_GROUP_NAME, groupStatus.getName());
assertEquals(PROCESS_GROUP_ID, groupStatus.getId());
final Optional<ProcessorStatus> processorStatusFound = groupStatus.getProcessorStatus().stream().findFirst();
assertTrue(processorStatusFound.isPresent());
final ProcessorStatus processorStatus = processorStatusFound.get();
assertEquals(PROCESSOR_NAME, processorStatus.getName());
assertEquals(PROCESSOR_ID, processorStatus.getId());
assertEquals(2, authorizables.size());
}
@Test
void testGetGroupStatusNotAuthorized() {
final List<Authorizable> authorizables = new ArrayList<>();
final RepositoryStatusReport repositoryStatusReport = new StandardRepositoryStatusReport();
final Predicate<Authorizable> checkAuthorization = authorizable -> {
authorizables.add(authorizable);
return false;
};
when(processorNode.getIdentifier()).thenReturn(PROCESSOR_ID);
when(processorNode.getProcessGroup()).thenReturn(processGroup);
when(processGroup.getProcessors()).thenReturn(List.of(processorNode));
when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID);
final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(processGroup, repositoryStatusReport, checkAuthorization, SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS);
assertNotNull(groupStatus);
assertEquals(PROCESS_GROUP_ID, groupStatus.getName());
assertEquals(PROCESS_GROUP_ID, groupStatus.getId());
final Optional<ProcessorStatus> processorStatusFound = groupStatus.getProcessorStatus().stream().findFirst();
assertTrue(processorStatusFound.isPresent());
final ProcessorStatus processorStatus = processorStatusFound.get();
assertEquals(PROCESSOR_ID, processorStatus.getName());
assertEquals(PROCESSOR_ID, processorStatus.getId());
assertEquals(2, authorizables.size());
}
private static class ConcreteEventAccess extends AbstractEventAccess {
public ConcreteEventAccess(
final ProcessScheduler processScheduler,
final StatusAnalyticsEngine analyticsEngine,
final FlowManager flowManager,
final FlowFileEventRepository flowFileEventRepository
) {
super(processScheduler, analyticsEngine, flowManager, flowFileEventRepository);
}
@Override
public ProvenanceEventRepository getProvenanceRepository() {
return null;
}
@Override
public List<Action> getFlowChanges(int firstActionId, int maxActions) {
return List.of();
}
@Override
public Map<String, StorageUsage> getProvenanceRepositoryStorageUsage() {
return Map.of();
}
@Override
public Map<String, StorageUsage> getContentRepositoryStorageUsage() {
return Map.of();
}
@Override
public StorageUsage getFlowFileRepositoryStorageUsage() {
return null;
}
}
}