mirror of https://github.com/apache/nifi.git
NIFI-826 This closes #617. Added deterministic template support
This commit is contained in:
parent
4d4c525d9c
commit
52a961873b
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.util;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class TypeOneUUIDGenerator {
|
||||||
|
|
||||||
|
public static final Object lock = new Object();
|
||||||
|
|
||||||
|
private static long lastTime;
|
||||||
|
private static long clockSequence = 0;
|
||||||
|
private static final Random randomGenerator = new Random();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Will generate unique time based UUID where the next UUID is always
|
||||||
|
* greater then the previous.
|
||||||
|
*/
|
||||||
|
public final static UUID generateId() {
|
||||||
|
return generateId(System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public final static UUID generateId(long currentTime) {
|
||||||
|
return generateId(currentTime, Math.abs(randomGenerator.nextInt()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public final static UUID generateId(long currentTime, int lsbInt) {
|
||||||
|
long time;
|
||||||
|
|
||||||
|
synchronized (lock) {
|
||||||
|
if (currentTime > lastTime) {
|
||||||
|
lastTime = currentTime;
|
||||||
|
clockSequence = 0;
|
||||||
|
} else {
|
||||||
|
++clockSequence;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time = currentTime;
|
||||||
|
|
||||||
|
// low Time
|
||||||
|
time = currentTime << 32;
|
||||||
|
|
||||||
|
// mid Time
|
||||||
|
time |= ((currentTime & 0xFFFF00000000L) >> 16);
|
||||||
|
|
||||||
|
// hi Time
|
||||||
|
time |= 0x1000 | ((currentTime >> 48) & 0x0FFF);
|
||||||
|
|
||||||
|
long clockSequenceHi = clockSequence;
|
||||||
|
clockSequenceHi <<= 48;
|
||||||
|
long lsb = clockSequenceHi | lsbInt;
|
||||||
|
return new UUID(time, lsb);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,11 +16,16 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.api.dto;
|
package org.apache.nifi.web.api.dto;
|
||||||
|
|
||||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
import java.util.Comparator;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.TreeSet;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
import javax.xml.bind.annotation.XmlType;
|
import javax.xml.bind.annotation.XmlType;
|
||||||
|
|
||||||
|
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The contents of a flow snippet.
|
* The contents of a flow snippet.
|
||||||
*/
|
*/
|
||||||
|
@ -37,6 +42,15 @@ public class FlowSnippetDTO {
|
||||||
private Set<FunnelDTO> funnels = new LinkedHashSet<>();
|
private Set<FunnelDTO> funnels = new LinkedHashSet<>();
|
||||||
private Set<ControllerServiceDTO> controllerServices = new LinkedHashSet<>();
|
private Set<ControllerServiceDTO> controllerServices = new LinkedHashSet<>();
|
||||||
|
|
||||||
|
private final boolean newTemplate;
|
||||||
|
|
||||||
|
public FlowSnippetDTO() {
|
||||||
|
this(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FlowSnippetDTO(boolean newTemplate) {
|
||||||
|
this.newTemplate = newTemplate;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* @return connections in this flow snippet
|
* @return connections in this flow snippet
|
||||||
*/
|
*/
|
||||||
|
@ -48,7 +62,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setConnections(Set<ConnectionDTO> connections) {
|
public void setConnections(Set<ConnectionDTO> connections) {
|
||||||
this.connections = connections;
|
this.removeInstanceIdentifierIfNecessary(connections);
|
||||||
|
this.connections = this.orderedById(connections);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -62,7 +77,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setInputPorts(Set<PortDTO> inputPorts) {
|
public void setInputPorts(Set<PortDTO> inputPorts) {
|
||||||
this.inputPorts = inputPorts;
|
this.removeInstanceIdentifierIfNecessary(inputPorts);
|
||||||
|
this.inputPorts = this.orderedById(inputPorts);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,7 +92,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLabels(Set<LabelDTO> labels) {
|
public void setLabels(Set<LabelDTO> labels) {
|
||||||
this.labels = labels;
|
this.removeInstanceIdentifierIfNecessary(labels);
|
||||||
|
this.labels = this.orderedById(labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -90,7 +107,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFunnels(Set<FunnelDTO> funnels) {
|
public void setFunnels(Set<FunnelDTO> funnels) {
|
||||||
this.funnels = funnels;
|
this.removeInstanceIdentifierIfNecessary(funnels);
|
||||||
|
this.funnels = this.orderedById(funnels);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -104,7 +122,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setOutputPorts(Set<PortDTO> outputPorts) {
|
public void setOutputPorts(Set<PortDTO> outputPorts) {
|
||||||
this.outputPorts = outputPorts;
|
this.removeInstanceIdentifierIfNecessary(outputPorts);
|
||||||
|
this.outputPorts = this.orderedById(outputPorts);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -118,7 +137,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProcessGroups(Set<ProcessGroupDTO> processGroups) {
|
public void setProcessGroups(Set<ProcessGroupDTO> processGroups) {
|
||||||
this.processGroups = processGroups;
|
this.removeInstanceIdentifierIfNecessary(processGroups);
|
||||||
|
this.processGroups = this.orderedById(processGroups);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -132,7 +152,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setProcessors(Set<ProcessorDTO> processors) {
|
public void setProcessors(Set<ProcessorDTO> processors) {
|
||||||
this.processors = processors;
|
this.removeInstanceIdentifierIfNecessary(processors);
|
||||||
|
this.processors = this.orderedById(processors);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -146,7 +167,8 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRemoteProcessGroups(Set<RemoteProcessGroupDTO> remoteProcessGroups) {
|
public void setRemoteProcessGroups(Set<RemoteProcessGroupDTO> remoteProcessGroups) {
|
||||||
this.remoteProcessGroups = remoteProcessGroups;
|
this.removeInstanceIdentifierIfNecessary(remoteProcessGroups);
|
||||||
|
this.remoteProcessGroups = this.orderedById(remoteProcessGroups);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -160,6 +182,40 @@ public class FlowSnippetDTO {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setControllerServices(Set<ControllerServiceDTO> controllerServices) {
|
public void setControllerServices(Set<ControllerServiceDTO> controllerServices) {
|
||||||
this.controllerServices = controllerServices;
|
this.removeInstanceIdentifierIfNecessary(controllerServices);
|
||||||
|
this.controllerServices = this.orderedById(controllerServices);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T extends ComponentDTO> Set<T> orderedById(Set<T> dtos) {
|
||||||
|
TreeSet<T> components = new TreeSet<>(new Comparator<ComponentDTO>() {
|
||||||
|
@Override
|
||||||
|
public int compare(ComponentDTO c1, ComponentDTO c2) {
|
||||||
|
return UUID.fromString(c1.getId()).compareTo(UUID.fromString(c2.getId()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
components.addAll(dtos);
|
||||||
|
return components;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeInstanceIdentifierIfNecessary(Set<? extends ComponentDTO> componentDtos) {
|
||||||
|
if (this.newTemplate) {
|
||||||
|
for (ComponentDTO componentDto : componentDtos) {
|
||||||
|
UUID id = UUID.fromString(componentDto.getId());
|
||||||
|
id = new UUID(id.getMostSignificantBits(), 0);
|
||||||
|
componentDto.setId(id.toString());
|
||||||
|
if (componentDto instanceof ConnectionDTO) {
|
||||||
|
ConnectionDTO connectionDTO = (ConnectionDTO) componentDto;
|
||||||
|
ConnectableDTO cdto = connectionDTO.getSource();
|
||||||
|
id = UUID.fromString(cdto.getId());
|
||||||
|
id = new UUID(id.getMostSignificantBits(), 0);
|
||||||
|
cdto.setId(id.toString());
|
||||||
|
|
||||||
|
cdto = connectionDTO.getDestination();
|
||||||
|
id = UUID.fromString(cdto.getId());
|
||||||
|
id = new UUID(id.getMostSignificantBits(), 0);
|
||||||
|
cdto.setId(id.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
import org.apache.nifi.events.EventReporter;
|
import org.apache.nifi.events.EventReporter;
|
||||||
import org.apache.nifi.reporting.Severity;
|
import org.apache.nifi.reporting.Severity;
|
||||||
import org.apache.nifi.util.FormatUtils;
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
import org.apache.nifi.util.TypeOneUUIDGenerator;
|
||||||
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -216,12 +217,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
@Override
|
@Override
|
||||||
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated) {
|
public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, final boolean indicateReplicated) {
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||||
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString());
|
|
||||||
|
|
||||||
|
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, TypeOneUUIDGenerator.generateId().toString());
|
||||||
if (indicateReplicated) {
|
if (indicateReplicated) {
|
||||||
updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
|
updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
|
// If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
|
||||||
// it knows that we are acting as a proxy on behalf of the current user.
|
// it knows that we are acting as a proxy on behalf of the current user.
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
|
|
|
@ -174,6 +174,12 @@
|
||||||
<artifactId>cglib-nodep</artifactId>
|
<artifactId>cglib-nodep</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.eclipse.jgit</groupId>
|
||||||
|
<artifactId>org.eclipse.jgit</artifactId>
|
||||||
|
<version>4.3.1.201605051710-r</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
|
|
@ -26,6 +26,11 @@ import javax.xml.bind.Marshaller;
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import javax.xml.stream.XMLOutputFactory;
|
||||||
|
import javax.xml.stream.XMLStreamException;
|
||||||
|
import javax.xml.stream.XMLStreamWriter;
|
||||||
|
|
||||||
|
import com.sun.xml.txw2.output.IndentingXMLStreamWriter;
|
||||||
|
|
||||||
public final class TemplateSerializer {
|
public final class TemplateSerializer {
|
||||||
|
|
||||||
|
@ -39,12 +44,13 @@ public final class TemplateSerializer {
|
||||||
|
|
||||||
JAXBContext context = JAXBContext.newInstance(TemplateDTO.class);
|
JAXBContext context = JAXBContext.newInstance(TemplateDTO.class);
|
||||||
Marshaller marshaller = context.createMarshaller();
|
Marshaller marshaller = context.createMarshaller();
|
||||||
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
|
XMLOutputFactory xmlof = XMLOutputFactory.newInstance();
|
||||||
marshaller.marshal(dto, bos);
|
XMLStreamWriter writer = new IndentingXMLStreamWriter(xmlof.createXMLStreamWriter(bos));
|
||||||
|
marshaller.marshal(dto, writer);
|
||||||
|
|
||||||
bos.flush();
|
bos.flush();
|
||||||
return baos.toByteArray();
|
return baos.toByteArray();
|
||||||
} catch (final IOException | JAXBException e) {
|
} catch (final IOException | JAXBException | XMLStreamException e) {
|
||||||
throw new FlowSerializationException(e);
|
throw new FlowSerializationException(e);
|
||||||
} finally {
|
} finally {
|
||||||
if (currentCl != null) {
|
if (currentCl != null) {
|
||||||
|
|
|
@ -43,7 +43,7 @@ class SnippetUtilsSpec extends Specification {
|
||||||
where:
|
where:
|
||||||
nX | nY | oX | oY | fX | fY | snippet
|
nX | nY | oX | oY | fX | fY | snippet
|
||||||
500 | 500 | 0 | 0 | 1.5 | 1.34 | new FlowSnippetDTO()
|
500 | 500 | 0 | 0 | 1.5 | 1.34 | new FlowSnippetDTO()
|
||||||
500 | 500 | 10 | 10 | 1.5 | 1.34 | new FlowSnippetDTO(processors: [new ProcessorDTO(position: new PositionDTO(x: 10, y: 10))])
|
500 | 500 | 10 | 10 | 1.5 | 1.34 | new FlowSnippetDTO(processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0000-c4af042cb1559", position: new PositionDTO(x: 10, y: 10))])
|
||||||
}
|
}
|
||||||
|
|
||||||
@Unroll
|
@Unroll
|
||||||
|
@ -69,10 +69,10 @@ class SnippetUtilsSpec extends Specification {
|
||||||
fX | fY | snippet
|
fX | fY | snippet
|
||||||
1.5 | 1.34 | new FlowSnippetDTO()
|
1.5 | 1.34 | new FlowSnippetDTO()
|
||||||
1.5 | 1.34 | new FlowSnippetDTO(
|
1.5 | 1.34 | new FlowSnippetDTO(
|
||||||
processors: [new ProcessorDTO(position: new PositionDTO(x: 10, y: 10))],
|
processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0001-c4af042cb1559", position: new PositionDTO(x: 10, y: 10))],
|
||||||
processGroups: [
|
processGroups: [
|
||||||
new ProcessGroupDTO(position: new PositionDTO(x: 105, y: -10), name: 'pg2',
|
new ProcessGroupDTO(id:"c81f6a10-0155-1000-0002-c4af042cb1559", position: new PositionDTO(x: 105, y: -10), name: 'pg2',
|
||||||
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc1', position: new PositionDTO(x: 50, y: 60))]))])
|
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0002-c4af042cb1559", name: 'proc1', position: new PositionDTO(x: 50, y: 60))]))])
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.persistence;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import javax.xml.bind.JAXBContext;
|
||||||
|
import javax.xml.bind.JAXBElement;
|
||||||
|
import javax.xml.bind.Unmarshaller;
|
||||||
|
import javax.xml.transform.stream.StreamSource;
|
||||||
|
|
||||||
|
import org.apache.nifi.nar.NarClassLoader;
|
||||||
|
import org.apache.nifi.nar.NarClassLoaders;
|
||||||
|
import org.apache.nifi.util.TypeOneUUIDGenerator;
|
||||||
|
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
|
||||||
|
import org.apache.nifi.web.api.dto.ProcessorDTO;
|
||||||
|
import org.apache.nifi.web.api.dto.TemplateDTO;
|
||||||
|
import org.eclipse.jgit.diff.DiffFormatter;
|
||||||
|
import org.eclipse.jgit.diff.EditList;
|
||||||
|
import org.eclipse.jgit.diff.HistogramDiff;
|
||||||
|
import org.eclipse.jgit.diff.RawText;
|
||||||
|
import org.eclipse.jgit.diff.RawTextComparator;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TemplateSerializerTest {
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
Field initField = NarClassLoaders.class.getDeclaredField("initialized");
|
||||||
|
setFinalField(initField, new AtomicBoolean(true));
|
||||||
|
Field clField = NarClassLoaders.class.getDeclaredField("frameworkClassLoader");
|
||||||
|
NarClassLoader cl = new NarClassLoader(new File(""), Thread.currentThread().getContextClassLoader());
|
||||||
|
setFinalField(clField, new AtomicReference<NarClassLoader>(cl));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void validateDiffWithChangingComponentIdAndAdditionalElements() throws Exception {
|
||||||
|
|
||||||
|
// Create initial template (TemplateDTO)
|
||||||
|
FlowSnippetDTO snippet = new FlowSnippetDTO();
|
||||||
|
Set<ProcessorDTO> procs = new HashSet<>();
|
||||||
|
for (int i = 4; i > 0; i--) {
|
||||||
|
ProcessorDTO procDTO = new ProcessorDTO();
|
||||||
|
procDTO.setType("Processor" + i + ".class");
|
||||||
|
procDTO.setId(TypeOneUUIDGenerator.generateId().toString());
|
||||||
|
procs.add(procDTO);
|
||||||
|
}
|
||||||
|
snippet.setProcessors(procs);
|
||||||
|
TemplateDTO origTemplate = new TemplateDTO();
|
||||||
|
origTemplate.setDescription("MyTemplate");
|
||||||
|
origTemplate.setId("MyTemplate");
|
||||||
|
origTemplate.setSnippet(snippet);
|
||||||
|
byte[] serTemplate = TemplateSerializer.serialize(origTemplate);
|
||||||
|
|
||||||
|
// Deserialize Template into TemplateDTP
|
||||||
|
ByteArrayInputStream in = new ByteArrayInputStream(serTemplate);
|
||||||
|
JAXBContext context = JAXBContext.newInstance(TemplateDTO.class);
|
||||||
|
Unmarshaller unmarshaller = context.createUnmarshaller();
|
||||||
|
JAXBElement<TemplateDTO> templateElement = unmarshaller.unmarshal(new StreamSource(in), TemplateDTO.class);
|
||||||
|
TemplateDTO deserTemplate = templateElement.getValue();
|
||||||
|
|
||||||
|
// Modify deserialized template
|
||||||
|
FlowSnippetDTO deserSnippet = deserTemplate.getSnippet();
|
||||||
|
Set<ProcessorDTO> deserProcs = deserSnippet.getProcessors();
|
||||||
|
int c = 0;
|
||||||
|
for (ProcessorDTO processorDTO : deserProcs) {
|
||||||
|
if (c % 2 == 0) {
|
||||||
|
processorDTO.setName("Hello-" + c);
|
||||||
|
}
|
||||||
|
c++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add new Processor
|
||||||
|
ProcessorDTO procDTO = new ProcessorDTO();
|
||||||
|
procDTO.setType("ProcessorNew" + ".class");
|
||||||
|
procDTO.setId(TypeOneUUIDGenerator.generateId().toString());
|
||||||
|
deserProcs.add(procDTO);
|
||||||
|
|
||||||
|
// Serialize modified template
|
||||||
|
byte[] serTemplate2 = TemplateSerializer.serialize(deserTemplate);
|
||||||
|
|
||||||
|
RawText rt1 = new RawText(serTemplate);
|
||||||
|
RawText rt2 = new RawText(serTemplate2);
|
||||||
|
EditList diffList = new EditList();
|
||||||
|
diffList.addAll(new HistogramDiff().diff(RawTextComparator.DEFAULT, rt1, rt2));
|
||||||
|
|
||||||
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||||
|
try (DiffFormatter diff = new DiffFormatter(out);) {
|
||||||
|
diff.format(diffList, rt1, rt2);
|
||||||
|
|
||||||
|
BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray()), StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
List<String> changes = reader.lines().peek(System.out::println)
|
||||||
|
.filter(line -> line.startsWith("+") || line.startsWith("-")).collect(Collectors.toList());
|
||||||
|
|
||||||
|
assertEquals("+ <name>Hello-0</name>", changes.get(0));
|
||||||
|
assertEquals("+ <name>Hello-2</name>", changes.get(1));
|
||||||
|
assertEquals("+ <processors>", changes.get(2));
|
||||||
|
assertEquals("+ <type>ProcessorNew.class</type>", changes.get(4));
|
||||||
|
assertEquals("+ </processors>", changes.get(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setFinalField(Field field, Object newValue) throws Exception {
|
||||||
|
field.setAccessible(true);
|
||||||
|
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||||
|
modifiersField.setAccessible(true);
|
||||||
|
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
|
||||||
|
field.set(null, newValue);
|
||||||
|
}
|
||||||
|
}
|
|
@ -41,6 +41,7 @@ import org.apache.nifi.remote.exception.NotAuthorizedException;
|
||||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||||
import org.apache.nifi.remote.protocol.http.HttpHeaders;
|
import org.apache.nifi.remote.protocol.http.HttpHeaders;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.apache.nifi.util.TypeOneUUIDGenerator;
|
||||||
import org.apache.nifi.web.AuthorizableLookup;
|
import org.apache.nifi.web.AuthorizableLookup;
|
||||||
import org.apache.nifi.web.AuthorizeAccess;
|
import org.apache.nifi.web.AuthorizeAccess;
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
|
@ -200,7 +201,19 @@ public abstract class ApplicationResource {
|
||||||
|
|
||||||
protected String generateUuid() {
|
protected String generateUuid() {
|
||||||
final Optional<String> seed = getIdGenerationSeed();
|
final Optional<String> seed = getIdGenerationSeed();
|
||||||
return seed.isPresent() ? UUID.nameUUIDFromBytes(seed.get().getBytes(StandardCharsets.UTF_8)).toString() : UUID.randomUUID().toString();
|
UUID uuid;
|
||||||
|
if (seed.isPresent()) {
|
||||||
|
try {
|
||||||
|
UUID seedId = UUID.fromString(seed.get());
|
||||||
|
uuid = TypeOneUUIDGenerator.generateId(seedId.getMostSignificantBits(), Math.abs(seed.get().hashCode()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation.");
|
||||||
|
uuid = UUID.nameUUIDFromBytes(seed.get().getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uuid = TypeOneUUIDGenerator.generateId();
|
||||||
|
}
|
||||||
|
return uuid.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Optional<String> getIdGenerationSeed() {
|
protected Optional<String> getIdGenerationSeed() {
|
||||||
|
|
|
@ -16,20 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.api;
|
package org.apache.nifi.web.api;
|
||||||
|
|
||||||
import com.wordnik.swagger.annotations.Api;
|
import java.nio.charset.StandardCharsets;
|
||||||
import com.wordnik.swagger.annotations.ApiOperation;
|
import java.util.Set;
|
||||||
import com.wordnik.swagger.annotations.ApiParam;
|
|
||||||
import com.wordnik.swagger.annotations.ApiResponse;
|
|
||||||
import com.wordnik.swagger.annotations.ApiResponses;
|
|
||||||
import com.wordnik.swagger.annotations.Authorization;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.authorization.Authorizer;
|
|
||||||
import org.apache.nifi.authorization.RequestAction;
|
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
|
||||||
import org.apache.nifi.web.NiFiServiceFacade;
|
|
||||||
import org.apache.nifi.web.api.dto.TemplateDTO;
|
|
||||||
import org.apache.nifi.web.api.entity.TemplateEntity;
|
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.ws.rs.Consumes;
|
import javax.ws.rs.Consumes;
|
||||||
|
@ -42,7 +30,23 @@ import javax.ws.rs.Produces;
|
||||||
import javax.ws.rs.core.Context;
|
import javax.ws.rs.core.Context;
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.util.Set;
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.authorization.Authorizer;
|
||||||
|
import org.apache.nifi.authorization.RequestAction;
|
||||||
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
|
import org.apache.nifi.persistence.TemplateSerializer;
|
||||||
|
import org.apache.nifi.web.NiFiServiceFacade;
|
||||||
|
import org.apache.nifi.web.api.dto.TemplateDTO;
|
||||||
|
import org.apache.nifi.web.api.entity.TemplateEntity;
|
||||||
|
|
||||||
|
import com.wordnik.swagger.annotations.Api;
|
||||||
|
import com.wordnik.swagger.annotations.ApiOperation;
|
||||||
|
import com.wordnik.swagger.annotations.ApiParam;
|
||||||
|
import com.wordnik.swagger.annotations.ApiResponse;
|
||||||
|
import com.wordnik.swagger.annotations.ApiResponses;
|
||||||
|
import com.wordnik.swagger.annotations.Authorization;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RESTful endpoint for managing a Template.
|
* RESTful endpoint for managing a Template.
|
||||||
|
@ -168,7 +172,13 @@ public class TemplateResource extends ApplicationResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
// generate the response
|
// generate the response
|
||||||
return generateOkResponse(template).header("Content-Disposition", String.format("attachment; filename=\"%s.xml\"", attachmentName)).build();
|
/*
|
||||||
|
* Here instead of relying on default JAXB marshalling we are simply
|
||||||
|
* serializing template to String (formatted, indented etc) and sending
|
||||||
|
* it as part of the response.
|
||||||
|
*/
|
||||||
|
String serializedTemplate = new String(TemplateSerializer.serialize(template), StandardCharsets.UTF_8);
|
||||||
|
return generateOkResponse(serializedTemplate).header("Content-Disposition", String.format("attachment; filename=\"%s.xml\"", attachmentName)).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -16,17 +16,21 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.web.util;
|
package org.apache.nifi.web.util;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.connectable.ConnectableType;
|
import org.apache.nifi.connectable.ConnectableType;
|
||||||
import org.apache.nifi.connectable.Connection;
|
import org.apache.nifi.connectable.Connection;
|
||||||
|
@ -74,7 +78,7 @@ public final class SnippetUtils {
|
||||||
* @return snippet
|
* @return snippet
|
||||||
*/
|
*/
|
||||||
public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices) {
|
public FlowSnippetDTO populateFlowSnippet(final Snippet snippet, final boolean recurse, final boolean includeControllerServices) {
|
||||||
final FlowSnippetDTO snippetDto = new FlowSnippetDTO();
|
final FlowSnippetDTO snippetDto = new FlowSnippetDTO(true);
|
||||||
final String groupId = snippet.getParentGroupId();
|
final String groupId = snippet.getParentGroupId();
|
||||||
final ProcessGroup processGroup = flowController.getGroup(groupId);
|
final ProcessGroup processGroup = flowController.getGroup(groupId);
|
||||||
|
|
||||||
|
@ -99,9 +103,12 @@ public final class SnippetUtils {
|
||||||
controllerServices.addAll(getControllerServices(processor.getProperties()));
|
controllerServices.addAll(getControllerServices(processor.getProperties()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.normalizeCoordinates(processors);
|
||||||
snippetDto.setProcessors(processors);
|
snippetDto.setProcessors(processors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// add any connections
|
// add any connections
|
||||||
if (!snippet.getConnections().isEmpty()) {
|
if (!snippet.getConnections().isEmpty()) {
|
||||||
final Set<ConnectionDTO> connections = new LinkedHashSet<>();
|
final Set<ConnectionDTO> connections = new LinkedHashSet<>();
|
||||||
|
@ -559,11 +566,12 @@ public final class SnippetUtils {
|
||||||
* Generates a new id for the current id that is specified. If no seed is found, a new random id will be created.
|
* Generates a new id for the current id that is specified. If no seed is found, a new random id will be created.
|
||||||
*/
|
*/
|
||||||
private String generateId(final String currentId, final String seed) {
|
private String generateId(final String currentId, final String seed) {
|
||||||
if (seed == null) {
|
long msb = UUID.fromString(currentId).getMostSignificantBits();
|
||||||
return UUID.randomUUID().toString();
|
long lsb = StringUtils.isBlank(seed)
|
||||||
} else {
|
? Math.abs(new Random().nextInt())
|
||||||
return UUID.nameUUIDFromBytes((currentId + seed).getBytes(StandardCharsets.UTF_8)).toString();
|
: Math.abs(ByteBuffer.wrap(seed.getBytes(StandardCharsets.UTF_8)).getInt());
|
||||||
}
|
|
||||||
|
return new UUID(msb, lsb).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* setters */
|
/* setters */
|
||||||
|
@ -575,4 +583,30 @@ public final class SnippetUtils {
|
||||||
this.flowController = flowController;
|
this.flowController = flowController;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Will normalize the coordinates of the processors to ensure their
|
||||||
|
* consistency across exports. It will do so by fist calculating the
|
||||||
|
* smallest X and smallest Y and then subtracting it from all X's and Y's of
|
||||||
|
* each processor ensuring that coordinates are consistent across export
|
||||||
|
* while preserving relative locations set by the user.
|
||||||
|
*/
|
||||||
|
private void normalizeCoordinates(Collection<ProcessorDTO> processors) {
|
||||||
|
double smallestX = Double.MAX_VALUE;
|
||||||
|
double smallestY = Double.MAX_VALUE;
|
||||||
|
for (ProcessorDTO processor : processors) {
|
||||||
|
double d = processor.getPosition().getX();
|
||||||
|
if (d < smallestX) {
|
||||||
|
smallestX = d;
|
||||||
|
}
|
||||||
|
d = processor.getPosition().getY();
|
||||||
|
if (d < smallestY) {
|
||||||
|
smallestY = d;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (ProcessorDTO processor : processors) {
|
||||||
|
processor.getPosition().setX(processor.getPosition().getX() - smallestX);
|
||||||
|
processor.getPosition().setY(processor.getPosition().getY() - smallestY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.nifi.controller.service.ControllerServiceProvider
|
||||||
import org.apache.nifi.groups.ProcessGroup
|
import org.apache.nifi.groups.ProcessGroup
|
||||||
import org.apache.nifi.web.api.dto.*
|
import org.apache.nifi.web.api.dto.*
|
||||||
import org.apache.nifi.web.util.SnippetUtils
|
import org.apache.nifi.web.util.SnippetUtils
|
||||||
|
|
||||||
|
import spock.lang.Ignore;
|
||||||
import spock.lang.Specification
|
import spock.lang.Specification
|
||||||
import spock.lang.Unroll
|
import spock.lang.Unroll
|
||||||
|
|
||||||
|
@ -117,28 +119,28 @@ class StandardTemplateDAOSpec extends Specification {
|
||||||
rootGroupId | oldOriginX | oldOriginY | newOriginX | newOriginY | templateId | idGenerationSeed | encodingVersion | snippet
|
rootGroupId | oldOriginX | oldOriginY | newOriginX | newOriginY | templateId | idGenerationSeed | encodingVersion | snippet
|
||||||
'g1' | 0.0 | 0.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO()
|
'g1' | 0.0 | 0.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO()
|
||||||
'g1' | 10.0 | 10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
|
'g1' | 10.0 | 10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
|
||||||
processors: [new ProcessorDTO(name: 'proc1', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))])
|
processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0000-c4af042cb1559", name: 'proc1', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))])
|
||||||
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO(
|
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | null | new FlowSnippetDTO(
|
||||||
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0001-c4af042cb1559", name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
||||||
processGroups: [
|
processGroups: [
|
||||||
new ProcessGroupDTO(
|
new ProcessGroupDTO(id:"c81f6810-0a55-1000-0000-c4af042cb1559",
|
||||||
name: 'g2',
|
name: 'g2',
|
||||||
position: new PositionDTO(x: 105, y: -10),
|
position: new PositionDTO(x: 105, y: -10),
|
||||||
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0002-c4af042cb1559", name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
||||||
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
|
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '0.7' | new FlowSnippetDTO(
|
||||||
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0003-c4af042cb1559", name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
||||||
processGroups: [
|
processGroups: [
|
||||||
new ProcessGroupDTO(
|
new ProcessGroupDTO(id:"c81f6810-0a55-1000-0001-c4af042cb1559",
|
||||||
name: 'g2',
|
name: 'g2',
|
||||||
position: new PositionDTO(x: 105, y: -10),
|
position: new PositionDTO(x: 105, y: -10),
|
||||||
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0004-c4af042cb1559", name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
||||||
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '1.0' | new FlowSnippetDTO(
|
'g1' | 10.0 | -10.0 | 5.0 | 5.0 | 't1' | 'AAAA' | '1.0' | new FlowSnippetDTO(
|
||||||
processors: [new ProcessorDTO(name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0005-c4af042cb1559", name: 'proc2', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 10, y: 10))],
|
||||||
processGroups: [
|
processGroups: [
|
||||||
new ProcessGroupDTO(
|
new ProcessGroupDTO(id:"c81f6810-0a55-1000-0003-c4af042cb1559",
|
||||||
name: 'g2',
|
name: 'g2',
|
||||||
position: new PositionDTO(x: 105, y: -10),
|
position: new PositionDTO(x: 105, y: -10),
|
||||||
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
contents: new FlowSnippetDTO(processors: [new ProcessorDTO(id:"c81f6810-0155-1000-0006-c4af042cb1559", name: 'proc3', config: new ProcessorConfigDTO(), position: new PositionDTO(x: 50, y: 60))]))])
|
||||||
}
|
}
|
||||||
|
|
||||||
def PositionDTO calculateMoveAndScalePosition(position, oldOriginX, oldOriginY, newOriginX, newOriginY, factorX, factorY) {
|
def PositionDTO calculateMoveAndScalePosition(position, oldOriginX, oldOriginY, newOriginX, newOriginY, factorX, factorY) {
|
||||||
|
|
Loading…
Reference in New Issue