diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 4fdda0683a..d6de553faa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,6 +16,27 @@ */ package org.apache.nifi.web; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -59,7 +80,6 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; -import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; @@ -91,6 +111,7 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.dto.NiFiComponentDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.PreviousValueDTO; @@ -142,24 +163,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.AccessDeniedException; -import javax.ws.rs.WebApplicationException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -368,26 +371,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createConnection(revision, groupId, connectionDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - final Connection connection = connectionDAO.updateConnection(groupId, connectionDTO); - - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ConnectionDTO getConfiguration() { - return dtoFactory.createConnectionDto(connection); - } - }; - } - }); + return updateComponent(revision, () -> connectionDAO.updateConnection(groupId, connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); } @Override @@ -397,28 +381,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createProcessor(revision, groupId, processorDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the processor - final ProcessorNode processor = processorDAO.updateProcessor(groupId, processorDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ProcessorDTO getConfiguration() { - return dtoFactory.createProcessorDto(processor); - } - }; - } - }); + return updateComponent(revision, () -> processorDAO.updateProcessor(groupId, processorDTO), proc -> dtoFactory.createProcessorDto(proc)); } @Override @@ -428,28 +391,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createLabel(revision, groupId, labelDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the existing label - final Label label = labelDAO.updateLabel(groupId, labelDTO); - - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public LabelDTO getConfiguration() { - return dtoFactory.createLabelDto(label); - } - }; - } - }); + return updateComponent(revision, () -> labelDAO.updateLabel(groupId, labelDTO), label -> dtoFactory.createLabelDto(label)); } @Override @@ -459,30 +401,48 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createFunnel(revision, groupId, funnelDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { + return updateComponent(revision, () -> funnelDAO.updateFunnel(groupId, funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); + } + + + /** + * Updates a component with the given revision, using the provided supplier to call + * into the appropriate DAO and the provided function to convert the component into a DTO. + * + * @param revision the current revision + * @param daoUpdate a Supplier that will update the component via the appropriate DAO + * @param dtoCreation a Function to convert a component into a dao + * + * @param the DTO Type of the updated component + * @param the Component Type of the updated component + * + * @return A ConfigurationSnapshot that represents the new configuration + */ + private ConfigurationSnapshot updateComponent(final Revision revision, final Supplier daoUpdate, final Function dtoCreation) { + return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @Override - public ConfigurationResult execute() { - // update the existing label - final Funnel funnel = funnelDAO.updateFunnel(groupId, funnelDTO); + public ConfigurationResult execute() { + final C component = daoUpdate.get(); // save updated controller controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult() { @Override public boolean isNew() { return false; } @Override - public FunnelDTO getConfiguration() { - return dtoFactory.createFunnelDto(funnel); + public D getConfiguration() { + return dtoCreation.apply(component); } }; } }); } + @Override public void verifyUpdateSnippet(SnippetDTO snippetDto) { // if snippet does not exist, then the update request is likely creating it @@ -499,34 +459,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createSnippet(revision, snippetDto); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the snippet - final Snippet snippet = snippetDAO.updateSnippet(snippetDto); - - // build the snippet dto + return updateComponent(revision, + () -> snippetDAO.updateSnippet(snippetDto), + snippet -> { final SnippetDTO responseSnippetDto = dtoFactory.createSnippetDto(snippet); responseSnippetDto.setContents(snippetUtils.populateFlowSnippet(snippet, false, false)); - - // save updated controller if applicable - if (snippetDto.getParentGroupId() != null && snippet.isLinked()) { - controllerFacade.save(); - } - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public SnippetDTO getConfiguration() { - return responseSnippetDto; - } - }; - } - }); + return responseSnippetDto; + }); } @Override @@ -536,27 +475,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createInputPort(revision, groupId, inputPortDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - final Port inputPort = inputPortDAO.updatePort(groupId, inputPortDTO); - - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public PortDTO getConfiguration() { - return dtoFactory.createPortDto(inputPort); - } - }; - } - }); + return updateComponent(revision, () -> inputPortDAO.updatePort(groupId, inputPortDTO), port -> dtoFactory.createPortDto(port)); } @Override @@ -566,27 +485,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createOutputPort(revision, groupId, outputPortDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - final Port outputPort = outputPortDAO.updatePort(groupId, outputPortDTO); - - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public PortDTO getConfiguration() { - return dtoFactory.createPortDto(outputPort); - } - }; - } - }); + return updateComponent(revision, () -> outputPortDAO.updatePort(groupId, outputPortDTO), port -> dtoFactory.createPortDto(port)); } @Override @@ -596,81 +495,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO); - - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public RemoteProcessGroupDTO getConfiguration() { - return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup); - } - }; - } - }); + return updateComponent(revision, + () -> remoteProcessGroupDAO.updateRemoteProcessGroup(groupId, remoteProcessGroupDTO), + remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); } @Override public ConfigurationSnapshot updateRemoteProcessGroupInputPort( final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the remote port - final RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO); - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public RemoteProcessGroupPortDTO getConfiguration() { - return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort); - } - }; - } - }); + return updateComponent(revision, + () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO), + remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); } @Override public ConfigurationSnapshot updateRemoteProcessGroupOutputPort( final Revision revision, final String groupId, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the remote port - final RemoteGroupPort remoteGroupPort = remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO); - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public RemoteProcessGroupPortDTO getConfiguration() { - return dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort); - } - }; - } - }); + return updateComponent(revision, + () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(groupId, remoteProcessGroupId, remoteProcessGroupPortDTO), + remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); } @Override @@ -684,35 +529,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // update the process group - final ProcessGroup processGroup = processGroupDAO.updateProcessGroup(processGroupDTO); - - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ProcessGroupDTO getConfiguration() { - return dtoFactory.createProcessGroupDto(processGroup); - } - }; - } - }); + return updateComponent(revision, () -> processGroupDAO.updateProcessGroup(processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); } @Override public ConfigurationSnapshot updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { + return updateComponent(revision, + () -> { // update the controller configuration through the proxy if (controllerConfigurationDTO.getName() != null) { controllerFacade.setName(controllerConfigurationDTO.getName()); @@ -727,25 +550,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); } - // create the controller configuration dto - final ControllerConfigurationDTO controllerConfig = getControllerConfiguration(); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ControllerConfigurationDTO getConfiguration() { - return controllerConfig; - } - }; - } - }); + return controllerConfigurationDTO; + }, + controller -> getControllerConfiguration()); } @Override @@ -778,23 +585,17 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot clearProcessorState(final Revision revision, final String groupId, final String processorId) { + return clearComponentState(revision, () -> processorDAO.clearState(groupId, processorId)); + } + + private ConfigurationSnapshot clearComponentState(final Revision revision, final Runnable clearState) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @Override public ConfigurationResult execute() { // clear the state for the specified component - processorDAO.clearState(groupId, processorId); + clearState.run(); - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; + return new StandardConfigurationResult(false, null); } }); } @@ -806,25 +607,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot clearControllerServiceState(final Revision revision, final String controllerServiceId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // clear the state for the specified component - controllerServiceDAO.clearState(controllerServiceId); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return clearComponentState(revision, () -> controllerServiceDAO.clearState(controllerServiceId)); } @Override @@ -834,50 +617,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot clearReportingTaskState(final Revision revision, final String reportingTaskId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // clear the state for the specified component - reportingTaskDAO.clearState(reportingTaskId); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return clearComponentState(revision, () -> reportingTaskDAO.clearState(reportingTaskId)); } @Override public ConfigurationSnapshot deleteConnection(final Revision revision, final String groupId, final String connectionId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - connectionDAO.deleteConnection(groupId, connectionId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> connectionDAO.deleteConnection(groupId, connectionId)); } @Override @@ -903,67 +648,34 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot deleteProcessor(final Revision revision, final String groupId, final String processorId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // delete the processor and synchronize the connection state - processorDAO.deleteProcessor(groupId, processorId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> processorDAO.deleteProcessor(groupId, processorId)); } @Override public ConfigurationSnapshot deleteLabel(final Revision revision, final String groupId, final String labelId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // delete the label - labelDAO.deleteLabel(groupId, labelId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> labelDAO.deleteLabel(groupId, labelId)); } @Override public ConfigurationSnapshot deleteFunnel(final Revision revision, final String groupId, final String funnelId) { + return deleteComponent(revision, () -> funnelDAO.deleteFunnel(groupId, funnelId)); + } + + /** + * Deletes a component using the Optimistic Locking Manager + * + * @param revision the current revision + * @param action the action that deletes the component via the appropriate DAO object + * @return a ConfigurationSnapshot that represents the new configuration + */ + private ConfigurationSnapshot deleteComponent(final Revision revision, final Runnable action) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @Override public ConfigurationResult execute() { - // delete the label - funnelDAO.deleteFunnel(groupId, funnelId); + action.run(); // save the flow controllerFacade.save(); - return new ConfigurationResult() { @Override public boolean isNew() { @@ -986,134 +698,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot deleteSnippet(final Revision revision, final String snippetId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // determine if this snippet was linked to the data flow - Snippet snippet = snippetDAO.getSnippet(snippetId); - boolean linked = snippet.isLinked(); - - // delete the snippet - snippetDAO.deleteSnippet(snippetId); - - // save the flow if necessary - if (linked) { - controllerFacade.save(); - } - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> snippetDAO.deleteSnippet(snippetId)); } @Override public ConfigurationSnapshot deleteInputPort(final Revision revision, final String groupId, final String inputPortId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - inputPortDAO.deletePort(groupId, inputPortId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> inputPortDAO.deletePort(groupId, inputPortId)); } @Override public ConfigurationSnapshot deleteOutputPort(final Revision revision, final String groupId, final String outputPortId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - outputPortDAO.deletePort(groupId, outputPortId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> outputPortDAO.deletePort(groupId, outputPortId)); } @Override public ConfigurationSnapshot deleteProcessGroup(final Revision revision, final String groupId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - processGroupDAO.deleteProcessGroup(groupId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> processGroupDAO.deleteProcessGroup(groupId)); } @Override public ConfigurationSnapshot deleteRemoteProcessGroup(final Revision revision, final String groupId, final String remoteProcessGroupId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; - } - }); + return deleteComponent(revision, () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(groupId, remoteProcessGroupId)); } @Override @@ -1124,32 +729,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(connectionDTO.getId())) { - connectionDTO.setId(UUID.randomUUID().toString()); - } - - final Connection connection = connectionDAO.createConnection(groupId, connectionDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public ConnectionDTO getConfiguration() { - return dtoFactory.createConnectionDto(connection); - } - }; - } - }); + return createComponent(revision, connectionDTO, () -> connectionDAO.createConnection(groupId, connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); } @Override @@ -1175,95 +755,64 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(processorDTO.getId())) { - processorDTO.setId(UUID.randomUUID().toString()); - } - - // create the processor - final ProcessorNode processor = processorDAO.createProcessor(groupId, processorDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public ProcessorDTO getConfiguration() { - return dtoFactory.createProcessorDto(processor); - } - }; - } - }); + return createComponent(revision, processorDTO, () -> processorDAO.createProcessor(groupId, processorDTO), processor -> dtoFactory.createProcessorDto(processor)); } @Override public ConfigurationSnapshot createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { + return createComponent(revision, labelDTO, () -> labelDAO.createLabel(groupId, labelDTO), label -> dtoFactory.createLabelDto(label)); + } + + /** + * Creates a component using the optimistic locking manager. + * + * @param revision the current revision + * @param componentDto the DTO that will be used to create the component + * @param daoCreation A Supplier that will create the NiFi Component to use + * @param dtoCreation a Function that will convert the NiFi Component into a corresponding DTO + * + * @param the DTO Type + * @param the NiFi Component Type + * + * @return a ConfigurationSnapshot that represents the updated configuration + */ + private ConfigurationSnapshot createComponent(final Revision revision, final NiFiComponentDTO componentDto, + final Supplier daoCreation, final Function dtoCreation) { + + return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @Override - public ConfigurationResult execute() { + public ConfigurationResult execute() { // ensure id is set - if (StringUtils.isBlank(labelDTO.getId())) { - labelDTO.setId(UUID.randomUUID().toString()); + if (StringUtils.isBlank(componentDto.getId())) { + componentDto.setId(UUID.randomUUID().toString()); } - // add the label - final Label label = labelDAO.createLabel(groupId, labelDTO); + // add the component + final C component = daoCreation.get(); // save the flow controllerFacade.save(); - return new ConfigurationResult() { + return new ConfigurationResult() { @Override public boolean isNew() { return true; } @Override - public LabelDTO getConfiguration() { - return dtoFactory.createLabelDto(label); + public D getConfiguration() { + return dtoCreation.apply(component); } }; } }); } + + @Override public ConfigurationSnapshot createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(funnelDTO.getId())) { - funnelDTO.setId(UUID.randomUUID().toString()); - } - - // add the label - final Funnel funnel = funnelDAO.createFunnel(groupId, funnelDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public FunnelDTO getConfiguration() { - return dtoFactory.createFunnelDto(funnel); - } - }; - } - }); + return createComponent(revision, funnelDTO, () -> funnelDAO.createFunnel(groupId, funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); } private void validateSnippetContents(final FlowSnippetDTO flowSnippet, final String groupId) { @@ -1389,122 +938,26 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConfigurationSnapshot createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(inputPortDTO.getId())) { - inputPortDTO.setId(UUID.randomUUID().toString()); - } - - final Port inputPort = inputPortDAO.createPort(groupId, inputPortDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public PortDTO getConfiguration() { - return dtoFactory.createPortDto(inputPort); - } - }; - } - }); + return createComponent(revision, inputPortDTO, () -> inputPortDAO.createPort(groupId, inputPortDTO), port -> dtoFactory.createPortDto(port)); } @Override public ConfigurationSnapshot createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(outputPortDTO.getId())) { - outputPortDTO.setId(UUID.randomUUID().toString()); - } - - final Port outputPort = outputPortDAO.createPort(groupId, outputPortDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public PortDTO getConfiguration() { - return dtoFactory.createPortDto(outputPort); - } - }; - } - }); + return createComponent(revision, outputPortDTO, () -> outputPortDAO.createPort(groupId, outputPortDTO), port -> dtoFactory.createPortDto(port)); } @Override public ConfigurationSnapshot createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(processGroupDTO.getId())) { - processGroupDTO.setId(UUID.randomUUID().toString()); - } - - final ProcessGroup processGroup = processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public ProcessGroupDTO getConfiguration() { - return dtoFactory.createProcessGroupDto(processGroup); - } - }; - } - }); + return createComponent(revision, processGroupDTO, + () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), + processGroup -> dtoFactory.createProcessGroupDto(processGroup)); } @Override public ConfigurationSnapshot createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { - @Override - public ConfigurationResult execute() { - // ensure id is set - if (StringUtils.isBlank(remoteProcessGroupDTO.getId())) { - remoteProcessGroupDTO.setId(UUID.randomUUID().toString()); - } - - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO); - - // save the flow - controllerFacade.save(); - - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public RemoteProcessGroupDTO getConfiguration() { - return dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup); - } - }; - } - }); + return createComponent(revision, remoteProcessGroupDTO, + () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), + remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); } @Override @@ -1587,17 +1040,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // create the archive controllerFacade.createArchive(); - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; + return new StandardConfigurationResult(false, null); } }); } @@ -1630,17 +1073,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ProcessorDTO getConfiguration() { - return dtoFactory.createProcessorDto(processor); - } - }; + return new StandardConfigurationResult<>(false, dtoFactory.createProcessorDto(processor)); } }); } @@ -1665,17 +1098,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public ControllerServiceDTO getConfiguration() { - return dtoFactory.createControllerServiceDto(controllerService); - } - }; + return new StandardConfigurationResult(true, dtoFactory.createControllerServiceDto(controllerService)); } }); } @@ -1699,17 +1122,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ControllerServiceDTO getConfiguration() { - return dtoFactory.createControllerServiceDto(controllerService); - } - }; + return new StandardConfigurationResult(false, dtoFactory.createControllerServiceDto(controllerService)); } }); } @@ -1726,17 +1139,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public ConfigurationResult> execute() { final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); - return new ConfigurationResult>() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Set getConfiguration() { - return dtoFactory.createControllerServiceReferencingComponentsDto(reference); - } - }; + return new StandardConfigurationResult>(false, dtoFactory.createControllerServiceReferencingComponentsDto(reference)); } }); } @@ -1756,21 +1159,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; + return new StandardConfigurationResult(false, null); } }); } + @Override public ConfigurationSnapshot createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @@ -1791,17 +1185,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return true; - } - - @Override - public ReportingTaskDTO getConfiguration() { - return dtoFactory.createReportingTaskDto(reportingTask); - } - }; + return new StandardConfigurationResult(true, dtoFactory.createReportingTaskDto(reportingTask)); } }); } @@ -1825,17 +1209,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public ReportingTaskDTO getConfiguration() { - return dtoFactory.createReportingTaskDto(reportingTask); - } - }; + return new StandardConfigurationResult(false, dtoFactory.createReportingTaskDto(reportingTask)); } }); } @@ -1855,17 +1229,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new ConfigurationResult() { - @Override - public boolean isNew() { - return false; - } - - @Override - public Void getConfiguration() { - return null; - } - }; + return new StandardConfigurationResult(false, null); } }); } @@ -2768,6 +2132,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return userDTOs; } + @Override public boolean isClustered() { return controllerFacade.isClustered(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardConfigurationResult.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardConfigurationResult.java new file mode 100644 index 0000000000..1d5e6851c3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardConfigurationResult.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web; + +public class StandardConfigurationResult implements ConfigurationResult { + private final boolean isNew; + private final T configuration; + + public StandardConfigurationResult(final boolean isNew, final T configuration) { + this.isNew = isNew; + this.configuration = configuration; + } + + @Override + public boolean isNew() { + return isNew; + } + + @Override + public T getConfiguration() { + return configuration; + } +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java index 2280273509..95bdbe53e2 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java @@ -69,9 +69,9 @@ public class JMSConnectionFactoryProviderTest { assertNotNull(cf); assertEquals("org.apache.nifi.jms.testcflib.TestConnectionFactory", cf.getClass().getName()); assertEquals("myhost", this.get("getHost", cf)); - assertEquals(1234, this.get("getPort", cf)); + assertEquals(1234, ((Integer) this.get("getPort", cf)).intValue()); assertEquals("foo", this.get("getFoo", cf)); - assertEquals(3, this.get("getBar", cf)); + assertEquals(3, ((Integer) this.get("getBar", cf)).intValue()); } @Test(expected = AssertionError.class) diff --git a/pom.xml b/pom.xml index 442acdd550..e7027855eb 100644 --- a/pom.xml +++ b/pom.xml @@ -81,8 +81,8 @@ language governing permissions and limitations under the License. --> https://issues.apache.org/jira/browse/NIFI - 1.7 - 1.7 + 1.8 + 1.8 3.1.0 UTF-8 UTF-8 @@ -1313,7 +1313,7 @@ language governing permissions and limitations under the License. --> private UTF-8 true - 1.7 + 1.8 -J-Xmx512m @@ -1383,8 +1383,8 @@ language governing permissions and limitations under the License. --> - 1.7 - 1.7 + 1.8 + 1.8