NIFI-282: Added send(byte[], Map<String, String>) method to avoid having to create a DataPacket object

This commit is contained in:
Mark Payne 2015-02-16 15:18:57 -05:00
parent e16fc7972c
commit 2f60ddc03a
4 changed files with 42 additions and 1 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote; package org.apache.nifi.remote;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
@ -80,6 +81,16 @@ public interface Transaction {
*/ */
void send(DataPacket dataPacket) throws IOException; void send(DataPacket dataPacket) throws IOException;
/**
* Sends the given byte array as the content of a {@link DataPacket} along with the
* provided attributes
*
* @param content
* @param attributes
* @throws IOException
*/
void send(byte[] content, Map<String, String> attributes) throws IOException;
/** /**
* Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
* {@code null}. It is important to consume all data from the remote NiFi instance before attempting to * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to

View File

@ -17,6 +17,7 @@
package org.apache.nifi.remote.client.socket; package org.apache.nifi.remote.client.socket;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Communicant; import org.apache.nifi.remote.Communicant;
@ -185,6 +186,11 @@ public class SocketClient implements SiteToSiteClient {
transaction.send(dataPacket); transaction.send(dataPacket);
} }
@Override
public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
transaction.send(content, attributes);
}
@Override @Override
public DataPacket receive() throws IOException { public DataPacket receive() throws IOException {
return transaction.receive(); return transaction.receive();

View File

@ -19,11 +19,27 @@ package org.apache.nifi.remote.protocol;
import java.io.InputStream; import java.io.InputStream;
import java.util.Map; import java.util.Map;
/**
* Represents a piece of data that is to be sent to or that was received from a NiFi instance.
*/
public interface DataPacket { public interface DataPacket {
/**
* The key-value attributes that are to be associated with the data
* @return
*/
Map<String, String> getAttributes(); Map<String, String> getAttributes();
/**
* An InputStream from which the content can be read
* @return
*/
InputStream getData(); InputStream getData();
/**
* The length of the InputStream.
* @return
*/
long getSize(); long getSize();
} }

View File

@ -16,11 +16,13 @@
*/ */
package org.apache.nifi.remote.protocol.socket; package org.apache.nifi.remote.protocol.socket;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Map;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream; import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream; import java.util.zip.CheckedOutputStream;
@ -36,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -171,6 +174,11 @@ public class SocketClientTransaction implements Transaction {
} }
@Override
public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
}
@Override @Override
public void send(final DataPacket dataPacket) throws IOException { public void send(final DataPacket dataPacket) throws IOException {
try { try {