NIFI-9920 Fetching all bulletins of a process group on update process group (#6335)

This commit is contained in:
aksharau 2022-11-21 21:54:47 +05:30 committed by GitHub
parent 594e93d97e
commit a861bab34d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 209 additions and 5 deletions

View File

@ -1623,8 +1623,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode);
final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification());
final ProcessGroupStatusDTO status = dtoFactory.createConciseProcessGroupStatusDto(controllerFacade.getProcessGroupStatus(processGroupNode.getIdentifier()));
final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(processGroupNode.getIdentifier()));
final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
final List<BulletinEntity> bulletinEntities = getProcessGroupBulletins(processGroupNode);
return entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, permissions, status, bulletinEntities);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.web;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
import org.apache.nifi.action.Operation;
@ -36,6 +37,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedControllerService;
@ -50,17 +52,26 @@ import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinFactory;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.MockBulletinRepository;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.nifi.web.revision.RevisionUpdate;
import org.apache.nifi.web.revision.StandardRevisionUpdate;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Assert;
import org.junit.Before;
@ -71,6 +82,7 @@ import org.mockito.Mockito;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@ -90,10 +102,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyMap;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.spy;
@ -101,6 +113,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StandardNiFiServiceFacadeTest {
private static final String USER_1 = "user-1";
@ -114,6 +127,18 @@ public class StandardNiFiServiceFacadeTest {
private static final Integer ACTION_ID_2 = 2;
private static final String PROCESSOR_ID_2 = "processor-2";
private static final String GROUP_NAME_1 = "group-name-1";
private static final String GROUP_NAME_2 = "group-name-2";
private static final String PROCESSOR_NAME_1 = "Processor1";
private static final String PROCESSOR_NAME_2 = "Processor2";
private static final String BULLETIN_CATEGORY = "Log Message";
private static final String BULLETIN_SEVERITY = "ERROR";
private static final String BULLETIN_MESSAGE_1 = "Error1";
private static final String BULLETIN_MESSAGE_2 = "Error2";
private static final String PATH_TO_GROUP_1 = "Path1";
private static final String PATH_TO_GROUP_2 = "Path2";
private static final String RANDOM_GROUP_ID = "randomGroupId";
private StandardNiFiServiceFacade serviceFacade;
private Authorizer authorizer;
private FlowController flowController;
@ -555,4 +580,184 @@ public class StandardNiFiServiceFacadeTest {
return remoteProcessGroup;
}
@Test
public void testUpdateProcessGroup_WithProcessorBulletin() {
//GIVEN
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn(groupId);
ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
processGroupStatus.setId(groupId);
processGroupStatus.setName(GROUP_NAME_1);
final ControllerFacade controllerFacade = mock(ControllerFacade.class);
when(controllerFacade.getProcessGroupStatus(any())).thenReturn(processGroupStatus);
final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
serviceFacadeSpy.setControllerFacade(controllerFacade);
ProcessGroupDTO processGroupDTO = new ProcessGroupDTO();
processGroupDTO.setId(groupId);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
when(processGroupDAO.updateProcessGroup(processGroupDTO)).thenReturn(processGroup);
final RevisionManager revisionManager = mock(RevisionManager.class);
Revision revision = new Revision(1L, "a", "b");
final FlowModification lastModification = new FlowModification(revision, "a");
RevisionUpdate<Object> snapshot = new StandardRevisionUpdate<>(processGroupDTO, lastModification);
when(revisionManager.updateRevision(any(), any(), any())).thenReturn((RevisionUpdate<Object> )snapshot);
serviceFacadeSpy.setRevisionManager(revisionManager);
MockTestBulletinRepository bulletinRepository = new MockTestBulletinRepository();
serviceFacadeSpy.setBulletinRepository(bulletinRepository);
//add a bulletin for a processor in the current processor group
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(groupId, GROUP_NAME_1, PROCESSOR_ID_1,
ComponentType.PROCESSOR, PROCESSOR_NAME_1,
BULLETIN_CATEGORY, BULLETIN_SEVERITY, BULLETIN_MESSAGE_1, PATH_TO_GROUP_1));
//add a bulletin for a processor in a different processor group
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(RANDOM_GROUP_ID, GROUP_NAME_2, PROCESSOR_ID_2,
ComponentType.PROCESSOR, PROCESSOR_NAME_2,
BULLETIN_CATEGORY, BULLETIN_SEVERITY, BULLETIN_MESSAGE_2, PATH_TO_GROUP_2));
//WHEN
ProcessGroupEntity result = serviceFacadeSpy.updateProcessGroup(revision, processGroupDTO);
//THEN
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getBulletins().size());
Assert.assertEquals(groupId, result.getBulletins().get(0).getGroupId());
}
@Test
public void testUpdateProcessGroup_WithNoBulletinForProcessGroup() {
//GIVEN
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn(groupId);
ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
processGroupStatus.setId(groupId);
processGroupStatus.setName(GROUP_NAME_1);
final ControllerFacade controllerFacade = mock(ControllerFacade.class);
when(controllerFacade.getProcessGroupStatus(any())).thenReturn(processGroupStatus);
final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
serviceFacadeSpy.setControllerFacade(controllerFacade);
ProcessGroupDTO processGroupDTO = new ProcessGroupDTO();
processGroupDTO.setId(groupId);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
when(processGroupDAO.updateProcessGroup(processGroupDTO)).thenReturn(processGroup);
final RevisionManager revisionManager = mock(RevisionManager.class);
Revision revision = new Revision(1L, "a", "b");
final FlowModification lastModification = new FlowModification(revision, "a");
RevisionUpdate<Object> snapshot = new StandardRevisionUpdate<>(processGroupDTO,lastModification);
when(revisionManager.updateRevision(any(), any(), any())).thenReturn((RevisionUpdate<Object> )snapshot);
serviceFacadeSpy.setRevisionManager(revisionManager);
MockTestBulletinRepository bulletinRepository = new MockTestBulletinRepository();
serviceFacadeSpy.setBulletinRepository(bulletinRepository);
//add a bulletin for a processor in a different processor group
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(RANDOM_GROUP_ID, GROUP_NAME_2, PROCESSOR_ID_2,
ComponentType.PROCESSOR, PROCESSOR_NAME_2,
BULLETIN_CATEGORY, BULLETIN_SEVERITY, BULLETIN_MESSAGE_2, PATH_TO_GROUP_2));
//WHEN
ProcessGroupEntity result = serviceFacadeSpy.updateProcessGroup(revision, processGroupDTO);
//THEN
Assert.assertNotNull(result);
Assert.assertEquals(0, result.getBulletins().size());
}
@Test
public void testUpdateProcessGroup_WithProcessorGroupBulletin() {
//GIVEN
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroup.getIdentifier()).thenReturn(groupId);
ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
processGroupStatus.setId(groupId);
processGroupStatus.setName(GROUP_NAME_1);
final ControllerFacade controllerFacade = mock(ControllerFacade.class);
when(controllerFacade.getProcessGroupStatus(any())).thenReturn(processGroupStatus);
final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
serviceFacadeSpy.setControllerFacade(controllerFacade);
ProcessGroupDTO processGroupDTO = new ProcessGroupDTO();
processGroupDTO.setId(groupId);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
when(processGroupDAO.updateProcessGroup(processGroupDTO)).thenReturn(processGroup);
final RevisionManager revisionManager = mock(RevisionManager.class);
Revision revision = new Revision(1L, "a", "b");
final FlowModification lastModification = new FlowModification(revision, "a");
RevisionUpdate<Object> snapshot = new StandardRevisionUpdate<>(processGroupDTO,lastModification);
when(revisionManager.updateRevision(any(), any(), any())).thenReturn((RevisionUpdate<Object> )snapshot);
serviceFacadeSpy.setRevisionManager(revisionManager);
MockTestBulletinRepository bulletinRepository = new MockTestBulletinRepository();
serviceFacadeSpy.setBulletinRepository(bulletinRepository);
//add a bulletin for current processor group, meaning the source is also the process group
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(groupId, GROUP_NAME_1, groupId,
ComponentType.PROCESSOR, GROUP_NAME_1,
BULLETIN_CATEGORY, BULLETIN_SEVERITY, BULLETIN_MESSAGE_1, PATH_TO_GROUP_1));
//add a bulletin for a processor in a different processor group
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(RANDOM_GROUP_ID,GROUP_NAME_2, PROCESSOR_ID_2,
ComponentType.PROCESSOR, PROCESSOR_NAME_2,
BULLETIN_CATEGORY, BULLETIN_SEVERITY, BULLETIN_MESSAGE_2, PATH_TO_GROUP_2));
//WHEN
ProcessGroupEntity result = serviceFacadeSpy.updateProcessGroup(revision, processGroupDTO);
//THEN
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getBulletins().size());
Assert.assertEquals(groupId, result.getBulletins().get(0).getGroupId());
}
private static class MockTestBulletinRepository extends MockBulletinRepository {
List<Bulletin> bulletinList;
public MockTestBulletinRepository() {
bulletinList = new ArrayList<>();
}
@Override
public void addBulletin(Bulletin bulletin) {
bulletinList.add(bulletin);
}
@Override
public List<Bulletin> findBulletinsForGroupBySource(String groupId) {
List<Bulletin> ans = new ArrayList<>();
for(Bulletin b : bulletinList) {
if(b.getGroupId().equals(groupId))
ans.add(b);
}
return ans;
}
}
}