NIFI-7788 Created a new endpoint in RemoteProcessGroupResource to allow updating run statuses/transmission state of all remote process groups within a process group.

When selecting run/stop on a process group/canvas/selection, it will try to enable/disable transmission of all involved remote process groups.

NIFI-7788 Supplied same functionality missed when selecting a process group.
NIFI-7788 Updated endpoint URL paths.
NIFI-7788 No need to return list of remote process groups when updating en masse.
NIFI-7788 Added some null checks in RemoteProcessGroupsEndpointMerger.merge.
NIFI-7788 Fix checkstyle violation.

This closes #4516.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2020-09-08 18:26:00 +02:00 committed by Peter Turcsanyi
parent 13d5be622b
commit 5bcfcf42bb
6 changed files with 274 additions and 30 deletions

View File

@ -32,10 +32,17 @@ import java.util.regex.Pattern;
public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger {
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
public static final Pattern REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/remote-process-groups/process-group/[a-f0-9\\-]{36}/run-status");
@Override
public boolean canHandle(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
if ("GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("PUT".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_RUN_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
@Override
@ -47,11 +54,16 @@ public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger
final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupEntity> rpgEntities = responseEntity.getRemoteProcessGroups();
if (rpgEntities != null) {
final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> entityMap = new HashMap<>();
for (final NodeResponse nodeResponse : successfulResponses) {
final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
final RemoteProcessGroupsEntity nodeResponseEntity =
nodeResponse == clientResponse
? responseEntity
: nodeResponse.getClientResponse().readEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupEntity> nodeRpgEntities = nodeResponseEntity.getRemoteProcessGroups();
if (nodeRpgEntities != null) {
for (final RemoteProcessGroupEntity nodeRpgEntity : nodeRpgEntities) {
final NodeIdentifier nodeId = nodeResponse.getNodeId();
Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = entityMap.get(nodeId);
@ -63,8 +75,10 @@ public class RemoteProcessGroupsEndpointMerger implements EndpointResponseMerger
innerMap.put(nodeResponse.getNodeId(), nodeRpgEntity);
}
}
}
RemoteProcessGroupsEntityMerger.mergeRemoteProcessGroups(rpgEntities, entityMap);
}
// create a new client response
return new NodeResponse(clientResponse, responseEntity);

View File

@ -1310,6 +1310,14 @@ public interface NiFiServiceFacade {
*/
StatusHistoryEntity getRemoteProcessGroupStatusHistory(String id);
/**
* Verifies that transmission state of all remote process groups within the specified process group can be updated.
* @param processGroupId The process group in which to verify remote process groups
* @param shouldTransmit The transmission state to verify for
*/
void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit);
/**
* Verifies the specified remote process group can be updated.
*

View File

@ -570,6 +570,20 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
processGroupDAO.verifyDelete(groupId);
}
@Override
public void verifyUpdateRemoteProcessGroups(String processGroupId, boolean shouldTransmit) {
List<RemoteProcessGroup> allRemoteProcessGroups = processGroupDAO.getProcessGroup(processGroupId).findAllRemoteProcessGroups();
allRemoteProcessGroups.stream()
.map(remoteProcessGroup -> {
final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
dto.setId(remoteProcessGroup.getIdentifier());
dto.setTransmitting(shouldTransmit);
return dto;
})
.forEach(this::verifyUpdateRemoteProcessGroup);
}
@Override
public void verifyUpdateRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroupDTO) {
// if remote group does not exist, then the update request is likely creating it

View File

@ -24,10 +24,12 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.ProcessGroupAuthorizable;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.OperationAuthorizable;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.ComponentStateDTO;
@ -39,6 +41,7 @@ import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.RemotePortRunStatusEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
@ -58,6 +61,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;
/**
* RESTful endpoint for managing a Remote group.
@ -829,6 +833,104 @@ public class RemoteProcessGroupResource extends ApplicationResource {
);
}
/**
* Updates the operational status for all remote process groups in the specified process group with the specified value.
*
* @param httpServletRequest request
* @param processGroupId The id of the process group in which all remote process groups to update.
* @param requestRemotePortRunStatusEntity A remotePortRunStatusEntity that holds the desired run status
* @return A response with an array of RemoteProcessGroupEntity objects.
*/
@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("process-group/{id}/run-status")
@ApiOperation(
value = "Updates run status of all remote process groups in a process group (recursively)",
response = RemoteProcessGroupEntity.class,
authorizations = {
@Authorization(value = "Write - /remote-process-groups/{uuid} or /operation/remote-process-groups/{uuid}")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response updateRemoteProcessGroupRunStatuses(
@Context HttpServletRequest httpServletRequest,
@ApiParam(
value = "The process group id.",
required = true
)
@PathParam("id") String processGroupId,
@ApiParam(
value = "The remote process groups run status.",
required = true
) final RemotePortRunStatusEntity requestRemotePortRunStatusEntity
) {
if (requestRemotePortRunStatusEntity == null) {
throw new IllegalArgumentException("Remote process group run status must be specified.");
}
requestRemotePortRunStatusEntity.validateState();
if (isReplicateRequest()) {
return replicate(HttpMethod.PUT, requestRemotePortRunStatusEntity);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(requestRemotePortRunStatusEntity.isDisconnectedNodeAcknowledged());
}
// handle expects request (usually from the cluster manager)
final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(
processGroupId,
group -> group.findAllRemoteProcessGroups().stream()
.filter(remoteProcessGroup ->
requestRemotePortRunStatusEntity.getState().equals("TRANSMITTING") && !remoteProcessGroup.isTransmitting()
|| requestRemotePortRunStatusEntity.getState().equals("STOPPED") && remoteProcessGroup.isTransmitting()
)
.filter(remoteProcessGroup -> OperationAuthorizable.isOperationAuthorized(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser()))
.map(RemoteProcessGroup::getIdentifier)
.collect(Collectors.toSet())
);
return withWriteLock(
serviceFacade,
requestRemotePortRunStatusEntity,
revisions,
lookup -> {
final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId);
authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
Set<Authorizable> remoteProcessGroups = processGroup.getEncapsulatedRemoteProcessGroups();
for (Authorizable remoteProcessGroup : remoteProcessGroups) {
OperationAuthorizable.authorizeOperation(remoteProcessGroup, authorizer, NiFiUserUtils.getNiFiUser());
}
},
() -> serviceFacade.verifyUpdateRemoteProcessGroups(processGroupId, shouldTransmit(requestRemotePortRunStatusEntity)),
(_revisions, remotePortRunStatusEntity) -> {
Set<RemoteProcessGroupEntity> remoteProcessGroupEntities = _revisions.stream()
.map(revision -> {
final RemoteProcessGroupEntity entity = serviceFacade.updateRemoteProcessGroup(revision, createDTOWithDesiredRunStatus(revision.getComponentId(), remotePortRunStatusEntity));
populateRemainingRemoteProcessGroupEntityContent(entity);
return entity;
})
.collect(Collectors.toSet());
RemoteProcessGroupsEntity remoteProcessGroupsEntity = new RemoteProcessGroupsEntity();
Response response = generateOkResponse(remoteProcessGroupsEntity).build();
return response;
}
);
}
/**
* Gets the state for a RemoteProcessGroup.
*

View File

@ -38,6 +38,7 @@ import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
@ -50,6 +51,7 @@ import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
@ -57,18 +59,23 @@ import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Arrays;
import java.util.List;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -186,7 +193,7 @@ public class StandardNiFiServiceFacadeTest {
final ControllerFacade controllerFacade = new ControllerFacade();
controllerFacade.setFlowController(flowController);
processGroupDAO = mock(ProcessGroupDAO.class);
processGroupDAO = mock(ProcessGroupDAO.class, Answers.RETURNS_DEEP_STUBS);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
@ -418,4 +425,67 @@ public class StandardNiFiServiceFacadeTest {
assertTrue(serviceFacade.isAnyProcessGroupUnderVersionControl(groupId));
}
@Test
public void testVerifyUpdateRemoteProcessGroups() throws Exception {
// GIVEN
RemoteProcessGroupDAO remoteProcessGroupDAO = mock(RemoteProcessGroupDAO.class);
serviceFacade.setRemoteProcessGroupDAO(remoteProcessGroupDAO);
String groupId = "groupId";
boolean shouldTransmit = true;
String remoteProcessGroupId1 = "remoteProcessGroupId1";
String remoteProcessGroupId2 = "remoteProcessGroupId2";
List<RemoteProcessGroup> remoteProcessGroups = Arrays.asList(
// Current 'transmitting' status should not influence the verification, which should be solely based on the 'shouldTransmitting' value
mockRemoteProcessGroup(remoteProcessGroupId1, true),
mockRemoteProcessGroup(remoteProcessGroupId2, false)
);
List<RemoteProcessGroupDTO> expected = Arrays.asList(
createRemoteProcessGroupDTO(remoteProcessGroupId1, shouldTransmit),
createRemoteProcessGroupDTO(remoteProcessGroupId2, shouldTransmit)
);
when(processGroupDAO.getProcessGroup(groupId).findAllRemoteProcessGroups()).thenReturn(remoteProcessGroups);
expected.stream()
.map(RemoteProcessGroupDTO::getId)
.forEach(remoteProcessGroupId -> when(remoteProcessGroupDAO.hasRemoteProcessGroup(remoteProcessGroupId)).thenReturn(true));
// WHEN
serviceFacade.verifyUpdateRemoteProcessGroups(groupId, shouldTransmit);
// THEN
ArgumentCaptor<RemoteProcessGroupDTO> remoteProcessGroupDTOArgumentCaptor = ArgumentCaptor.forClass(RemoteProcessGroupDTO.class);
verify(remoteProcessGroupDAO, times(remoteProcessGroups.size())).verifyUpdate(remoteProcessGroupDTOArgumentCaptor.capture());
List<RemoteProcessGroupDTO> actual = remoteProcessGroupDTOArgumentCaptor.getAllValues();
assertEquals(toMap(expected), toMap(actual));
}
private Map<String, Boolean> toMap(List<RemoteProcessGroupDTO> list) {
return list.stream().collect(Collectors.toMap(RemoteProcessGroupDTO::getId, RemoteProcessGroupDTO::isTransmitting));
}
private RemoteProcessGroup mockRemoteProcessGroup(String identifier, boolean transmitting) {
RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
when(remoteProcessGroup.getIdentifier()).thenReturn(identifier);
when(remoteProcessGroup.isTransmitting()).thenReturn(transmitting);
return remoteProcessGroup;
}
private RemoteProcessGroupDTO createRemoteProcessGroupDTO(String id, boolean transmitting) {
RemoteProcessGroupDTO remoteProcessGroup = new RemoteProcessGroupDTO();
remoteProcessGroup.setId(id);
remoteProcessGroup.setTransmitting(transmitting);
return remoteProcessGroup;
}
}

View File

@ -652,11 +652,18 @@
'id': nfCanvasUtils.getGroupId(),
'state': 'RUNNING'
};
updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
var remoteProcessGroupEntity = {
'state': 'TRANSMITTING'
};
updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
.done(function (response) {
nfRemoteProcessGroup.set(response.remoteProcessGroups);
});
} else {
var componentsToStart = selection.filter(function (d) {
return nfCanvasUtils.isRunnable(d3.select(this));
return nfCanvasUtils.isRunnable(d3.select(this)) || nfCanvasUtils.canStartTransmitting(d3.select(this));
});
// ensure there are startable components selected
@ -675,6 +682,12 @@
'id': d.id,
'state': 'RUNNING'
}
} else if (nfCanvasUtils.isRemoteProcessGroup(selected)) {
uri = d.uri + '/run-status';
entity = {
'revision': nfClient.getRevision(d),
'state': 'TRANSMITTING'
};
} else {
uri = d.uri + '/run-status';
entity = {
@ -683,13 +696,21 @@
};
}
startRequests.push(updateResource(uri, entity).done(function (response) {
if (nfCanvasUtils.isProcessGroup(selected)) {
var remoteProcessGroupEntity = {
'state': 'TRANSMITTING'
};
var startRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
startRequests.push(startRemoteProcessGroups.done(function (response) {}));
startRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
} else {
nfCanvasUtils.getComponentByType(d.type).set(response);
}
}));
} else {
startRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType(d.type).set(response);
}));
}
});
// inform Angular app once the updates have completed
@ -755,11 +776,18 @@
'id': nfCanvasUtils.getGroupId(),
'state': 'STOPPED'
};
updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup);
var remoteProcessGroupEntity = {
'state': 'STOPPED'
};
updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity)
.done(function (response) {
nfRemoteProcessGroup.set(response.remoteProcessGroups);
});
} else {
var componentsToStop = selection.filter(function (d) {
return nfCanvasUtils.isStoppable(d3.select(this));
return nfCanvasUtils.isStoppable(d3.select(this)) || nfCanvasUtils.canStopTransmitting(d3.select(this));
});
// ensure there are some component to stop
@ -786,13 +814,21 @@
};
}
stopRequests.push(updateResource(uri, entity).done(function (response) {
if (nfCanvasUtils.isProcessGroup(selected)) {
var remoteProcessGroupEntity = {
'state': 'STOPPED'
};
var stopRemoteProcessGroups = updateResource(config.urls.api + '/remote-process-groups/process-group/' + encodeURIComponent(nfCanvasUtils.getGroupId()) + '/run-status', remoteProcessGroupEntity);
stopRequests.push(stopRemoteProcessGroups.done(function (response) {}));
stopRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id);
} else {
nfCanvasUtils.getComponentByType(d.type).set(response);
}
}));
} else {
stopRequests.push(updateResource(uri, entity).done(function (response) {
nfCanvasUtils.getComponentByType(d.type).set(response);
}));
}
});
// inform Angular app once the updates have completed