mirror of https://github.com/apache/nifi.git
NIFI-11307 Removed S2S Toolkit (#7060)
This commit is contained in:
parent
23d6d6ede4
commit
ce484a0418
|
@ -28,7 +28,6 @@ The NiFi Toolkit contains several command line utilities to setup and support Ni
|
|||
* Flow Analyzer -- The `flow-analyzer` tool produces a report that helps administrators understand the max amount of data which can be stored in backpressure for a given flow.
|
||||
* Node Manager -- The `node-manager` tool enables administrators to perform status checks on nodes as well as the ability to connect, disconnect, or remove nodes from the cluster.
|
||||
* Notify -- The `notify` tool enables administrators to send bulletins to the NiFi UI.
|
||||
* S2S -- The `s2s` tool enables administrators to send data into or out of NiFi flows over site-to-site.
|
||||
* TLS Toolkit -- The `tls-toolkit` utility generates the required keystores, truststore, and relevant configuration files to facilitate the setup of a secure NiFi instance.
|
||||
* ZooKeeper Migrator -- The `zk-migrator` tool enables administrators to:
|
||||
** move ZooKeeper information from one ZooKeeper cluster to another
|
||||
|
@ -965,54 +964,6 @@ Executing the above command line should result in a bulletin appearing in NiFi:
|
|||
|
||||
image::nifi-notifications.png["NiFi Notifications"]
|
||||
|
||||
== S2S
|
||||
S2S is a command line tool (invoked as `./bin/s2s.sh` or `bin\s2s.bat`) that can either read a list of DataPackets from stdin to send over site-to-site or write the received DataPackets to stdout.
|
||||
|
||||
=== Usage
|
||||
To show help:
|
||||
|
||||
./bin/s2s.sh -h
|
||||
|
||||
The following are available options:
|
||||
|
||||
* `--batchCount <arg>` Number of flow files in a batch
|
||||
* `--batchDuration <arg>` Duration of a batch
|
||||
* `--batchSize <arg>` Size of flow files in a batch
|
||||
* `-c`,`--compression` Use compression
|
||||
* `-d`,`--direction` Direction (valid directions: `SEND`, `RECEIVE`) (default: `SEND`)
|
||||
* `-h`,`--help` Help Text (optional)
|
||||
* `-i`,`--portIdentifier <arg>` Port id
|
||||
* `--keystore <arg>` Keystore
|
||||
* `--keyStorePassword <arg>` Keystore password
|
||||
* `--keyStoreType <arg>` Keystore type (default: `JKS`)
|
||||
* `-n`,`--portName` Port name
|
||||
* `-p`,`--transportProtocol` Site to site transport protocol (default: `RAW`)
|
||||
* `--peerPersistenceFile <arg>` File to write peer information to so it can be recovered on restart
|
||||
* `--penalization <arg>` Penalization period
|
||||
* `--proxyHost <arg>` Proxy hostname
|
||||
* `--proxyPassword <arg>` Proxy password
|
||||
* `--proxyPort <arg>` Proxy port
|
||||
* `--proxyUsername <arg>` Proxy username
|
||||
* `--timeout <arg>` Timeout
|
||||
* `--trustStore <arg>` Truststore
|
||||
* `--trustStorePassword <arg>` Truststore password
|
||||
* `--trustStoreType <arg>` Truststore type (default: `JKS`)
|
||||
* `-u,--url <arg>` NiFI URL to connect to (default: `http://localhost:8080/nifi`)
|
||||
|
||||
The s2s cli input/output format is a JSON list of DataPackets. They can have the following formats:
|
||||
|
||||
[{"attributes":{"key":"value"},"data":"aGVsbG8gbmlmaQ=="}]
|
||||
|
||||
where data is the base64 encoded value of the FlowFile content (always used for received data) or:
|
||||
|
||||
[{"attributes":{"key":"value"},"dataFile":"/Users/pvillard/Documents/GitHub/nifi/nifi-toolkit/nifi-toolkit-assembly/target/nifi-toolkit-1.9.0-SNAPSHOT-bin/nifi-toolkit-1.9.0-SNAPSHOT/bin/EXAMPLE"}]
|
||||
|
||||
where dataFile is a file to read the FlowFile content from.
|
||||
|
||||
Example usage to send a FlowFile with the contents of "hey nifi" to a local unsecured NiFi over http with an input port named "input":
|
||||
|
||||
echo '[{"data":"aGV5IG5pZmk="}]' | bin/s2s.sh -n input -p http
|
||||
|
||||
[[tls_toolkit]]
|
||||
== TLS Toolkit
|
||||
In order to facilitate the secure setup of NiFi, you can use the `tls-toolkit` command line utility to automatically generate the required keystores, truststore, and relevant configuration files. This is especially useful for securing multiple NiFi nodes, which can be a tedious and error-prone process.
|
||||
|
|
|
@ -73,11 +73,6 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-toolkit-encrypt-config</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-toolkit-s2s</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-toolkit-admin</artifactId>
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
@echo off
|
||||
rem
|
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
rem contributor license agreements. See the NOTICE file distributed with
|
||||
rem this work for additional information regarding copyright ownership.
|
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
rem (the "License"); you may not use this file except in compliance with
|
||||
rem the License. You may obtain a copy of the License at
|
||||
rem
|
||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
||||
rem
|
||||
rem Unless required by applicable law or agreed to in writing, software
|
||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
rem See the License for the specific language governing permissions and
|
||||
rem limitations under the License.
|
||||
rem
|
||||
|
||||
rem Use JAVA_HOME if it's set; otherwise, just use java
|
||||
|
||||
if "%JAVA_HOME%" == "" goto noJavaHome
|
||||
if not exist "%JAVA_HOME%\bin\java.exe" goto noJavaHome
|
||||
set JAVA_EXE=%JAVA_HOME%\bin\java.exe
|
||||
goto startConfig
|
||||
|
||||
:noJavaHome
|
||||
echo The JAVA_HOME environment variable is not defined correctly.
|
||||
echo Instead the PATH will be used to find the java executable.
|
||||
echo.
|
||||
set JAVA_EXE=java
|
||||
goto startConfig
|
||||
|
||||
:startConfig
|
||||
set LIB_DIR=%~dp0..\classpath;%~dp0..\lib
|
||||
|
||||
if "%JAVA_OPTS%" == "" set JAVA_OPTS=-Xms128m -Xmx256m
|
||||
|
||||
SET JAVA_PARAMS=-cp %LIB_DIR%\* %JAVA_OPTS% org.apache.nifi.toolkit.s2s.SiteToSiteCliMain
|
||||
|
||||
cmd.exe /C ""%JAVA_EXE%" %JAVA_PARAMS% %* ""
|
||||
|
|
@ -1,119 +0,0 @@
|
|||
#!/bin/sh
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
#
|
||||
|
||||
# Script structure inspired from Apache Karaf and other Apache projects with similar startup approaches
|
||||
|
||||
SCRIPT_DIR=$(dirname "$0")
|
||||
SCRIPT_NAME=$(basename "$0")
|
||||
NIFI_TOOLKIT_HOME=$(cd "${SCRIPT_DIR}" && cd .. && pwd)
|
||||
PROGNAME=$(basename "$0")
|
||||
|
||||
|
||||
warn() {
|
||||
(>&2 echo "${PROGNAME}: $*")
|
||||
}
|
||||
|
||||
die() {
|
||||
warn "$*"
|
||||
exit 1
|
||||
}
|
||||
|
||||
detectOS() {
|
||||
# OS specific support (must be 'true' or 'false').
|
||||
cygwin=false;
|
||||
aix=false;
|
||||
os400=false;
|
||||
darwin=false;
|
||||
case "$(uname)" in
|
||||
CYGWIN*)
|
||||
cygwin=true
|
||||
;;
|
||||
AIX*)
|
||||
aix=true
|
||||
;;
|
||||
OS400*)
|
||||
os400=true
|
||||
;;
|
||||
Darwin)
|
||||
darwin=true
|
||||
;;
|
||||
esac
|
||||
# For AIX, set an environment variable
|
||||
if ${aix}; then
|
||||
export LDR_CNTRL=MAXDATA=0xB0000000@DSA
|
||||
echo ${LDR_CNTRL}
|
||||
fi
|
||||
}
|
||||
|
||||
locateJava() {
|
||||
# Setup the Java Virtual Machine
|
||||
if $cygwin ; then
|
||||
[ -n "${JAVA}" ] && JAVA=$(cygpath --unix "${JAVA}")
|
||||
[ -n "${JAVA_HOME}" ] && JAVA_HOME=$(cygpath --unix "${JAVA_HOME}")
|
||||
fi
|
||||
|
||||
if [ "x${JAVA}" = "x" ] && [ -r /etc/gentoo-release ] ; then
|
||||
JAVA_HOME=$(java-config --jre-home)
|
||||
fi
|
||||
if [ "x${JAVA}" = "x" ]; then
|
||||
if [ "x${JAVA_HOME}" != "x" ]; then
|
||||
if [ ! -d "${JAVA_HOME}" ]; then
|
||||
die "JAVA_HOME is not valid: ${JAVA_HOME}"
|
||||
fi
|
||||
JAVA="${JAVA_HOME}/bin/java"
|
||||
else
|
||||
warn "JAVA_HOME not set; results may vary"
|
||||
JAVA=$(type java)
|
||||
JAVA=$(expr "${JAVA}" : '.* \(/.*\)$')
|
||||
if [ "x${JAVA}" = "x" ]; then
|
||||
die "java command not found"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
init() {
|
||||
# Determine if there is special OS handling we must perform
|
||||
detectOS
|
||||
|
||||
# Locate the Java VM to execute
|
||||
locateJava "$1"
|
||||
}
|
||||
|
||||
run() {
|
||||
LIBS="${NIFI_TOOLKIT_HOME}/lib/*"
|
||||
|
||||
sudo_cmd_prefix=""
|
||||
if $cygwin; then
|
||||
NIFI_TOOLKIT_HOME=$(cygpath --path --windows "${NIFI_TOOLKIT_HOME}")
|
||||
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath;$(cygpath --path --windows "${LIBS}")"
|
||||
else
|
||||
CLASSPATH="$NIFI_TOOLKIT_HOME/classpath:${LIBS}"
|
||||
fi
|
||||
|
||||
export JAVA_HOME="$JAVA_HOME"
|
||||
export NIFI_TOOLKIT_HOME="$NIFI_TOOLKIT_HOME"
|
||||
|
||||
umask 0077
|
||||
exec "${JAVA}" -cp "${CLASSPATH}" ${JAVA_OPTS:--Xms128m -Xmx256m} org.apache.nifi.toolkit.s2s.SiteToSiteCliMain "$@"
|
||||
}
|
||||
|
||||
|
||||
init "$1"
|
||||
run "$@"
|
|
@ -1,48 +0,0 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-toolkit</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-toolkit-s2s</artifactId>
|
||||
<description>Site-to-site cli</description>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-site-to-site-client</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* DTO object for serializing and deserializing DataPackets via JSON
|
||||
*/
|
||||
public class DataPacketDto {
|
||||
public static final TypeReference<DataPacketDto> DATA_PACKET_DTO_TYPE_REFERENCE = new TypeReference<DataPacketDto>() {
|
||||
};
|
||||
private Map<String, String> attributes;
|
||||
private byte[] data;
|
||||
private String dataFile;
|
||||
|
||||
public DataPacketDto() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
public DataPacketDto(byte[] data) {
|
||||
this(new HashMap<>(), data);
|
||||
}
|
||||
|
||||
public DataPacketDto(Map<String, String> attributes, byte[] data) {
|
||||
this(attributes, data, null);
|
||||
}
|
||||
|
||||
public DataPacketDto(Map<String, String> attributes, String dataFile) {
|
||||
this(attributes, null, dataFile);
|
||||
}
|
||||
|
||||
public DataPacketDto(Map<String, String> attributes, byte[] data, String dataFile) {
|
||||
this.attributes = attributes;
|
||||
this.data = data;
|
||||
this.dataFile = dataFile;
|
||||
}
|
||||
|
||||
public static Stream<DataPacket> getDataPacketStream(InputStream inputStream) throws IOException {
|
||||
JsonParser jsonParser = new JsonFactory().createParser(inputStream);
|
||||
if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
|
||||
throw new IOException("Expecting start array token to begin object array.");
|
||||
}
|
||||
jsonParser.setCodec(new ObjectMapper());
|
||||
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<DataPacket>() {
|
||||
DataPacket next = getNext();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return next != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataPacket next() {
|
||||
DataPacket next = this.next;
|
||||
this.next = getNext();
|
||||
return next;
|
||||
}
|
||||
|
||||
DataPacket getNext() throws RuntimeException {
|
||||
try {
|
||||
if (jsonParser.nextToken() == JsonToken.END_ARRAY) {
|
||||
return null;
|
||||
}
|
||||
DataPacketDto dataPacketDto = jsonParser.readValueAs(DATA_PACKET_DTO_TYPE_REFERENCE);
|
||||
return dataPacketDto.toDataPacket();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}, Spliterator.ORDERED), false);
|
||||
}
|
||||
|
||||
public Map<String, String> getAttributes() {
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public void setAttributes(Map<String, String> attributes) {
|
||||
this.attributes = attributes;
|
||||
}
|
||||
|
||||
public byte[] getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public void setData(byte[] data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public String getDataFile() {
|
||||
return dataFile;
|
||||
}
|
||||
|
||||
public void setDataFile(String dataFile) {
|
||||
this.dataFile = dataFile;
|
||||
}
|
||||
|
||||
public DataPacket toDataPacket() {
|
||||
return new DataPacketImpl(attributes, data, dataFile);
|
||||
}
|
||||
|
||||
public DataPacketDto putAttribute(String key, String value) {
|
||||
attributes.put(key, value);
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -1,97 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Implements DataPacket either taking the data passed in or reading from a file on disk
|
||||
*/
|
||||
public class DataPacketImpl implements DataPacket {
|
||||
private final Map<String, String> attributes;
|
||||
private final byte[] data;
|
||||
private final String dataFile;
|
||||
|
||||
public DataPacketImpl(Map<String, String> attributes, byte[] data, String dataFile) {
|
||||
this.attributes = attributes == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(attributes));
|
||||
this.data = data;
|
||||
this.dataFile = dataFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getAttributes() {
|
||||
return attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getData() {
|
||||
if (data == null) {
|
||||
if (dataFile != null && dataFile.length() > 0) {
|
||||
try {
|
||||
return new FileInputStream(dataFile);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
return new ByteArrayInputStream(new byte[0]);
|
||||
}
|
||||
}
|
||||
return new ByteArrayInputStream(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
if (data == null) {
|
||||
if (dataFile != null && dataFile.length() > 0) {
|
||||
return new File(dataFile).length();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return data.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
DataPacketImpl that = (DataPacketImpl) o;
|
||||
|
||||
if (attributes != null ? !attributes.equals(that.attributes) : that.attributes != null) return false;
|
||||
return Arrays.equals(data, that.data);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = attributes != null ? attributes.hashCode() : 0;
|
||||
result = 31 * result + Arrays.hashCode(data);
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -1,259 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Value;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.KeystoreType;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SiteToSiteCliMain {
|
||||
public static final String URL_OPTION = "url";
|
||||
public static final String URL_OPTION_DEFAULT = "http://localhost:8080/nifi";
|
||||
public static final String DIRECTION_OPTION = "direction";
|
||||
public static final String DIRECTION_OPTION_DEFAULT = TransferDirection.SEND.toString();
|
||||
public static final String PORT_NAME_OPTION = "portName";
|
||||
public static final String PORT_IDENTIFIER_OPTION = "portIdentifier";
|
||||
public static final String TIMEOUT_OPTION = "timeout";
|
||||
public static final String PENALIZATION_OPTION = "penalization";
|
||||
public static final String KEYSTORE_OPTION = "keyStore";
|
||||
public static final String KEY_STORE_TYPE_OPTION = "keyStoreType";
|
||||
public static final String KEY_STORE_PASSWORD_OPTION = "keyStorePassword";
|
||||
public static final String TRUST_STORE_OPTION = "trustStore";
|
||||
public static final String TRUST_STORE_TYPE_OPTION = "trustStoreType";
|
||||
public static final String TRUST_STORE_PASSWORD_OPTION = "trustStorePassword";
|
||||
public static final String PEER_PERSISTENCE_FILE_OPTION = "peerPersistenceFile";
|
||||
public static final String COMPRESSION_OPTION = "compression";
|
||||
public static final String TRANSPORT_PROTOCOL_OPTION = "transportProtocol";
|
||||
public static final String TRANSPORT_PROTOCOL_OPTION_DEFAULT = SiteToSiteTransportProtocol.RAW.toString();
|
||||
public static final String BATCH_COUNT_OPTION = "batchCount";
|
||||
public static final String BATCH_SIZE_OPTION = "batchSize";
|
||||
public static final String BATCH_DURATION_OPTION = "batchDuration";
|
||||
public static final String HELP_OPTION = "help";
|
||||
public static final String PROXY_HOST_OPTION = "proxyHost";
|
||||
public static final String PROXY_PORT_OPTION = "proxyPort";
|
||||
public static final String PROXY_USERNAME_OPTION = "proxyUsername";
|
||||
public static final String PROXY_PASSWORD_OPTION = "proxyPassword";
|
||||
public static final String PROXY_PORT_OPTION_DEFAULT = "80";
|
||||
public static final String KEYSTORE_TYPE_OPTION_DEFAULT = KeystoreType.JKS.toString();
|
||||
|
||||
/**
|
||||
* Prints the usage to System.out
|
||||
*
|
||||
* @param errorMessage optional error message
|
||||
* @param options the options object to print usage for
|
||||
*/
|
||||
public static void printUsage(String errorMessage, Options options) {
|
||||
if (errorMessage != null) {
|
||||
System.out.println(errorMessage);
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
}
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
|
||||
objectMapper.setDefaultPropertyInclusion(Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS));
|
||||
System.out.println("s2s is a command line tool that can either read a list of DataPackets from stdin to send over site-to-site or write the received DataPackets to stdout");
|
||||
System.out.println();
|
||||
System.out.println("The s2s cli input/output format is a JSON list of DataPackets. They can have the following formats:");
|
||||
try {
|
||||
System.out.println();
|
||||
objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto("hello nifi".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value")));
|
||||
System.out.println();
|
||||
System.out.println("Where data is the base64 encoded value of the FlowFile content (always used for received data) or");
|
||||
System.out.println();
|
||||
objectMapper.writeValue(System.out, Arrays.asList(new DataPacketDto(new HashMap<>(), new File("EXAMPLE").getAbsolutePath()).putAttribute("key", "value")));
|
||||
System.out.println();
|
||||
System.out.println("Where dataFile is a file to read the FlowFile content from");
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
System.out.println("Example usage to send a FlowFile with the contents of \"hey nifi\" to a local unsecured NiFi over http with an input port named input:");
|
||||
System.out.print("echo '");
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("hey nifi".getBytes(StandardCharsets.UTF_8));
|
||||
dataPacketDto.setAttributes(null);
|
||||
objectMapper.writeValue(System.out, Arrays.asList(dataPacketDto));
|
||||
System.out.println("' | bin/s2s.sh -n input -p http");
|
||||
System.out.println();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
HelpFormatter helpFormatter = new HelpFormatter();
|
||||
helpFormatter.setWidth(160);
|
||||
helpFormatter.printHelp("s2s", options);
|
||||
System.out.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses command line options into a CliParse object
|
||||
*
|
||||
* @param options an empty options object (so callers can print usage if the parse fails
|
||||
* @param args the string array of arguments
|
||||
* @return a CliParse object containing the constructed SiteToSiteClient.Builder and a TransferDirection
|
||||
* @throws ParseException if there is an error parsing the command line
|
||||
*/
|
||||
public static CliParse parseCli(Options options, String[] args) throws ParseException {
|
||||
options.addOption("u", URL_OPTION, true, "NiFI URL to connect to (default: " + URL_OPTION_DEFAULT + ")");
|
||||
options.addOption("d", DIRECTION_OPTION, true, "Direction (valid directions: "
|
||||
+ Arrays.stream(TransferDirection.values()).map(Object::toString).collect(Collectors.joining(", ")) + ") (default: " + DIRECTION_OPTION_DEFAULT + ")");
|
||||
options.addOption("n", PORT_NAME_OPTION, true, "Port name");
|
||||
options.addOption("i", PORT_IDENTIFIER_OPTION, true, "Port id");
|
||||
options.addOption(null, TIMEOUT_OPTION, true, "Timeout");
|
||||
options.addOption(null, PENALIZATION_OPTION, true, "Penalization period");
|
||||
options.addOption(null, KEYSTORE_OPTION, true, "Keystore");
|
||||
options.addOption(null, KEY_STORE_TYPE_OPTION, true, "Keystore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
|
||||
options.addOption(null, KEY_STORE_PASSWORD_OPTION, true, "Keystore password");
|
||||
options.addOption(null, TRUST_STORE_OPTION, true, "Truststore");
|
||||
options.addOption(null, TRUST_STORE_TYPE_OPTION, true, "Truststore type (default: " + KEYSTORE_TYPE_OPTION_DEFAULT + ")");
|
||||
options.addOption(null, TRUST_STORE_PASSWORD_OPTION, true, "Truststore password");
|
||||
options.addOption("c", COMPRESSION_OPTION, false, "Use compression");
|
||||
options.addOption(null, PEER_PERSISTENCE_FILE_OPTION, true, "File to write peer information to so it can be recovered on restart");
|
||||
options.addOption("p", TRANSPORT_PROTOCOL_OPTION, true, "Site to site transport protocol (default: " + TRANSPORT_PROTOCOL_OPTION_DEFAULT + ")");
|
||||
options.addOption(null, BATCH_COUNT_OPTION, true, "Number of flow files in a batch");
|
||||
options.addOption(null, BATCH_SIZE_OPTION, true, "Size of flow files in a batch");
|
||||
options.addOption(null, BATCH_DURATION_OPTION, true, "Duration of a batch");
|
||||
options.addOption(null, PROXY_HOST_OPTION, true, "Proxy hostname");
|
||||
options.addOption(null, PROXY_PORT_OPTION, true, "Proxy port");
|
||||
options.addOption(null, PROXY_USERNAME_OPTION, true, "Proxy username");
|
||||
options.addOption(null, PROXY_PASSWORD_OPTION, true, "Proxy password");
|
||||
options.addOption("h", HELP_OPTION, false, "Show help message and exit");
|
||||
CommandLineParser parser = new DefaultParser();
|
||||
CommandLine commandLine;
|
||||
commandLine = parser.parse(options, args);
|
||||
if (commandLine.hasOption(HELP_OPTION)) {
|
||||
printUsage(null, options);
|
||||
System.exit(1);
|
||||
}
|
||||
SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
|
||||
builder.url(commandLine.getOptionValue(URL_OPTION, URL_OPTION_DEFAULT));
|
||||
if (commandLine.hasOption(PORT_NAME_OPTION)) {
|
||||
builder.portName(commandLine.getOptionValue(PORT_NAME_OPTION));
|
||||
}
|
||||
if (commandLine.hasOption(PORT_IDENTIFIER_OPTION)) {
|
||||
builder.portIdentifier(commandLine.getOptionValue(PORT_IDENTIFIER_OPTION));
|
||||
}
|
||||
if (commandLine.hasOption(TIMEOUT_OPTION)) {
|
||||
builder.timeout(FormatUtils.getTimeDuration(commandLine.getOptionValue(TIMEOUT_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (commandLine.hasOption(PENALIZATION_OPTION)) {
|
||||
builder.nodePenalizationPeriod(FormatUtils.getTimeDuration(commandLine.getOptionValue(PENALIZATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (commandLine.hasOption(KEYSTORE_OPTION)) {
|
||||
builder.keystoreFilename(commandLine.getOptionValue(KEYSTORE_OPTION));
|
||||
builder.keystoreType(KeystoreType.valueOf(commandLine.getOptionValue(KEY_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
|
||||
|
||||
if (commandLine.hasOption(KEY_STORE_PASSWORD_OPTION)) {
|
||||
builder.keystorePass(commandLine.getOptionValue(KEY_STORE_PASSWORD_OPTION));
|
||||
} else {
|
||||
throw new ParseException("Must specify keystore password");
|
||||
}
|
||||
}
|
||||
if (commandLine.hasOption(TRUST_STORE_OPTION)) {
|
||||
builder.truststoreFilename(commandLine.getOptionValue(TRUST_STORE_OPTION));
|
||||
builder.truststoreType(KeystoreType.valueOf(commandLine.getOptionValue(TRUST_STORE_TYPE_OPTION, KEYSTORE_TYPE_OPTION_DEFAULT).toUpperCase()));
|
||||
|
||||
if (commandLine.hasOption(TRUST_STORE_PASSWORD_OPTION)) {
|
||||
builder.truststorePass(commandLine.getOptionValue(TRUST_STORE_PASSWORD_OPTION));
|
||||
} else {
|
||||
throw new ParseException("Must specify truststore password");
|
||||
}
|
||||
}
|
||||
if (commandLine.hasOption(COMPRESSION_OPTION)) {
|
||||
builder.useCompression(true);
|
||||
} else {
|
||||
builder.useCompression(false);
|
||||
}
|
||||
if (commandLine.hasOption(PEER_PERSISTENCE_FILE_OPTION)) {
|
||||
builder.peerPersistenceFile(new File(commandLine.getOptionValue(PEER_PERSISTENCE_FILE_OPTION)));
|
||||
}
|
||||
if (commandLine.hasOption(BATCH_COUNT_OPTION)) {
|
||||
builder.requestBatchCount(Integer.parseInt(commandLine.getOptionValue(BATCH_COUNT_OPTION)));
|
||||
}
|
||||
if (commandLine.hasOption(BATCH_SIZE_OPTION)) {
|
||||
builder.requestBatchSize(Long.parseLong(commandLine.getOptionValue(BATCH_SIZE_OPTION)));
|
||||
}
|
||||
if (commandLine.hasOption(BATCH_DURATION_OPTION)) {
|
||||
builder.requestBatchDuration(FormatUtils.getTimeDuration(commandLine.getOptionValue(BATCH_DURATION_OPTION), TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
if (commandLine.hasOption(PROXY_HOST_OPTION)) {
|
||||
builder.httpProxy(new HttpProxy(commandLine.getOptionValue(PROXY_HOST_OPTION), Integer.parseInt(commandLine.getOptionValue(PROXY_PORT_OPTION, PROXY_PORT_OPTION_DEFAULT)),
|
||||
commandLine.getOptionValue(PROXY_USERNAME_OPTION), commandLine.getOptionValue(PROXY_PASSWORD_OPTION)));
|
||||
}
|
||||
builder.transportProtocol(SiteToSiteTransportProtocol.valueOf(commandLine.getOptionValue(TRANSPORT_PROTOCOL_OPTION, TRANSPORT_PROTOCOL_OPTION_DEFAULT).toUpperCase()));
|
||||
TransferDirection transferDirection = TransferDirection.valueOf(commandLine.getOptionValue(DIRECTION_OPTION, DIRECTION_OPTION_DEFAULT));
|
||||
return new CliParse() {
|
||||
@Override
|
||||
public SiteToSiteClient.Builder getBuilder() {
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransferDirection getTransferDirection() {
|
||||
return transferDirection;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
// Make IO redirection useful
|
||||
PrintStream output = System.out;
|
||||
System.setOut(System.err);
|
||||
Options options = new Options();
|
||||
try {
|
||||
CliParse cliParse = parseCli(options, args);
|
||||
try (SiteToSiteClient siteToSiteClient = cliParse.getBuilder().build()) {
|
||||
if (cliParse.getTransferDirection() == TransferDirection.SEND) {
|
||||
new SiteToSiteSender(siteToSiteClient, System.in).sendFiles();
|
||||
} else {
|
||||
new SiteToSiteReceiver(siteToSiteClient, output).receiveFiles();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
printUsage(e.getMessage(), options);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Combines a SiteToSiteClient.Builder and TransferDirection into a return value for parseCli
|
||||
*/
|
||||
public interface CliParse {
|
||||
SiteToSiteClient.Builder getBuilder();
|
||||
|
||||
TransferDirection getTransferDirection();
|
||||
}
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransactionCompletion;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Class that will print received DataPackets to output
|
||||
*/
|
||||
public class SiteToSiteReceiver {
|
||||
private final SiteToSiteClient siteToSiteClient;
|
||||
private final OutputStream output;
|
||||
|
||||
public SiteToSiteReceiver(SiteToSiteClient siteToSiteClient, OutputStream output) {
|
||||
this.siteToSiteClient = siteToSiteClient;
|
||||
this.output = output;
|
||||
}
|
||||
|
||||
public TransactionCompletion receiveFiles() throws IOException {
|
||||
Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.RECEIVE);
|
||||
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(output);
|
||||
jsonGenerator.writeStartArray();
|
||||
DataPacket dataPacket;
|
||||
while ((dataPacket = transaction.receive()) != null) {
|
||||
jsonGenerator.writeStartObject();
|
||||
jsonGenerator.writeFieldName("attributes");
|
||||
jsonGenerator.writeStartObject();
|
||||
Map<String, String> attributes = dataPacket.getAttributes();
|
||||
if (attributes != null) {
|
||||
for (Map.Entry<String, String> stringStringEntry : attributes.entrySet()) {
|
||||
jsonGenerator.writeStringField(stringStringEntry.getKey(), stringStringEntry.getValue());
|
||||
}
|
||||
}
|
||||
jsonGenerator.writeEndObject();
|
||||
InputStream data = dataPacket.getData();
|
||||
if (data != null) {
|
||||
jsonGenerator.writeBinaryField("data", IOUtils.toByteArray(data));
|
||||
}
|
||||
jsonGenerator.writeEndObject();
|
||||
}
|
||||
jsonGenerator.writeEndArray();
|
||||
jsonGenerator.close();
|
||||
transaction.confirm();
|
||||
return transaction.complete();
|
||||
}
|
||||
}
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransactionCompletion;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* Class that will send DataPackets read from input
|
||||
*/
|
||||
public class SiteToSiteSender {
|
||||
private final SiteToSiteClient siteToSiteClient;
|
||||
private final InputStream input;
|
||||
|
||||
public SiteToSiteSender(SiteToSiteClient siteToSiteClient, InputStream input) {
|
||||
this.siteToSiteClient = siteToSiteClient;
|
||||
this.input = input;
|
||||
}
|
||||
|
||||
public TransactionCompletion sendFiles() throws IOException {
|
||||
Transaction transaction = siteToSiteClient.createTransaction(TransferDirection.SEND);
|
||||
try {
|
||||
DataPacketDto.getDataPacketStream(input).forEachOrdered(d -> {
|
||||
try {
|
||||
transaction.send(d);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} catch (RuntimeException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException) {
|
||||
throw (IOException) cause;
|
||||
}
|
||||
throw new IOException(e.getMessage(), e);
|
||||
}
|
||||
transaction.confirm();
|
||||
return transaction.complete();
|
||||
}
|
||||
}
|
|
@ -1,96 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
public class DataPacketDtoTest {
|
||||
public static DataPacketDto create(byte[] data) {
|
||||
return new DataPacketDto(new HashMap<>(), data);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoArgConstructor() {
|
||||
DataPacketDto dataPacketDto = new DataPacketDto();
|
||||
assertEquals(0, dataPacketDto.getAttributes().size());
|
||||
assertNull(dataPacketDto.getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSetAttributes() {
|
||||
DataPacketDto dataPacketDto = create(null);
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("key", "value");
|
||||
dataPacketDto.setAttributes(attributes);
|
||||
assertEquals(attributes, Collections.unmodifiableMap(dataPacketDto.getAttributes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataFileConstructor() {
|
||||
String dataFile = "dataFile";
|
||||
assertEquals(dataFile, new DataPacketDto(null, dataFile).getDataFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParserNone() throws IOException {
|
||||
List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(("[]").getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
|
||||
assertEquals(0, dataPackets.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParserSingle() throws IOException {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
StringBuilder stringBuilder = new StringBuilder("[");
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto));
|
||||
stringBuilder.append("]");
|
||||
List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
|
||||
assertEquals(1, dataPackets.size());
|
||||
assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParserMultiple() throws IOException {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
StringBuilder stringBuilder = new StringBuilder("[");
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto));
|
||||
DataPacketDto dataPacketDto2 = new DataPacketDto("test data 2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
|
||||
stringBuilder.append(",");
|
||||
stringBuilder.append(objectMapper.writeValueAsString(dataPacketDto2));
|
||||
stringBuilder.append("]");
|
||||
List<DataPacket> dataPackets = DataPacketDto.getDataPacketStream(new ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))).collect(Collectors.toList());
|
||||
assertEquals(2, dataPackets.size());
|
||||
assertEquals(dataPacketDto.toDataPacket(), dataPackets.get(0));
|
||||
assertEquals(dataPacketDto2.toDataPacket(), dataPackets.get(1));
|
||||
}
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class DataPacketImplTest {
|
||||
private Map<String, String> testAttributes;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
testAttributes = new HashMap<>();
|
||||
testAttributes.put("testKey", "testVal");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPacketNulls() throws IOException {
|
||||
DataPacketImpl dataPacket = new DataPacketImpl(null, null, null);
|
||||
assertEquals(0, dataPacket.getAttributes().size());
|
||||
assertEquals(-1, dataPacket.getData().read(new byte[1]));
|
||||
assertEquals(0, dataPacket.getSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPacketAttributes() {
|
||||
assertEquals(Collections.unmodifiableMap(testAttributes), new DataPacketImpl(testAttributes, null, null).getAttributes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPacketData() throws IOException {
|
||||
byte[] testData = "test data".getBytes(StandardCharsets.UTF_8);
|
||||
DataPacketImpl dataPacket = new DataPacketImpl(null, testData, null);
|
||||
assertEquals(testData.length, dataPacket.getSize());
|
||||
assertArrayEquals(testData, IOUtils.toByteArray(dataPacket.getData()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataFile() throws IOException {
|
||||
byte[] testData = "test data".getBytes(StandardCharsets.UTF_8);
|
||||
File tempFile = File.createTempFile("abc", "def");
|
||||
try {
|
||||
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
|
||||
fileOutputStream.write(testData);
|
||||
}
|
||||
DataPacketImpl dataPacket = new DataPacketImpl(null, null, tempFile.getAbsolutePath());
|
||||
assertEquals(testData.length, dataPacket.getSize());
|
||||
try (InputStream input = dataPacket.getData()) {
|
||||
assertArrayEquals(testData, IOUtils.toByteArray(input));
|
||||
}
|
||||
} finally {
|
||||
if (!tempFile.delete()) {
|
||||
tempFile.deleteOnExit();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,245 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.KeystoreType;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
|
||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||
import org.apache.nifi.remote.protocol.http.HttpProxy;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
public class SiteToSiteCliMainTest {
|
||||
private String expectedUrl;
|
||||
private TransferDirection expectedTransferDirection;
|
||||
private SiteToSiteTransportProtocol expectedSiteToSiteTransportProtocol;
|
||||
private String expectedPortName;
|
||||
private String expectedPortIdentifier;
|
||||
private long expectedTimeoutNs;
|
||||
private long expectedPenalizationNs;
|
||||
private String expectedKeystoreFilename;
|
||||
private String expectedKeystorePass;
|
||||
private KeystoreType expectedKeystoreType;
|
||||
private String expectedTruststoreFilename;
|
||||
private String expectedTruststorePass;
|
||||
private KeystoreType expectedTruststoreType;
|
||||
private boolean expectedCompression;
|
||||
private File expectedPeerPersistenceFile;
|
||||
private int expectedBatchCount;
|
||||
private long expectedBatchDuration;
|
||||
private long expectedBatchSize;
|
||||
private HttpProxy expectedHttpProxy;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
SiteToSiteClient.Builder builder = new SiteToSiteClient.Builder();
|
||||
expectedUrl = SiteToSiteCliMain.URL_OPTION_DEFAULT;
|
||||
expectedTransferDirection = TransferDirection.valueOf(SiteToSiteCliMain.DIRECTION_OPTION_DEFAULT);
|
||||
expectedSiteToSiteTransportProtocol = SiteToSiteTransportProtocol.valueOf(SiteToSiteCliMain.TRANSPORT_PROTOCOL_OPTION_DEFAULT);
|
||||
expectedPortName = builder.getPortName();
|
||||
expectedPortIdentifier = builder.getPortIdentifier();
|
||||
expectedTimeoutNs = builder.getTimeout(TimeUnit.NANOSECONDS);
|
||||
expectedPenalizationNs = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS);
|
||||
expectedKeystoreFilename = builder.getKeystoreFilename();
|
||||
expectedKeystorePass = builder.getKeystorePass();
|
||||
expectedKeystoreType = builder.getKeystoreType();
|
||||
expectedTruststoreFilename = builder.getTruststoreFilename();
|
||||
expectedTruststorePass = builder.getTruststorePass();
|
||||
expectedTruststoreType = builder.getTruststoreType();
|
||||
expectedCompression = false;
|
||||
expectedPeerPersistenceFile = builder.getPeerPersistenceFile();
|
||||
SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
|
||||
expectedBatchCount = siteToSiteClientConfig.getPreferredBatchCount();
|
||||
expectedBatchDuration = siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS);
|
||||
expectedBatchSize = siteToSiteClientConfig.getPreferredBatchSize();
|
||||
expectedHttpProxy = siteToSiteClientConfig.getHttpProxy();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseNoArgs() throws ParseException {
|
||||
parseAndCheckExpected(new String[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseUrl() throws ParseException {
|
||||
expectedUrl = "http://fake.url:8080/nifi";
|
||||
parseAndCheckExpected("u", SiteToSiteCliMain.URL_OPTION, expectedUrl);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsePortName() throws ParseException {
|
||||
expectedPortName = "testPortName";
|
||||
parseAndCheckExpected("n", SiteToSiteCliMain.PORT_NAME_OPTION, expectedPortName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsePortIdentifier() throws ParseException {
|
||||
expectedPortIdentifier = "testPortId";
|
||||
parseAndCheckExpected("i", SiteToSiteCliMain.PORT_IDENTIFIER_OPTION, expectedPortIdentifier);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseTimeout() throws ParseException {
|
||||
expectedTimeoutNs = TimeUnit.DAYS.toNanos(3);
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.TIMEOUT_OPTION, "3 days");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsePenalization() throws ParseException {
|
||||
expectedPenalizationNs = TimeUnit.HOURS.toNanos(4);
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.PENALIZATION_OPTION, "4 hours");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseKeystore() throws ParseException {
|
||||
expectedKeystoreFilename = "keystore.pkcs12";
|
||||
expectedKeystorePass = "badPassword";
|
||||
expectedKeystoreType = KeystoreType.PKCS12;
|
||||
parseAndCheckExpected(new String[]{
|
||||
"--" + SiteToSiteCliMain.KEYSTORE_OPTION, expectedKeystoreFilename,
|
||||
"--" + SiteToSiteCliMain.KEY_STORE_PASSWORD_OPTION, expectedKeystorePass,
|
||||
"--" + SiteToSiteCliMain.KEY_STORE_TYPE_OPTION, expectedKeystoreType.toString()
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseTruststore() throws ParseException {
|
||||
expectedTruststoreFilename = "truststore.pkcs12";
|
||||
expectedTruststorePass = "badPassword";
|
||||
expectedTruststoreType = KeystoreType.PKCS12;
|
||||
parseAndCheckExpected(new String[]{
|
||||
"--" + SiteToSiteCliMain.TRUST_STORE_OPTION, expectedTruststoreFilename,
|
||||
"--" + SiteToSiteCliMain.TRUST_STORE_PASSWORD_OPTION, expectedTruststorePass,
|
||||
"--" + SiteToSiteCliMain.TRUST_STORE_TYPE_OPTION, expectedTruststoreType.toString()
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseCompression() throws ParseException {
|
||||
expectedCompression = true;
|
||||
parseAndCheckExpected("c", SiteToSiteCliMain.COMPRESSION_OPTION, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsePeerPersistenceFile() throws ParseException {
|
||||
String pathname = "test";
|
||||
expectedPeerPersistenceFile = new File(pathname);
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.PEER_PERSISTENCE_FILE_OPTION, pathname);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBatchCount() throws ParseException {
|
||||
expectedBatchCount = 55;
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_COUNT_OPTION, Integer.toString(expectedBatchCount));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBatchDuration() throws ParseException {
|
||||
expectedBatchDuration = TimeUnit.MINUTES.toNanos(5);
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_DURATION_OPTION, "5 min");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBatchSize() throws ParseException {
|
||||
expectedBatchSize = 1026;
|
||||
parseAndCheckExpected(null, SiteToSiteCliMain.BATCH_SIZE_OPTION, Long.toString(expectedBatchSize));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseProxy() throws ParseException {
|
||||
String expectedHost = "testHost";
|
||||
int expectedPort = 292;
|
||||
String expectedUser = "testUser";
|
||||
String expectedPassword = "badPassword";
|
||||
expectedHttpProxy = new HttpProxy(expectedHost, expectedPort, expectedUser, expectedPassword);
|
||||
parseAndCheckExpected(new String[]{
|
||||
"--" + SiteToSiteCliMain.PROXY_HOST_OPTION, expectedHost,
|
||||
"--" + SiteToSiteCliMain.PROXY_PORT_OPTION, Integer.toString(expectedPort),
|
||||
"--" + SiteToSiteCliMain.PROXY_USERNAME_OPTION, expectedUser,
|
||||
"--" + SiteToSiteCliMain.PROXY_PASSWORD_OPTION, expectedPassword});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseTransferDirection() throws ParseException {
|
||||
expectedTransferDirection = TransferDirection.RECEIVE;
|
||||
parseAndCheckExpected("d", SiteToSiteCliMain.DIRECTION_OPTION, expectedTransferDirection.toString());
|
||||
}
|
||||
|
||||
private void parseAndCheckExpected(String shortOption, String longOption, String value) throws ParseException {
|
||||
if (shortOption != null) {
|
||||
String[] args;
|
||||
if (value == null) {
|
||||
args = new String[]{"-" + shortOption};
|
||||
} else {
|
||||
args = new String[]{"-" + shortOption, value};
|
||||
}
|
||||
parseAndCheckExpected(args);
|
||||
}
|
||||
String[] args;
|
||||
if (value == null) {
|
||||
args = new String[]{"--" + longOption};
|
||||
} else {
|
||||
args = new String[]{"--" + longOption, value};
|
||||
}
|
||||
parseAndCheckExpected(args);
|
||||
}
|
||||
|
||||
private void parseAndCheckExpected(String[] args) throws ParseException {
|
||||
SiteToSiteCliMain.CliParse cliParse = SiteToSiteCliMain.parseCli(new Options(), args);
|
||||
SiteToSiteClient.Builder builder = cliParse.getBuilder();
|
||||
assertEquals(expectedUrl, builder.getUrl());
|
||||
assertEquals(expectedSiteToSiteTransportProtocol, builder.getTransportProtocol());
|
||||
assertEquals(expectedPortName, builder.getPortName());
|
||||
assertEquals(expectedPortIdentifier, builder.getPortIdentifier());
|
||||
assertEquals(expectedTimeoutNs, builder.getTimeout(TimeUnit.NANOSECONDS));
|
||||
assertEquals(expectedPenalizationNs, builder.getPenalizationPeriod(TimeUnit.NANOSECONDS));
|
||||
assertEquals(expectedKeystoreFilename, builder.getKeystoreFilename());
|
||||
assertEquals(expectedKeystorePass, builder.getKeystorePass());
|
||||
assertEquals(expectedKeystoreType, builder.getKeystoreType());
|
||||
assertEquals(expectedTruststoreFilename, builder.getTruststoreFilename());
|
||||
assertEquals(expectedTruststorePass, builder.getTruststorePass());
|
||||
assertEquals(expectedTruststoreType, builder.getTruststoreType());
|
||||
assertEquals(expectedCompression, builder.isUseCompression());
|
||||
assertEquals(expectedPeerPersistenceFile, builder.getPeerPersistenceFile());
|
||||
if (expectedHttpProxy == null) {
|
||||
assertNull(builder.getHttpProxy());
|
||||
} else {
|
||||
HttpProxy httpProxy = builder.getHttpProxy();
|
||||
assertNotNull(httpProxy);
|
||||
assertEquals(expectedHttpProxy.getHttpHost(), httpProxy.getHttpHost());
|
||||
assertEquals(expectedHttpProxy.getUsername(), httpProxy.getUsername());
|
||||
assertEquals(expectedHttpProxy.getPassword(), httpProxy.getPassword());
|
||||
}
|
||||
SiteToSiteClientConfig siteToSiteClientConfig = builder.buildConfig();
|
||||
assertEquals(expectedBatchCount, siteToSiteClientConfig.getPreferredBatchCount());
|
||||
assertEquals(expectedBatchDuration, siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.NANOSECONDS));
|
||||
assertEquals(expectedBatchSize, siteToSiteClientConfig.getPreferredBatchSize());
|
||||
assertEquals(expectedTransferDirection, cliParse.getTransferDirection());
|
||||
}
|
||||
}
|
|
@ -1,99 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Value;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransactionCompletion;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class SiteToSiteReceiverTest {
|
||||
private final ObjectMapper objectMapper = new ObjectMapper().setDefaultPropertyInclusion(Value.construct(Include.NON_NULL, Include.ALWAYS));
|
||||
@Mock
|
||||
SiteToSiteClient siteToSiteClient;
|
||||
@Mock
|
||||
Transaction transaction;
|
||||
@Mock
|
||||
TransactionCompletion transactionCompletion;
|
||||
ByteArrayOutputStream data;
|
||||
private final Supplier<SiteToSiteReceiver> receiverSupplier = () -> new SiteToSiteReceiver(siteToSiteClient, data);
|
||||
ByteArrayOutputStream expectedData;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws IOException {
|
||||
data = new ByteArrayOutputStream();
|
||||
expectedData = new ByteArrayOutputStream();
|
||||
when(siteToSiteClient.createTransaction(TransferDirection.RECEIVE)).thenReturn(transaction);
|
||||
when(transaction.complete()).thenAnswer(invocation -> {
|
||||
verify(siteToSiteClient).createTransaction(TransferDirection.RECEIVE);
|
||||
verify(transaction).confirm();
|
||||
return transactionCompletion;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmpty() throws IOException {
|
||||
assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
|
||||
|
||||
objectMapper.writeValue(expectedData, Collections.emptyList());
|
||||
assertEquals(expectedData.toString(), data.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingle() throws IOException {
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(null);
|
||||
|
||||
assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
|
||||
|
||||
objectMapper.writeValue(expectedData, Collections.singletonList(dataPacketDto));
|
||||
assertEquals(expectedData.toString(), data.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMulti() throws IOException {
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
|
||||
when(transaction.receive()).thenReturn(dataPacketDto.toDataPacket()).thenReturn(dataPacketDto2.toDataPacket()).thenReturn(null);
|
||||
|
||||
assertEquals(transactionCompletion, receiverSupplier.get().receiveFiles());
|
||||
|
||||
objectMapper.writeValue(expectedData, Arrays.asList(dataPacketDto, dataPacketDto2));
|
||||
assertEquals(expectedData.toString(), data.toString());
|
||||
}
|
||||
}
|
|
@ -1,132 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.toolkit.s2s;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.remote.Transaction;
|
||||
import org.apache.nifi.remote.TransactionCompletion;
|
||||
import org.apache.nifi.remote.TransferDirection;
|
||||
import org.apache.nifi.remote.client.SiteToSiteClient;
|
||||
import org.apache.nifi.remote.protocol.DataPacket;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class SiteToSiteSenderTest {
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
@Mock
|
||||
SiteToSiteClient siteToSiteClient;
|
||||
@Mock
|
||||
Transaction transaction;
|
||||
@Mock
|
||||
TransactionCompletion transactionCompletion;
|
||||
ByteArrayOutputStream data;
|
||||
private final Supplier<SiteToSiteSender> senderSupplier = () -> new SiteToSiteSender(siteToSiteClient, new ByteArrayInputStream(data.toByteArray()));
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws IOException {
|
||||
data = new ByteArrayOutputStream();
|
||||
when(siteToSiteClient.createTransaction(TransferDirection.SEND)).thenReturn(transaction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyList() throws IOException {
|
||||
setTransactionCompletion();
|
||||
|
||||
objectMapper.writeValue(data, Collections.emptyList());
|
||||
assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
|
||||
verify(transaction, never()).send(any(DataPacket.class));
|
||||
verify(transaction).complete();
|
||||
verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleElement() throws IOException {
|
||||
setTransactionCompletion();
|
||||
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
|
||||
assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
|
||||
verify(transaction).send(dataPacketDto.toDataPacket());
|
||||
verify(transaction).complete();
|
||||
verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleElements() throws IOException {
|
||||
setTransactionCompletion();
|
||||
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
DataPacketDto dataPacketDto2 = new DataPacketDto("test-data2".getBytes(StandardCharsets.UTF_8)).putAttribute("key2", "value2");
|
||||
objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto, dataPacketDto2}).collect(Collectors.toList()));
|
||||
assertEquals(transactionCompletion, senderSupplier.get().sendFiles());
|
||||
verify(transaction).send(dataPacketDto.toDataPacket());
|
||||
verify(transaction).send(dataPacketDto2.toDataPacket());
|
||||
verify(transaction).complete();
|
||||
verifyNoMoreInteractions(siteToSiteClient, transaction, transactionCompletion);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIOException() throws IOException {
|
||||
IOException test = new IOException("test");
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
|
||||
doThrow(test).when(transaction).send(any(DataPacket.class));
|
||||
|
||||
assertThrows(IOException.class, () -> senderSupplier.get().sendFiles());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRuntimeException() throws IOException {
|
||||
RuntimeException test = new RuntimeException("test");
|
||||
DataPacketDto dataPacketDto = new DataPacketDto("test-data".getBytes(StandardCharsets.UTF_8)).putAttribute("key", "value");
|
||||
objectMapper.writeValue(data, Arrays.stream(new DataPacketDto[]{dataPacketDto}).collect(Collectors.toList()));
|
||||
doThrow(test).when(transaction).send(any(DataPacket.class));
|
||||
|
||||
assertThrows(IOException.class, () -> senderSupplier.get().sendFiles());
|
||||
}
|
||||
|
||||
private void setTransactionCompletion() throws IOException {
|
||||
when(transaction.complete()).thenAnswer(invocation -> {
|
||||
verify(siteToSiteClient).createTransaction(TransferDirection.SEND);
|
||||
verify(transaction).confirm();
|
||||
return transactionCompletion;
|
||||
});
|
||||
}
|
||||
}
|
|
@ -25,7 +25,6 @@
|
|||
<modules>
|
||||
<module>nifi-toolkit-tls</module>
|
||||
<module>nifi-toolkit-encrypt-config</module>
|
||||
<module>nifi-toolkit-s2s</module>
|
||||
<module>nifi-toolkit-admin</module>
|
||||
<module>nifi-toolkit-zookeeper-migrator</module>
|
||||
<module>nifi-toolkit-flowfile-repo</module>
|
||||
|
|
Loading…
Reference in New Issue