mirror of https://github.com/apache/nifi.git
NIFI-1994: Fixed issue with Controller Service Fully Qualified Class Names and ensure that services are added to the process groups as appropriate when instantiating templates
NIFI-1882: Ensuring Controller Services are copied as part of a ProcessGroupDTO. This closes #517
This commit is contained in:
parent
d64fe416be
commit
ce8a0de368
|
@ -278,7 +278,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
|
|||
|
||||
@Override
|
||||
public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
|
||||
eventReporter.reportEvent(severity, EVENT_CATEGORY, "Event Reported for " + nodeId.toString() + " -- " + event);
|
||||
eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event);
|
||||
if (nodeId != null) {
|
||||
addNodeEvent(nodeId, severity, event);
|
||||
}
|
||||
|
|
|
@ -94,8 +94,8 @@ public class ControllerServiceEntityMerger {
|
|||
clientDto.setValidationErrors(ErrorMerger.normalizedMergedErrors(validationErrorMap, dtoMap.size()));
|
||||
}
|
||||
|
||||
public static void mergeControllerServiceReferences(Set<ControllerServiceReferencingComponentEntity> referencingComponents,
|
||||
Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) {
|
||||
public static void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentEntity> referencingComponents,
|
||||
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentEntity>> referencingComponentMap) {
|
||||
|
||||
final Map<String, Integer> activeThreadCounts = new HashMap<>();
|
||||
final Map<String, String> states = new HashMap<>();
|
||||
|
@ -131,15 +131,17 @@ public class ControllerServiceEntityMerger {
|
|||
}
|
||||
|
||||
// go through each referencing components
|
||||
for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) {
|
||||
final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
|
||||
if (activeThreadCount != null) {
|
||||
referencingComponent.getComponent().setActiveThreadCount(activeThreadCount);
|
||||
}
|
||||
if (referencingComponents != null) {
|
||||
for (final ControllerServiceReferencingComponentEntity referencingComponent : referencingComponents) {
|
||||
final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
|
||||
if (activeThreadCount != null) {
|
||||
referencingComponent.getComponent().setActiveThreadCount(activeThreadCount);
|
||||
}
|
||||
|
||||
final String state = states.get(referencingComponent.getId());
|
||||
if (state != null) {
|
||||
referencingComponent.getComponent().setState(state);
|
||||
final String state = states.get(referencingComponent.getId());
|
||||
if (state != null) {
|
||||
referencingComponent.getComponent().setState(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1523,6 +1523,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
|
||||
serviceNode.setComments(controllerServiceDTO.getComments());
|
||||
serviceNode.setName(controllerServiceDTO.getName());
|
||||
|
||||
group.addControllerService(serviceNode);
|
||||
}
|
||||
|
||||
// configure controller services. We do this after creating all of them in case 1 service
|
||||
|
@ -1717,6 +1719,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
childTemplateDTO.setProcessors(contents.getProcessors());
|
||||
childTemplateDTO.setFunnels(contents.getFunnels());
|
||||
childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
|
||||
childTemplateDTO.setControllerServices(contents.getControllerServices());
|
||||
instantiateSnippet(childGroup, childTemplateDTO);
|
||||
}
|
||||
|
||||
|
|
|
@ -112,16 +112,16 @@ public abstract class ApplicationResource {
|
|||
* @param path path
|
||||
* @return resource uri
|
||||
*/
|
||||
protected String generateResourceUri(String... path) {
|
||||
UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
|
||||
protected String generateResourceUri(final String... path) {
|
||||
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
|
||||
uriBuilder.segment(path);
|
||||
URI uri = uriBuilder.build();
|
||||
try {
|
||||
|
||||
// check for proxy settings
|
||||
String scheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
||||
String host = httpServletRequest.getHeader(PROXY_HOST_HTTP_HEADER);
|
||||
String port = httpServletRequest.getHeader(PROXY_PORT_HTTP_HEADER);
|
||||
final String scheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
||||
final String host = httpServletRequest.getHeader(PROXY_HOST_HTTP_HEADER);
|
||||
final String port = httpServletRequest.getHeader(PROXY_PORT_HTTP_HEADER);
|
||||
String baseContextPath = httpServletRequest.getHeader(PROXY_CONTEXT_PATH_HTTP_HEADER);
|
||||
|
||||
// if necessary, prepend the context path
|
||||
|
@ -144,7 +144,7 @@ public abstract class ApplicationResource {
|
|||
} else {
|
||||
try {
|
||||
uriPort = Integer.parseInt(port);
|
||||
} catch (NumberFormatException nfe) {
|
||||
} catch (final NumberFormatException nfe) {
|
||||
logger.warn(String.format("Unable to parse proxy port HTTP header '%s'. Using port from request URI '%s'.", port, uriPort));
|
||||
}
|
||||
}
|
||||
|
@ -172,8 +172,8 @@ public abstract class ApplicationResource {
|
|||
* @param response response
|
||||
* @return builder
|
||||
*/
|
||||
protected ResponseBuilder noCache(ResponseBuilder response) {
|
||||
CacheControl cacheControl = new CacheControl();
|
||||
protected ResponseBuilder noCache(final ResponseBuilder response) {
|
||||
final CacheControl cacheControl = new CacheControl();
|
||||
cacheControl.setPrivate(true);
|
||||
cacheControl.setNoCache(true);
|
||||
cacheControl.setNoStore(true);
|
||||
|
@ -186,7 +186,7 @@ public abstract class ApplicationResource {
|
|||
* @param response response
|
||||
* @return builder
|
||||
*/
|
||||
protected ResponseBuilder clusterContext(ResponseBuilder response) {
|
||||
protected ResponseBuilder clusterContext(final ResponseBuilder response) {
|
||||
// TODO: Remove this method. Since ClusterContext was removed, it is no longer needed. However,
|
||||
// it is called by practically every endpoint so for now it is just being stubbed out.
|
||||
return response;
|
||||
|
@ -222,8 +222,8 @@ public abstract class ApplicationResource {
|
|||
* @param entity The entity
|
||||
* @return The response to be built
|
||||
*/
|
||||
protected ResponseBuilder generateOkResponse(Object entity) {
|
||||
ResponseBuilder response = Response.ok(entity);
|
||||
protected ResponseBuilder generateOkResponse(final Object entity) {
|
||||
final ResponseBuilder response = Response.ok(entity);
|
||||
return noCache(response);
|
||||
}
|
||||
|
||||
|
@ -234,7 +234,7 @@ public abstract class ApplicationResource {
|
|||
* @param entity entity
|
||||
* @return The response to be built
|
||||
*/
|
||||
protected ResponseBuilder generateCreatedResponse(URI uri, Object entity) {
|
||||
protected ResponseBuilder generateCreatedResponse(final URI uri, final Object entity) {
|
||||
// generate the response builder
|
||||
return Response.created(uri).entity(entity);
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ public abstract class ApplicationResource {
|
|||
// get the form that jersey processed and use it if it exists (only exist for requests with a body and application form urlencoded
|
||||
final Form form = (Form) httpContext.getProperties().get(FormDispatchProvider.FORM_PROPERTY);
|
||||
if (form == null) {
|
||||
for (Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) {
|
||||
for (final Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
entity.add(entry.getKey(), null);
|
||||
} else {
|
||||
|
@ -319,7 +319,7 @@ public abstract class ApplicationResource {
|
|||
}
|
||||
|
||||
// set the proxy scheme to request scheme if not already set client
|
||||
String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
||||
final String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
||||
if (proxyScheme == null) {
|
||||
result.put(PROXY_SCHEME_HTTP_HEADER, httpServletRequest.getScheme());
|
||||
}
|
||||
|
@ -351,7 +351,7 @@ public abstract class ApplicationResource {
|
|||
* @param httpServletRequest the request
|
||||
* @return <code>true</code> if the request represents a two-phase commit style request
|
||||
*/
|
||||
protected boolean isTwoPhaseRequest(HttpServletRequest httpServletRequest) {
|
||||
protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) {
|
||||
final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
|
||||
return headerValue != null;
|
||||
}
|
||||
|
@ -366,11 +366,11 @@ public abstract class ApplicationResource {
|
|||
* @return <code>true</code> if the request represents a two-phase commit style request and is the
|
||||
* first of the two phases.
|
||||
*/
|
||||
protected boolean isValidationPhase(HttpServletRequest httpServletRequest) {
|
||||
protected boolean isValidationPhase(final HttpServletRequest httpServletRequest) {
|
||||
return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null;
|
||||
}
|
||||
|
||||
protected boolean isClaimCancelationPhase(HttpServletRequest httpServletRequest) {
|
||||
protected boolean isClaimCancelationPhase(final HttpServletRequest httpServletRequest) {
|
||||
return httpServletRequest.getHeader(RequestReplicator.CLAIM_CANCEL_HEADER) != null;
|
||||
}
|
||||
|
||||
|
@ -402,7 +402,7 @@ public abstract class ApplicationResource {
|
|||
* @param componentId the ID of the component that the Revision DTO belongs to
|
||||
* @return a Revision that has the same client ID and Version as the Revision DTO and the Component ID specified
|
||||
*/
|
||||
protected Revision getRevision(RevisionDTO revisionDto, String componentId) {
|
||||
protected Revision getRevision(final RevisionDTO revisionDto, final String componentId) {
|
||||
return new Revision(revisionDto.getVersion(), revisionDto.getClientId(), componentId);
|
||||
}
|
||||
|
||||
|
@ -412,7 +412,7 @@ public abstract class ApplicationResource {
|
|||
* @param entity the ComponentEntity that contains the Revision DTO & ID
|
||||
* @return the Revision specified in the ComponentEntity
|
||||
*/
|
||||
protected Revision getRevision(ComponentEntity entity, String componentId) {
|
||||
protected Revision getRevision(final ComponentEntity entity, final String componentId) {
|
||||
return getRevision(entity.getRevision(), componentId);
|
||||
}
|
||||
|
||||
|
@ -650,7 +650,7 @@ public abstract class ApplicationResource {
|
|||
return clusterCoordinator != null && clusterCoordinator.isConnected();
|
||||
}
|
||||
|
||||
public void setRequestReplicator(RequestReplicator requestReplicator) {
|
||||
public void setRequestReplicator(final RequestReplicator requestReplicator) {
|
||||
this.requestReplicator = requestReplicator;
|
||||
}
|
||||
|
||||
|
@ -658,11 +658,11 @@ public abstract class ApplicationResource {
|
|||
return requestReplicator;
|
||||
}
|
||||
|
||||
public void setProperties(NiFiProperties properties) {
|
||||
public void setProperties(final NiFiProperties properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public void setClusterCoordinator(ClusterCoordinator clusterCoordinator) {
|
||||
public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
|
||||
this.clusterCoordinator = clusterCoordinator;
|
||||
}
|
||||
|
||||
|
|
|
@ -16,13 +16,25 @@
|
|||
*/
|
||||
package org.apache.nifi.web.api;
|
||||
|
||||
import com.sun.jersey.api.core.ResourceContext;
|
||||
import com.wordnik.swagger.annotations.Api;
|
||||
import com.wordnik.swagger.annotations.ApiOperation;
|
||||
import com.wordnik.swagger.annotations.ApiParam;
|
||||
import com.wordnik.swagger.annotations.ApiResponse;
|
||||
import com.wordnik.swagger.annotations.ApiResponses;
|
||||
import com.wordnik.swagger.annotations.Authorization;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.AccessDeniedException;
|
||||
import org.apache.nifi.authorization.AuthorizationRequest;
|
||||
|
@ -50,23 +62,13 @@ import org.apache.nifi.web.api.entity.Entity;
|
|||
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
|
||||
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import com.sun.jersey.api.core.ResourceContext;
|
||||
import com.wordnik.swagger.annotations.Api;
|
||||
import com.wordnik.swagger.annotations.ApiOperation;
|
||||
import com.wordnik.swagger.annotations.ApiParam;
|
||||
import com.wordnik.swagger.annotations.ApiResponse;
|
||||
import com.wordnik.swagger.annotations.ApiResponses;
|
||||
import com.wordnik.swagger.annotations.Authorization;
|
||||
|
||||
/**
|
||||
* RESTful endpoint for managing a Flow Controller.
|
||||
|
|
|
@ -1096,7 +1096,7 @@ public final class DtoFactory {
|
|||
final ReportingTaskDTO dto = new ReportingTaskDTO();
|
||||
dto.setId(reportingTaskNode.getIdentifier());
|
||||
dto.setName(reportingTaskNode.getName());
|
||||
dto.setType(reportingTaskNode.getComponentType());
|
||||
dto.setType(reportingTaskNode.getCanonicalClassName());
|
||||
dto.setSchedulingStrategy(reportingTaskNode.getSchedulingStrategy().name());
|
||||
dto.setSchedulingPeriod(reportingTaskNode.getSchedulingPeriod());
|
||||
dto.setState(reportingTaskNode.getScheduledState().name());
|
||||
|
@ -1168,7 +1168,7 @@ public final class DtoFactory {
|
|||
dto.setId(controllerServiceNode.getIdentifier());
|
||||
dto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier());
|
||||
dto.setName(controllerServiceNode.getName());
|
||||
dto.setType(controllerServiceNode.getComponentType());
|
||||
dto.setType(controllerServiceNode.getCanonicalClassName());
|
||||
dto.setState(controllerServiceNode.getState().name());
|
||||
dto.setAnnotationData(controllerServiceNode.getAnnotationData());
|
||||
dto.setComments(controllerServiceNode.getComments());
|
||||
|
@ -2404,6 +2404,7 @@ public final class DtoFactory {
|
|||
copy.setCustomUiUrl(original.getCustomUiUrl());
|
||||
copy.setDescriptors(copy(original.getDescriptors()));
|
||||
copy.setId(original.getId());
|
||||
copy.setParentGroupId(original.getParentGroupId());
|
||||
copy.setName(original.getName());
|
||||
copy.setProperties(copy(original.getProperties()));
|
||||
copy.setReferencingComponents(copy(original.getReferencingComponents()));
|
||||
|
@ -2683,6 +2684,7 @@ public final class DtoFactory {
|
|||
final Set<ProcessorDTO> processors = new LinkedHashSet<>();
|
||||
final Set<RemoteProcessGroupDTO> remoteProcessGroups = new LinkedHashSet<>();
|
||||
final Set<FunnelDTO> funnels = new LinkedHashSet<>();
|
||||
final Set<ControllerServiceDTO> controllerServices = new LinkedHashSet<>();
|
||||
|
||||
if (deep) {
|
||||
for (final ProcessGroupDTO group : original.getProcessGroups()) {
|
||||
|
@ -2716,6 +2718,10 @@ public final class DtoFactory {
|
|||
for (final ConnectionDTO connection : original.getConnections()) {
|
||||
connections.add(copy(connection));
|
||||
}
|
||||
|
||||
for (final ControllerServiceDTO controllerService : original.getControllerServices()) {
|
||||
controllerServices.add(copy(controllerService));
|
||||
}
|
||||
} else {
|
||||
if (original.getConnections() != null) {
|
||||
connections.addAll(copy(original.getConnections()));
|
||||
|
@ -2741,6 +2747,9 @@ public final class DtoFactory {
|
|||
if (original.getFunnels() != null) {
|
||||
funnels.addAll(copy(original.getFunnels()));
|
||||
}
|
||||
if (original.getControllerServices() != null) {
|
||||
controllerServices.addAll(copy(original.getControllerServices()));
|
||||
}
|
||||
}
|
||||
|
||||
copy.setConnections(connections);
|
||||
|
@ -2751,6 +2760,7 @@ public final class DtoFactory {
|
|||
copy.setProcessors(processors);
|
||||
copy.setRemoteProcessGroups(remoteProcessGroups);
|
||||
copy.setFunnels(funnels);
|
||||
copy.setControllerServices(controllerServices);
|
||||
|
||||
return copy;
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.nifi.web.util;
|
|||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
|
@ -26,6 +25,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
|
@ -73,7 +73,7 @@ public final class SnippetUtils {
|
|||
* @param includeControllerServices whether or not to include controller services in the flow snippet dto
|
||||
* @return snippet
|
||||
*/
|
||||
public FlowSnippetDTO populateFlowSnippet(Snippet snippet, boolean recurse, boolean includeControllerServices) {
|
||||
public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices) {
|
||||
final FlowSnippetDTO snippetDto = new FlowSnippetDTO();
|
||||
final String groupId = snippet.getParentGroupId();
|
||||
final ProcessGroup processGroup = flowController.getGroup(groupId);
|
||||
|
@ -83,15 +83,21 @@ public final class SnippetUtils {
|
|||
throw new IllegalStateException("The parent process group for this snippet could not be found.");
|
||||
}
|
||||
|
||||
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
|
||||
|
||||
// add any processors
|
||||
if (!snippet.getProcessors().isEmpty()) {
|
||||
final Set<ProcessorDTO> processors = new LinkedHashSet<>();
|
||||
for (String processorId : snippet.getProcessors().keySet()) {
|
||||
for (final String processorId : snippet.getProcessors().keySet()) {
|
||||
final ProcessorNode processor = processGroup.getProcessor(processorId);
|
||||
if (processor == null) {
|
||||
throw new IllegalStateException("A processor in this snippet could not be found.");
|
||||
}
|
||||
processors.add(dtoFactory.createProcessorDto(processor));
|
||||
|
||||
if (includeControllerServices) {
|
||||
controllerServices.addAll(getControllerServices(processor.getProperties()));
|
||||
}
|
||||
}
|
||||
snippetDto.setProcessors(processors);
|
||||
}
|
||||
|
@ -99,7 +105,7 @@ public final class SnippetUtils {
|
|||
// add any connections
|
||||
if (!snippet.getConnections().isEmpty()) {
|
||||
final Set<ConnectionDTO> connections = new LinkedHashSet<>();
|
||||
for (String connectionId : snippet.getConnections().keySet()) {
|
||||
for (final String connectionId : snippet.getConnections().keySet()) {
|
||||
final Connection connection = processGroup.getConnection(connectionId);
|
||||
if (connection == null) {
|
||||
throw new IllegalStateException("A connection in this snippet could not be found.");
|
||||
|
@ -112,7 +118,7 @@ public final class SnippetUtils {
|
|||
// add any funnels
|
||||
if (!snippet.getFunnels().isEmpty()) {
|
||||
final Set<FunnelDTO> funnels = new LinkedHashSet<>();
|
||||
for (String funnelId : snippet.getFunnels().keySet()) {
|
||||
for (final String funnelId : snippet.getFunnels().keySet()) {
|
||||
final Funnel funnel = processGroup.getFunnel(funnelId);
|
||||
if (funnel == null) {
|
||||
throw new IllegalStateException("A funnel in this snippet could not be found.");
|
||||
|
@ -125,7 +131,7 @@ public final class SnippetUtils {
|
|||
// add any input ports
|
||||
if (!snippet.getInputPorts().isEmpty()) {
|
||||
final Set<PortDTO> inputPorts = new LinkedHashSet<>();
|
||||
for (String inputPortId : snippet.getInputPorts().keySet()) {
|
||||
for (final String inputPortId : snippet.getInputPorts().keySet()) {
|
||||
final Port inputPort = processGroup.getInputPort(inputPortId);
|
||||
if (inputPort == null) {
|
||||
throw new IllegalStateException("An input port in this snippet could not be found.");
|
||||
|
@ -138,7 +144,7 @@ public final class SnippetUtils {
|
|||
// add any labels
|
||||
if (!snippet.getLabels().isEmpty()) {
|
||||
final Set<LabelDTO> labels = new LinkedHashSet<>();
|
||||
for (String labelId : snippet.getLabels().keySet()) {
|
||||
for (final String labelId : snippet.getLabels().keySet()) {
|
||||
final Label label = processGroup.getLabel(labelId);
|
||||
if (label == null) {
|
||||
throw new IllegalStateException("A label in this snippet could not be found.");
|
||||
|
@ -151,7 +157,7 @@ public final class SnippetUtils {
|
|||
// add any output ports
|
||||
if (!snippet.getOutputPorts().isEmpty()) {
|
||||
final Set<PortDTO> outputPorts = new LinkedHashSet<>();
|
||||
for (String outputPortId : snippet.getOutputPorts().keySet()) {
|
||||
for (final String outputPortId : snippet.getOutputPorts().keySet()) {
|
||||
final Port outputPort = processGroup.getOutputPort(outputPortId);
|
||||
if (outputPort == null) {
|
||||
throw new IllegalStateException("An output port in this snippet could not be found.");
|
||||
|
@ -164,12 +170,16 @@ public final class SnippetUtils {
|
|||
// add any process groups
|
||||
if (!snippet.getProcessGroups().isEmpty()) {
|
||||
final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>();
|
||||
for (String childGroupId : snippet.getProcessGroups().keySet()) {
|
||||
for (final String childGroupId : snippet.getProcessGroups().keySet()) {
|
||||
final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId);
|
||||
if (childGroup == null) {
|
||||
throw new IllegalStateException("A process group in this snippet could not be found.");
|
||||
}
|
||||
processGroups.add(dtoFactory.createProcessGroupDto(childGroup, recurse));
|
||||
|
||||
final ProcessGroupDTO childGroupDto = dtoFactory.createProcessGroupDto(childGroup, recurse);
|
||||
processGroups.add(childGroupDto);
|
||||
|
||||
addControllerServices(childGroup, childGroupDto);
|
||||
}
|
||||
snippetDto.setProcessGroups(processGroups);
|
||||
}
|
||||
|
@ -177,7 +187,7 @@ public final class SnippetUtils {
|
|||
// add any remote process groups
|
||||
if (!snippet.getRemoteProcessGroups().isEmpty()) {
|
||||
final Set<RemoteProcessGroupDTO> remoteProcessGroups = new LinkedHashSet<>();
|
||||
for (String remoteProcessGroupId : snippet.getRemoteProcessGroups().keySet()) {
|
||||
for (final String remoteProcessGroupId : snippet.getRemoteProcessGroups().keySet()) {
|
||||
final RemoteProcessGroup remoteProcessGroup = processGroup.getRemoteProcessGroup(remoteProcessGroupId);
|
||||
if (remoteProcessGroup == null) {
|
||||
throw new IllegalStateException("A remote process group in this snippet could not be found.");
|
||||
|
@ -187,107 +197,62 @@ public final class SnippetUtils {
|
|||
snippetDto.setRemoteProcessGroups(remoteProcessGroups);
|
||||
}
|
||||
|
||||
if (includeControllerServices) {
|
||||
Set<ControllerServiceDTO> controllerServices = snippetDto.getControllerServices();
|
||||
if (controllerServices == null) {
|
||||
controllerServices = new HashSet<>();
|
||||
snippetDto.setControllerServices(controllerServices);
|
||||
}
|
||||
|
||||
addControllerServicesToSnippet(snippetDto, controllerServices);
|
||||
}
|
||||
snippetDto.setControllerServices(controllerServices);
|
||||
|
||||
return snippetDto;
|
||||
}
|
||||
|
||||
private void addControllerServicesToSnippet(final FlowSnippetDTO snippetDto, final Set<ControllerServiceDTO> destinationSet) {
|
||||
final Set<ProcessorDTO> processors = snippetDto.getProcessors();
|
||||
if (processors != null) {
|
||||
for (final ProcessorDTO processorDto : processors) {
|
||||
addControllerServicesToSnippet(snippetDto, processorDto, destinationSet);
|
||||
}
|
||||
}
|
||||
|
||||
final Set<ProcessGroupDTO> childGroups = snippetDto.getProcessGroups();
|
||||
if (childGroups != null) {
|
||||
for (final ProcessGroupDTO processGroupDto : childGroups) {
|
||||
final FlowSnippetDTO childGroupDto = processGroupDto.getContents();
|
||||
if (childGroupDto != null) {
|
||||
addControllerServicesToSnippet(childGroupDto, destinationSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addControllerServicesToSnippet(final FlowSnippetDTO snippet, final ProcessorDTO processorDto, final Set<ControllerServiceDTO> destinationSet) {
|
||||
final ProcessorConfigDTO configDto = processorDto.getConfig();
|
||||
if (configDto == null) {
|
||||
private void addControllerServices(final ProcessGroup group, final ProcessGroupDTO dto) {
|
||||
final FlowSnippetDTO contents = dto.getContents();
|
||||
if (contents == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, PropertyDescriptorDTO> descriptors = configDto.getDescriptors();
|
||||
final Map<String, String> properties = configDto.getProperties();
|
||||
final Set<ControllerServiceDTO> controllerServices = new HashSet<>();
|
||||
|
||||
if (properties != null && descriptors != null) {
|
||||
for (final Map.Entry<String, String> entry : properties.entrySet()) {
|
||||
final String propName = entry.getKey();
|
||||
final String propValue = entry.getValue();
|
||||
if (propValue == null) {
|
||||
continue;
|
||||
}
|
||||
for (final ProcessorNode procNode : group.getProcessors()) {
|
||||
final Set<ControllerServiceDTO> servicesForProcessor = getControllerServices(procNode.getProperties());
|
||||
controllerServices.addAll(servicesForProcessor);
|
||||
}
|
||||
|
||||
final PropertyDescriptorDTO propertyDescriptorDto = descriptors.get(propName);
|
||||
if (propertyDescriptorDto != null && propertyDescriptorDto.getIdentifiesControllerService() != null) {
|
||||
final ControllerServiceNode serviceNode = flowController.getControllerServiceNode(propValue);
|
||||
contents.setControllerServices(controllerServices);
|
||||
|
||||
// Map child process group ID to the child process group for easy lookup
|
||||
final Map<String, ProcessGroupDTO> childGroupMap = contents.getProcessGroups().stream()
|
||||
.collect(Collectors.toMap(childGroupDto -> childGroupDto.getId(), childGroupDto -> childGroupDto));
|
||||
|
||||
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
||||
final ProcessGroupDTO childDto = childGroupMap.get(childGroup.getIdentifier());
|
||||
if (childDto == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
addControllerServices(childGroup, childDto);
|
||||
}
|
||||
}
|
||||
|
||||
private Set<ControllerServiceDTO> getControllerServices(final Map<PropertyDescriptor, String> componentProperties) {
|
||||
final Set<ControllerServiceDTO> serviceDtos = new HashSet<>();
|
||||
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : componentProperties.entrySet()) {
|
||||
final PropertyDescriptor descriptor = entry.getKey();
|
||||
if (descriptor.getControllerServiceDefinition() != null) {
|
||||
final String controllerServiceId = entry.getValue();
|
||||
if (controllerServiceId != null) {
|
||||
final ControllerServiceNode serviceNode = flowController.getControllerServiceNode(controllerServiceId);
|
||||
if (serviceNode != null) {
|
||||
addControllerServicesToSnippet(snippet, serviceNode, destinationSet);
|
||||
serviceDtos.add(dtoFactory.createControllerServiceDto(serviceNode));
|
||||
|
||||
final Set<ControllerServiceDTO> recursiveRefs = getControllerServices(serviceNode.getProperties());
|
||||
serviceDtos.addAll(recursiveRefs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return serviceDtos;
|
||||
}
|
||||
|
||||
private void addControllerServicesToSnippet(final FlowSnippetDTO snippet, final ControllerServiceNode serviceNode, final Set<ControllerServiceDTO> destinationSet) {
|
||||
if (isServicePresent(serviceNode.getIdentifier(), snippet.getControllerServices())) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ControllerServiceDTO serviceNodeDto = dtoFactory.createControllerServiceDto(serviceNode);
|
||||
destinationSet.add(serviceNodeDto);
|
||||
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) {
|
||||
final PropertyDescriptor descriptor = entry.getKey();
|
||||
final String propertyValue = entry.getValue();
|
||||
|
||||
if (descriptor.getControllerServiceDefinition() != null && propertyValue != null) {
|
||||
final ControllerServiceNode referencedNode = flowController.getControllerServiceNode(propertyValue);
|
||||
if (referencedNode == null) {
|
||||
throw new IllegalStateException("Controller Service with ID " + propertyValue + " is referenced in template but cannot be found");
|
||||
}
|
||||
|
||||
final String referencedNodeId = referencedNode.getIdentifier();
|
||||
|
||||
final boolean alreadyPresent = isServicePresent(referencedNodeId, snippet.getControllerServices());
|
||||
if (!alreadyPresent) {
|
||||
addControllerServicesToSnippet(snippet, referencedNode, destinationSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isServicePresent(final String serviceId, final Collection<ControllerServiceDTO> services) {
|
||||
if (services == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (final ControllerServiceDTO existingService : services) {
|
||||
if (serviceId.equals(existingService.getId())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed) {
|
||||
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed);
|
||||
|
@ -346,7 +311,7 @@ public final class SnippetUtils {
|
|||
}
|
||||
|
||||
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap,
|
||||
String idGenerationSeed) {
|
||||
final String idGenerationSeed) {
|
||||
final FlowSnippetDTO snippetContentsCopy = new FlowSnippetDTO();
|
||||
|
||||
//
|
||||
|
@ -354,39 +319,40 @@ public final class SnippetUtils {
|
|||
//
|
||||
if (serviceIdMap == null) {
|
||||
serviceIdMap = new HashMap<>();
|
||||
final Set<ControllerServiceDTO> services = new HashSet<>();
|
||||
if (snippetContents.getControllerServices() != null) {
|
||||
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
|
||||
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
|
||||
service.setId(generateId(serviceDTO.getId(), idGenerationSeed));
|
||||
service.setState(ControllerServiceState.DISABLED.name());
|
||||
services.add(service);
|
||||
}
|
||||
|
||||
// Map old service ID to new service ID so that we can make sure that we reference the new ones.
|
||||
serviceIdMap.put(serviceDTO.getId(), service.getId());
|
||||
}
|
||||
final Set<ControllerServiceDTO> services = new HashSet<>();
|
||||
if (snippetContents.getControllerServices() != null) {
|
||||
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
|
||||
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
|
||||
service.setId(generateId(serviceDTO.getId(), idGenerationSeed));
|
||||
service.setState(ControllerServiceState.DISABLED.name());
|
||||
services.add(service);
|
||||
|
||||
// Map old service ID to new service ID so that we can make sure that we reference the new ones.
|
||||
serviceIdMap.put(serviceDTO.getId(), service.getId());
|
||||
}
|
||||
}
|
||||
|
||||
// if there is any controller service that maps to another controller service, update the id's
|
||||
for (final ControllerServiceDTO serviceDTO : services) {
|
||||
final Map<String, String> properties = serviceDTO.getProperties();
|
||||
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
|
||||
if (properties != null && descriptors != null) {
|
||||
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
|
||||
if (descriptor.getIdentifiesControllerService() != null) {
|
||||
final String currentServiceId = properties.get(descriptor.getName());
|
||||
if (currentServiceId == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final String newServiceId = serviceIdMap.get(currentServiceId);
|
||||
properties.put(descriptor.getName(), newServiceId);
|
||||
// if there is any controller service that maps to another controller service, update the id's
|
||||
for (final ControllerServiceDTO serviceDTO : services) {
|
||||
final Map<String, String> properties = serviceDTO.getProperties();
|
||||
final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
|
||||
if (properties != null && descriptors != null) {
|
||||
for (final PropertyDescriptorDTO descriptor : descriptors.values()) {
|
||||
if (descriptor.getIdentifiesControllerService() != null) {
|
||||
final String currentServiceId = properties.get(descriptor.getName());
|
||||
if (currentServiceId == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final String newServiceId = serviceIdMap.get(currentServiceId);
|
||||
properties.put(descriptor.getName(), newServiceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
snippetContentsCopy.setControllerServices(services);
|
||||
}
|
||||
snippetContentsCopy.setControllerServices(services);
|
||||
|
||||
//
|
||||
// Copy the labels
|
||||
|
@ -601,11 +567,11 @@ public final class SnippetUtils {
|
|||
}
|
||||
|
||||
/* setters */
|
||||
public void setDtoFactory(DtoFactory dtoFactory) {
|
||||
public void setDtoFactory(final DtoFactory dtoFactory) {
|
||||
this.dtoFactory = dtoFactory;
|
||||
}
|
||||
|
||||
public void setFlowController(FlowController flowController) {
|
||||
public void setFlowController(final FlowController flowController) {
|
||||
this.flowController = flowController;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue