NIFI-826 (part deux)

- fixed clustering issues discovered after NIFI-826 was applied
This commit is contained in:
Oleg Zhurakousky 2016-07-18 15:22:54 -04:00
parent aa91032cde
commit f4d2919955
5 changed files with 28 additions and 26 deletions

View File

@ -1402,7 +1402,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
templateDTO.setName(name);
templateDTO.setDescription(description);
templateDTO.setTimestamp(new Date());
templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true));
templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true, true));
templateDTO.setEncodingVersion(TemplateDTO.MAX_ENCODING_VERSION);
// set the id based on the specified seed

View File

@ -205,7 +205,7 @@ public abstract class ApplicationResource {
if (seed.isPresent()) {
try {
UUID seedId = UUID.fromString(seed.get());
uuid = TypeOneUUIDGenerator.generateId(seedId.getMostSignificantBits(), Math.abs(seed.get().hashCode()));
uuid = new UUID(seedId.getMostSignificantBits(), Math.abs(seed.get().hashCode()));
} catch (Exception e) {
logger.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation.");
uuid = UUID.nameUUIDFromBytes(seed.get().getBytes(StandardCharsets.UTF_8));

View File

@ -79,13 +79,13 @@ public class StandardSnippetDAO implements SnippetDAO {
}
// generate the snippet contents
FlowSnippetDTO snippetContents = snippetUtils.populateFlowSnippet(existingSnippet, true, false);
FlowSnippetDTO snippetContents = snippetUtils.populateFlowSnippet(existingSnippet, true, false, false);
// resolve sensitive properties
lookupSensitiveProperties(snippetContents);
// copy snippet
snippetContents = snippetUtils.copy(snippetContents, processGroup, idGenerationSeed);
snippetContents = snippetUtils.copy(snippetContents, processGroup, idGenerationSeed, true);
// move the snippet if necessary
if (originX != null && originY != null) {

View File

@ -89,7 +89,7 @@ public class StandardTemplateDAO extends ComponentDAO implements TemplateDAO {
try {
// copy the template which pre-processes all ids
TemplateDTO templateDetails = template.getDetails();
FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group, idGenerationSeed);
FlowSnippetDTO snippet = snippetUtils.copy(templateDetails.getSnippet(), group, idGenerationSeed, false);
// calculate scaling factors based on the template encoding version
// attempt to parse the encoding version

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.web.util;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -45,6 +43,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.util.TypeOneUUIDGenerator;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
@ -69,6 +68,8 @@ public final class SnippetUtils {
private FlowController flowController;
private DtoFactory dtoFactory;
/**
* Populates the specified snippet and returns the details.
*
@ -77,8 +78,8 @@ public final class SnippetUtils {
* @param includeControllerServices whether or not to include controller services in the flow snippet dto
* @return snippet
*/
public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices) {
final FlowSnippetDTO snippetDto = new FlowSnippetDTO(true);
public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices, boolean removeInstanceId) {
final FlowSnippetDTO snippetDto = new FlowSnippetDTO(removeInstanceId);
final String groupId = snippet.getParentGroupId();
final ProcessGroup processGroup = flowController.getGroup(groupId);
@ -261,8 +262,9 @@ public final class SnippetUtils {
}
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group, final String idGenerationSeed) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed);
public FlowSnippetDTO copy(final FlowSnippetDTO snippetContents, final ProcessGroup group,
final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetCopy = copyContentsForGroup(snippetContents, group.getIdentifier(), null, null, idGenerationSeed, isCopy);
resolveNameConflicts(snippetCopy, group);
return snippetCopy;
}
@ -318,7 +320,7 @@ public final class SnippetUtils {
}
private FlowSnippetDTO copyContentsForGroup(final FlowSnippetDTO snippetContents, final String groupId, final Map<String, ConnectableDTO> parentConnectableMap, Map<String, String> serviceIdMap,
final String idGenerationSeed) {
final String idGenerationSeed, boolean isCopy) {
final FlowSnippetDTO snippetContentsCopy = new FlowSnippetDTO();
//
@ -332,7 +334,7 @@ public final class SnippetUtils {
if (snippetContents.getControllerServices() != null) {
for (final ControllerServiceDTO serviceDTO : snippetContents.getControllerServices()) {
final ControllerServiceDTO service = dtoFactory.copy(serviceDTO);
service.setId(generateId(serviceDTO.getId(), idGenerationSeed));
service.setId(generateId(serviceDTO.getId(), idGenerationSeed, isCopy));
service.setState(ControllerServiceState.DISABLED.name());
services.add(service);
@ -368,7 +370,7 @@ public final class SnippetUtils {
if (snippetContents.getLabels() != null) {
for (final LabelDTO labelDTO : snippetContents.getLabels()) {
final LabelDTO label = dtoFactory.copy(labelDTO);
label.setId(generateId(labelDTO.getId(), idGenerationSeed));
label.setId(generateId(labelDTO.getId(), idGenerationSeed, isCopy));
label.setParentGroupId(groupId);
labels.add(label);
}
@ -388,7 +390,7 @@ public final class SnippetUtils {
if (snippetContents.getFunnels() != null) {
for (final FunnelDTO funnelDTO : snippetContents.getFunnels()) {
final FunnelDTO cp = dtoFactory.copy(funnelDTO);
cp.setId(generateId(funnelDTO.getId(), idGenerationSeed));
cp.setId(generateId(funnelDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
funnels.add(cp);
@ -401,7 +403,7 @@ public final class SnippetUtils {
if (snippetContents.getInputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getInputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed));
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
inputPorts.add(cp);
@ -419,7 +421,7 @@ public final class SnippetUtils {
if (snippetContents.getOutputPorts() != null) {
for (final PortDTO portDTO : snippetContents.getOutputPorts()) {
final PortDTO cp = dtoFactory.copy(portDTO);
cp.setId(generateId(portDTO.getId(), idGenerationSeed));
cp.setId(generateId(portDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
outputPorts.add(cp);
@ -440,7 +442,7 @@ public final class SnippetUtils {
if (snippetContents.getProcessors() != null) {
for (final ProcessorDTO processorDTO : snippetContents.getProcessors()) {
final ProcessorDTO cp = dtoFactory.copy(processorDTO);
cp.setId(generateId(processorDTO.getId(), idGenerationSeed));
cp.setId(generateId(processorDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
cp.setState(ScheduledState.STOPPED.toString());
processors.add(cp);
@ -461,11 +463,11 @@ public final class SnippetUtils {
if (snippetContents.getProcessGroups() != null) {
for (final ProcessGroupDTO groupDTO : snippetContents.getProcessGroups()) {
final ProcessGroupDTO cp = dtoFactory.copy(groupDTO, false);
cp.setId(generateId(groupDTO.getId(), idGenerationSeed));
cp.setId(generateId(groupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
// copy the contents of this group - we do not copy via the dto factory since we want to specify new ids
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap, idGenerationSeed);
final FlowSnippetDTO contentsCopy = copyContentsForGroup(groupDTO.getContents(), cp.getId(), connectableMap, serviceIdMap, idGenerationSeed, isCopy);
cp.setContents(contentsCopy);
groups.add(cp);
}
@ -476,7 +478,7 @@ public final class SnippetUtils {
if (snippetContents.getRemoteProcessGroups() != null) {
for (final RemoteProcessGroupDTO remoteGroupDTO : snippetContents.getRemoteProcessGroups()) {
final RemoteProcessGroupDTO cp = dtoFactory.copy(remoteGroupDTO);
cp.setId(generateId(remoteGroupDTO.getId(), idGenerationSeed));
cp.setId(generateId(remoteGroupDTO.getId(), idGenerationSeed, isCopy));
cp.setParentGroupId(groupId);
final RemoteProcessGroupContentsDTO contents = cp.getContents();
@ -511,7 +513,7 @@ public final class SnippetUtils {
throw new IllegalArgumentException("The flow snippet contains a Connection that references a component that is not included.");
}
cp.setId(generateId(connectionDTO.getId(), idGenerationSeed));
cp.setId(generateId(connectionDTO.getId(), idGenerationSeed, isCopy));
cp.setSource(source);
cp.setDestination(destination);
cp.setParentGroupId(groupId);
@ -565,13 +567,13 @@ public final class SnippetUtils {
/**
* Generates a new id for the current id that is specified. If no seed is found, a new random id will be created.
*/
private String generateId(final String currentId, final String seed) {
private String generateId(final String currentId, final String seed, boolean isCopy) {
long msb = UUID.fromString(currentId).getMostSignificantBits();
long lsb = StringUtils.isBlank(seed)
int lsb = StringUtils.isBlank(seed)
? Math.abs(new Random().nextInt())
: Math.abs(ByteBuffer.wrap(seed.getBytes(StandardCharsets.UTF_8)).getInt());
: Math.abs(seed.hashCode());
return new UUID(msb, lsb).toString();
return isCopy ? TypeOneUUIDGenerator.generateId(msb, lsb).toString() : new UUID(msb, lsb).toString();
}
/* setters */