NIFI-9805 Refactored Distributed Cache Servers using Netty

- Added Map and Set Cache Servers based on nifi-event-transport components
- Removed custom servers and unused socket stream components
- Reduced duplication on protocol classes
- Added checks for readable bytes
- Added mark and reset handling for buffer reads (illustrated in the sketch after this list)
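
A minimal sketch (not part of the commit) of the buffer-handling pattern the new Netty decoders rely on: check readableBytes before consuming a length prefix or payload, and mark/reset the reader index so a partial request can be retried once more bytes arrive. The class and constant names are illustrative only.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

/**
 * Illustrative decoder for a single length-prefixed payload.
 */
public class LengthPrefixedDecoder extends ByteToMessageDecoder {
    private static final int INT_LENGTH = 4;

    @Override
    protected void decode(final ChannelHandlerContext context, final ByteBuf byteBuf, final List<Object> decoded) {
        // Mark the reader index so an incomplete read can be undone
        byteBuf.markReaderIndex();

        // Check for readable bytes before reading the length prefix
        if (byteBuf.readableBytes() < INT_LENGTH) {
            return;
        }
        final int length = byteBuf.readInt();

        // Check for readable bytes before reading the payload; reset and wait for more bytes otherwise
        if (byteBuf.readableBytes() < length) {
            byteBuf.resetReaderIndex();
            return;
        }
        final byte[] payload = new byte[length];
        byteBuf.readBytes(payload);
        decoded.add(payload);
    }
}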

This closes #6040
Signed-off-by: Paul Grey <greyp@apache.org>
exceptionfactory 2022-05-12 00:05:11 -05:00 committed by Paul Grey
parent 85d5e2071b
commit fe424a2d42
39 changed files with 1931 additions and 1236 deletions

View File

@@ -1,188 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;
public class SocketChannelInputStream extends InputStream {
private final SocketChannel channel;
private volatile int timeoutMillis = 30000;
private volatile boolean interrupted = false;
private final Selector readSelector;
private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
private Byte bufferedByte = null;
public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
// this class expects a non-blocking channel
socketChannel.configureBlocking(false);
this.channel = socketChannel;
readSelector = Selector.open();
this.channel.register(readSelector, SelectionKey.OP_READ);
}
public void setTimeout(final int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
public void consume() throws IOException {
channel.shutdownInput();
final byte[] b = new byte[4096];
final ByteBuffer buffer = ByteBuffer.wrap(b);
int bytesRead;
do {
bytesRead = channel.read(buffer);
buffer.flip();
} while (bytesRead > 0);
}
@Override
public int read() throws IOException {
if (bufferedByte != null) {
final int retVal = bufferedByte & 0xFF;
bufferedByte = null;
return retVal;
}
oneByteBuffer.flip();
oneByteBuffer.clear();
final long maxTime = System.currentTimeMillis() + timeoutMillis;
waitForReady();
int bytesRead;
do {
bytesRead = channel.read(oneByteBuffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
}
} while (bytesRead == 0);
if (bytesRead == -1) {
return -1;
}
oneByteBuffer.flip();
return oneByteBuffer.get() & 0xFF;
}
@Override
public int read(final byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
if (bufferedByte != null) {
final byte retVal = bufferedByte;
bufferedByte = null;
b[off] = retVal;
return 1;
}
waitForReady();
final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
final long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesRead;
do {
bytesRead = channel.read(buffer);
if (bytesRead == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out reading from socket");
}
}
} while (bytesRead == 0);
return bytesRead;
}
private void waitForReady() throws IOException {
int readyCount = readSelector.select(timeoutMillis);
if (readyCount < 1) {
if (interrupted) {
throw new TransmissionDisabledException();
}
throw new SocketTimeoutException("Timed out reading from socket");
}
final Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
selectedKeys.clear(); // clear the selected keys so that the Selector will be able to add them back to the ready set next time they are ready.
}
@Override
public int available() throws IOException {
if (bufferedByte != null) {
return 1;
}
isDataAvailable(); // attempt to read from socket
return (bufferedByte == null) ? 0 : 1;
}
public boolean isDataAvailable() throws IOException {
if (bufferedByte != null) {
return true;
}
oneByteBuffer.flip();
oneByteBuffer.clear();
final int bytesRead = channel.read(oneByteBuffer);
if (bytesRead == -1) {
throw new EOFException("Peer has closed the stream");
}
if (bytesRead > 0) {
oneByteBuffer.flip();
bufferedByte = oneByteBuffer.get();
return true;
}
return false;
}
public void interrupt() {
interrupted = true;
readSelector.wakeup();
}
/**
* Closes the underlying socket channel.
*
* @throws java.io.IOException for issues closing underlying stream
*/
@Override
public void close() throws IOException {
channel.close();
readSelector.close();
}
}

View File

@@ -1,124 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
public class SocketChannelOutputStream extends OutputStream {
private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
private final SocketChannel channel;
private volatile int timeout = 30000;
private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
// this class expects a non-blocking channel
socketChannel.configureBlocking(false);
this.channel = socketChannel;
}
public void setTimeout(final int timeoutMillis) {
this.timeout = timeoutMillis;
}
@Override
public void write(final int b) throws IOException {
oneByteBuffer.flip();
oneByteBuffer.clear();
oneByteBuffer.put((byte) b);
oneByteBuffer.flip();
final int timeoutMillis = this.timeout;
long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesWritten;
long sleepNanos = 1L;
while (oneByteBuffer.hasRemaining()) {
bytesWritten = channel.write(oneByteBuffer);
if (bytesWritten == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out writing to socket");
}
try {
TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (InterruptedException e) {
close();
Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
}
sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
} else {
return;
}
}
}
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
final int timeoutMillis = this.timeout;
long maxTime = System.currentTimeMillis() + timeoutMillis;
int bytesWritten;
long sleepNanos = 1L;
while (buffer.hasRemaining()) {
bytesWritten = channel.write(buffer);
if (bytesWritten == 0) {
if (System.currentTimeMillis() > maxTime) {
throw new SocketTimeoutException("Timed out writing to socket");
}
try {
TimeUnit.NANOSECONDS.sleep(sleepNanos);
} catch (InterruptedException e) {
close();
Thread.currentThread().interrupt(); // set the interrupt status
throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
}
sleepNanos = Math.min(sleepNanos * 2, CHANNEL_FULL_WAIT_NANOS);
} else {
maxTime = System.currentTimeMillis() + timeoutMillis;
}
}
}
/**
* Closes the underlying SocketChannel
*
* @throws java.io.IOException if issues closing underlying stream
*/
@Override
public void close() throws IOException {
channel.close();
}
}

View File

@@ -1,231 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket;
//package nifi.remote.io.socket;
//
//import static org.junit.Assert.assertEquals;
//import static org.junit.Assert.assertTrue;
//
//import java.io.ByteArrayOutputStream;
//import java.io.DataInputStream;
//import java.io.DataOutputStream;
//import java.io.IOException;
//import java.io.InputStream;
//import java.io.OutputStream;
//import java.net.InetSocketAddress;
//import java.net.ServerSocket;
//import java.net.Socket;
//import java.net.SocketTimeoutException;
//import java.net.URI;
//import java.net.URISyntaxException;
//import java.nio.channels.SocketChannel;
//import java.util.Arrays;
//import java.util.concurrent.TimeUnit;
//
//import javax.net.ServerSocketFactory;
//
//import nifi.events.EventReporter;
//import nifi.groups.RemoteProcessGroup;
//import nifi.remote.RemoteGroupPort;
//import nifi.remote.RemoteResourceFactory;
//import nifi.remote.StandardSiteToSiteProtocol;
//import nifi.remote.TransferDirection;
//import nifi.remote.exception.HandshakeException;
//import nifi.remote.exception.PortNotRunningException;
//import nifi.remote.exception.UnknownPortException;
//import nifi.remote.protocol.CommunicationsProtocol;
//import nifi.remote.protocol.CommunicationsSession;
//import nifi.util.NiFiProperties;
//
//import org.junit.Ignore;
//import org.junit.Test;
//import org.mockito.Mockito;
//
//@Ignore("For local testing only")
//public class TestSocketChannelStreams {
// public static final int DATA_SIZE = 8 * 1024 * 1024;
//
// @Test
// public void testSendingToLocalInstanceWithoutSSL() throws IOException, InterruptedException, HandshakeException, UnknownPortException, PortNotRunningException, URISyntaxException {
// System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000));
// channel.configureBlocking(false);
//
// final CommunicationsSession commsSession;
// commsSession = new SocketChannelCommunicationsSession(channel, "", null);
// commsSession.setUri("nifi://localhost:5000");
// final DataInputStream dis = new DataInputStream(commsSession.getRequest().getInputStream());
// final DataOutputStream dos = new DataOutputStream(commsSession.getResponse().getOutputStream());
//
// dos.write(CommunicationsProtocol.MAGIC_BYTES);
// dos.flush();
//
// final EventReporter eventReporter = Mockito.mock(EventReporter.class);
// final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, nifiProperties);
//
// final StandardSiteToSiteProtocol negotiatedProtocol = (StandardSiteToSiteProtocol) RemoteResourceFactory.initiateResourceNegotiation(proposedProtocol, dis, dos);
// System.out.println(negotiatedProtocol);
//
// final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class);
// Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000);
// Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") );
//
// final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
// Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a");
// Mockito.when(port.getName()).thenReturn("Data In");
// Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg);
//
// negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND);
// }
//
// @Test
// public void testInputOutputStreams() throws IOException, InterruptedException {
// final ServerThread server = new ServerThread();
// server.start();
//
// int port = server.getPort();
// while ( port <= 0 ) {
// Thread.sleep(10L);
// port = server.getPort();
// }
//
// final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));
// channel.configureBlocking(false);
//
// final OutputStream out = new SocketChannelOutputStream(channel);
// final InputStream in = new SocketChannelInputStream(channel);
// final DataInputStream dataIn = new DataInputStream(in);
//
// final byte[] sent = new byte[DATA_SIZE];
// for (int i=0; i < sent.length; i++) {
// sent[i] = (byte) (i % 255);
// }
//
// for (int itr=0; itr < 5; itr++) {
// final long start = System.nanoTime();
// out.write(sent);
// final long nanos = System.nanoTime() - start;
// final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
// final float seconds = (float) millis / 1000F;
// final float megabytes = (float) DATA_SIZE / (1024F * 1024F);
// final float MBperS = megabytes / seconds;
// System.out.println("Millis: " + millis + "; MB/s: " + MBperS);
//
// Thread.sleep(2500L);
// final byte[] received = server.getReceivedData();
// System.out.println("Server received " + received.length + " bytes");
// server.clearReceivedData();
// assertTrue(Arrays.equals(sent, received));
//
// final long val = dataIn.readLong();
// assertEquals(DATA_SIZE, val);
// System.out.println(val);
// }
//
// server.shutdown();
// }
//
// public final long toLong(final byte[] buffer) throws IOException {
// return (((long)buffer[0] << 56) +
// ((long)(buffer[1] & 255) << 48) +
// ((long)(buffer[2] & 255) << 40) +
// ((long)(buffer[3] & 255) << 32) +
// ((long)(buffer[4] & 255) << 24) +
// ((buffer[5] & 255) << 16) +
// ((buffer[6] & 255) << 8) +
// ((buffer[7] & 255) << 0));
// }
//
// private static class ServerThread extends Thread {
// private int listeningPort;
// private final ByteArrayOutputStream received = new ByteArrayOutputStream();
//
// private volatile int readingDelay = 0;
// private volatile boolean shutdown = false;
//
// public ServerThread() {
// }
//
// public int getPort() {
// return listeningPort;
// }
//
// public byte[] getReceivedData() {
// return received.toByteArray();
// }
//
// @Override
// public void run() {
// try {
// final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
// final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0);
// this.listeningPort = serverSocket.getLocalPort();
//
// final Socket socket = serverSocket.accept();
// final InputStream stream = socket.getInputStream();
// final DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
//
// final byte[] buffer = new byte[4096];
// int len;
//
// while (!shutdown) {
// try {
// len = stream.read(buffer);
//
// System.out.println("Received " + len + " bytes");
//
// if ( readingDelay > 0 ) {
// try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {}
// }
// } catch (final SocketTimeoutException e) {
// continue;
// }
//
// if ( len < 0 ) {
// return;
// }
//
// received.write(buffer, 0, len);
//
// final long length = received.size();
// if ( length % (DATA_SIZE) == 0 ) {
// dos.writeLong(length);
// dos.flush();
// }
// }
//
// System.out.println("Server successfully shutdown");
// } catch (final Exception e) {
// e.printStackTrace();
// }
// }
//
// public void clearReceivedData() {
// this.received.reset();
// }
//
// public void shutdown() {
// this.shutdown = true;
// }
//
// public void delayReading(final int millis) {
// this.readingDelay = millis;
// }
// }
//
//}

View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.operations;
/**
* Cache Operation definition
*/
public interface CacheOperation {
/**
* Get Cache Operation value used during protocol communication
*
* @return Cache Operation value
*/
String value();
}

View File

@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations
/**
* Represents a distributed map cache operation which may be invoked.
*/
public enum MapOperation {
public enum MapOperation implements CacheOperation {
CONTAINS_KEY("containsKey"),
FETCH("fetch"),
GET("get"),
@@ -41,6 +41,7 @@ public enum MapOperation {
this.operation = operation;
}
@Override
public String value() {
return operation;
}

View File

@@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.operations
/**
* Represents a distributed set cache operation which may be invoked.
*/
public enum SetOperation {
public enum SetOperation implements CacheOperation {
ADD_IF_ABSENT("addIfAbsent"),
CONTAINS("contains"),
REMOVE("remove"),
@@ -31,6 +31,7 @@ public enum SetOperation {
this.operation = operation;
}
@Override
public String value() {
return operation;
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.operations;
/**
* Standard Cache Operation
*/
public enum StandardCacheOperation implements CacheOperation {
CLOSE("close");
private final String operation;
StandardCacheOperation(final String operation) {
this.operation = operation;
}
@Override
public String value() {
return operation;
}
}

View File

@@ -30,6 +30,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-protocol</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>1.17.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>

View File

@@ -1,247 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractCacheServer implements CacheServer {
private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
private final String identifier;
private final int port;
private final int maxReadSize;
private final SSLContext sslContext;
protected volatile boolean stopped = false;
private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();
private volatile ServerSocketChannel serverSocketChannel;
public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxReadSize) {
this.identifier = identifier;
this.port = port;
this.sslContext = sslContext;
this.maxReadSize = maxReadSize;
}
@Override
public int getPort() {
return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort();
}
@Override
public void start() throws IOException {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
serverSocketChannel.bind(new InetSocketAddress(port));
final Runnable runnable = new Runnable() {
@Override
public void run() {
while (true) {
final SocketChannel socketChannel;
try {
socketChannel = serverSocketChannel.accept();
logger.debug("Connected to {}", new Object[]{socketChannel});
} catch (final IOException e) {
if (!stopped) {
logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
return;
}
final Runnable processInputRunnable = new Runnable() {
@Override
public void run() {
final InputStream rawInputStream;
final OutputStream rawOutputStream;
final String peer = socketChannel.socket().getInetAddress().getHostName();
try {
if (sslContext == null) {
rawInputStream = new SocketChannelInputStream(socketChannel);
rawOutputStream = new SocketChannelOutputStream(socketChannel);
} else {
final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
sslSocketChannel.connect();
rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel);
rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
}
} catch (final IOException e) {
logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
try {
socketChannel.close();
} catch (final IOException swallow) {
}
return;
}
try (final InputStream in = new BufferedInputStream(rawInputStream);
final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
final VersionNegotiator versionNegotiator = getVersionNegotiator();
ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
boolean continueComms = true;
while (continueComms) {
continueComms = listen(in, out, versionNegotiator.getVersion());
}
// client has issued 'close'
logger.debug("Client issued close on {}", new Object[]{socketChannel});
} catch (final SocketTimeoutException e) {
logger.debug("30 sec timeout reached", e);
} catch (final IOException | HandshakeException e) {
if (!stopped) {
logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()});
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
} finally {
processInputThreads.remove(Thread.currentThread());
}
}
};
final Thread processInputThread = new Thread(processInputRunnable);
processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier);
processInputThread.setDaemon(true);
processInputThread.start();
processInputThreads.add(processInputThread);
}
}
};
final Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("Distributed Cache Server: " + identifier);
thread.start();
}
/**
* Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(1);
}
@Override
public void stop() throws IOException {
stopped = true;
logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
try {
serverSocketChannel.close();
} catch (final IOException e) {
logger.warn("Server Socket Close Failed", e);
}
}
// need to close out the created SocketChannels...this is done by interrupting
// the created threads that loop on listen().
for (final Thread processInputThread : processInputThreads) {
processInputThread.interrupt();
int i = 0;
while (!processInputThread.isInterrupted() && i++ < 5) {
try {
Thread.sleep(50); // allow thread to gracefully terminate
} catch (final InterruptedException e) {
}
}
}
processInputThreads.clear();
}
@Override
public String toString() {
return "CacheServer[id=" + identifier + "]";
}
/**
* Listens for incoming data and communicates with remote peer
*
* @param in in
* @param out out
* @param version version
* @return <code>true</code> if communications should continue, <code>false</code> otherwise
* @throws IOException ex
*/
protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
/**
* Read a length-prefixed value from the {@link DataInputStream}.
*
* @param dis the {@link DataInputStream} from which to read the value
* @return the serialized representation of the value
* @throws IOException on failure to read from the input stream
*/
protected byte[] readValue(final DataInputStream dis) throws IOException {
final int numBytes = validateSize(dis.readInt());
final byte[] buffer = new byte[numBytes];
dis.readFully(buffer);
return buffer;
}
/**
* Validate a size value received from the {@link DataInputStream} against the configured maximum.
*
* @param size the size value received from the {@link DataInputStream}
* @return the size value, iff it passes validation; otherwise, an exception is thrown
*/
protected int validateSize(final int size) {
if (size <= maxReadSize) {
return size;
} else {
throw new IllegalStateException(String.format("Size [%d] exceeds maximum configured read [%d]", size, maxReadSize));
}
}
}

View File

@@ -21,6 +21,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.server.set.StandardSetCacheServer;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.ssl.SSLContextService;
@@ -38,12 +39,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();
final int maxReadSize = context.getProperty(MAX_READ_SIZE).asDataSize(DataUnit.B).intValue();
final SSLContext sslContext;
if (sslContextService == null) {
sslContext = null;
} else {
sslContext = sslContextService.createContext();
}
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createContext();
final EvictionPolicy evictionPolicy;
switch (evictionPolicyName) {
@@ -63,7 +59,7 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
try {
final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
return new StandardSetCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
} catch (final Exception e) {
throw new RuntimeException(e);
}

View File

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import java.net.InetAddress;
import java.util.Objects;
/**
* Abstract Event Cache Server with standard lifecycle methods
*/
public abstract class EventCacheServer implements CacheServer {
private static final InetAddress ALL_ADDRESSES = null;
private final ComponentLog log;
private final int port;
private EventServer eventServer;
public EventCacheServer(
final ComponentLog log,
final int port
) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.port = port;
}
/**
* Start Server
*
*/
@Override
public void start() {
eventServer = createEventServer();
log.info("Started Cache Server Port [{}]", port);
}
/**
* Stop Server
*
*/
@Override
public void stop() {
if (eventServer == null) {
log.info("Server not running");
} else {
eventServer.shutdown();
}
log.info("Stopped Cache Server Port [{}]", port);
}
/**
* Get Server Port Number
*
* @return Port Number
*/
@Override
public int getPort() {
return port;
}
/**
* Create Event Server Factory with standard properties
*
* @param identifier Component Identifier
* @param sslContext SSL Context is null when not configured
* @return Netty Event Server Factory
*/
protected NettyEventServerFactory createEventServerFactory(final String identifier, final SSLContext sslContext) {
final NettyEventServerFactory eventServerFactory = new NettyEventServerFactory(ALL_ADDRESSES, port, TransportProtocol.TCP);
eventServerFactory.setSslContext(sslContext);
eventServerFactory.setClientAuth(ClientAuth.REQUIRED);
final String threadNamePrefix = String.format("%s[%s]", getClass().getSimpleName(), identifier);
eventServerFactory.setThreadNamePrefix(threadNamePrefix);
eventServerFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
eventServerFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
return eventServerFactory;
}
/**
* Create Event Server
*
* @return Event Server
*/
protected abstract EventServer createEventServer();
}

View File

@@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
import org.apache.nifi.distributed.cache.server.set.SetCache;
import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
public class SetCacheServer extends AbstractCacheServer {
private final SetCache cache;
public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
super(identifier, sslContext, port, maxReadSize);
final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy);
if (persistencePath == null) {
this.cache = simpleCache;
} else {
final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
persistentCache.restore();
this.cache = persistentCache;
}
}
@Override
protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
final String action = dis.readUTF();
if (action.equals("close")) {
return false;
}
final byte[] value = readValue(dis);
final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
final SetCacheResult response;
switch (action) {
case "addIfAbsent":
response = cache.addIfAbsent(valueBuffer);
break;
case "contains":
response = cache.contains(valueBuffer);
break;
case "remove":
response = cache.remove(valueBuffer);
break;
default:
throw new IOException("IllegalRequest");
}
dos.writeBoolean(response.getResult());
dos.flush();
return true;
}
@Override
public void stop() throws IOException {
try {
super.stop();
} finally {
cache.shutdown();
}
}
@Override
protected void finalize() throws Throwable {
if (!stopped) {
stop();
}
}
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
/**
* Message Encoder for Cache Operation Results
*/
@ChannelHandler.Sharable
public class CacheOperationResultEncoder extends MessageToByteEncoder<CacheOperationResult> {
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheOperationResult cacheOperationResult, final ByteBuf byteBuf) {
final int code = cacheOperationResult.isSuccess() ? 1 : 0;
byteBuf.writeByte(code);
}
}

View File

@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import org.apache.nifi.distributed.cache.operations.StandardCacheOperation;
import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
import org.apache.nifi.logging.ComponentLog;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cache Request Decoder processes bytes and decodes cache version and operation requests
*/
public class CacheRequestDecoder extends ByteToMessageDecoder {
private static final int HEADER_LENGTH = 4;
private static final int LONG_LENGTH = 8;
private static final int INT_LENGTH = 4;
private static final int SHORT_LENGTH = 2;
private final AtomicBoolean headerReceived = new AtomicBoolean();
private final AtomicInteger protocolVersion = new AtomicInteger();
private final ComponentLog log;
private final int maxLength;
private final CacheOperation[] supportedOperations;
public CacheRequestDecoder(
final ComponentLog log,
final int maxLength,
final CacheOperation[] supportedOperations
) {
this.log = log;
this.maxLength = maxLength;
this.supportedOperations = supportedOperations;
}
/**
* Decode Byte Buffer reading header on initial connection followed by protocol version and cache operations
*
* @param channelHandlerContext Channel Handler Context
* @param byteBuf Byte Buffer
* @param objects Decoded Objects
*/
@Override
protected void decode(final ChannelHandlerContext channelHandlerContext, final ByteBuf byteBuf, final List<Object> objects) {
if (!headerReceived.get()) {
readHeader(byteBuf, channelHandlerContext.channel().remoteAddress());
}
if (protocolVersion.get() == 0) {
final OptionalInt clientVersion = readInt(byteBuf);
if (clientVersion.isPresent()) {
final int clientVersionFound = clientVersion.getAsInt();
log.debug("Protocol Version [{}] Received [{}]", clientVersionFound, channelHandlerContext.channel().remoteAddress());
final CacheVersionRequest cacheVersionRequest = new CacheVersionRequest(clientVersionFound);
objects.add(cacheVersionRequest);
}
} else {
// Mark ByteBuf reader index to reset when sufficient bytes are not found
byteBuf.markReaderIndex();
final Optional<CacheOperation> cacheOperation = readOperation(byteBuf);
if (cacheOperation.isPresent()) {
final CacheOperation cacheOperationFound = cacheOperation.get();
final Optional<Object> cacheRequest = readRequest(cacheOperationFound, byteBuf);
if (cacheRequest.isPresent()) {
final Object cacheRequestFound = cacheRequest.get();
objects.add(cacheRequestFound);
} else if (StandardCacheOperation.CLOSE.value().contentEquals(cacheOperationFound.value())) {
objects.add(new CacheRequest(cacheOperationFound, null));
} else {
byteBuf.resetReaderIndex();
log.debug("Cache Operation [{}] request not processed", cacheOperationFound);
}
} else {
byteBuf.resetReaderIndex();
}
}
}
@Override
public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
log.warn("Request Decoding Failed: Closing Connection [{}]", context.channel().remoteAddress(), cause);
context.close();
}
/**
* Set Protocol Version based on version negotiated in other handlers
*
* @param protocolVersion Protocol Version
*/
public void setProtocolVersion(final int protocolVersion) {
this.protocolVersion.getAndSet(protocolVersion);
}
/**
* Read Request Object based on Cache Operation
*
* @param cacheOperation Cache Operation
* @param byteBuf Byte Buffer
* @return Request Object or empty when buffer does not contain sufficient bytes
*/
protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final Optional<byte[]> bytes = readBytes(byteBuf);
return bytes.map(value -> new CacheRequest(cacheOperation, value));
}
/**
* Read Bytes from buffer based on length indicated
*
* @param byteBuf Byte Buffer
* @return Bytes read or empty when buffer does not contain sufficient bytes
*/
protected Optional<byte[]> readBytes(final ByteBuf byteBuf) {
final Optional<byte[]> bytesRead;
final OptionalInt length = readInt(byteBuf);
if (length.isPresent()) {
final int readableBytes = byteBuf.readableBytes();
final int lengthFound = length.getAsInt();
if (readableBytes >= lengthFound) {
bytesRead = Optional.of(readBytes(byteBuf, lengthFound));
} else {
bytesRead = Optional.empty();
}
} else {
bytesRead = Optional.empty();
}
return bytesRead;
}
/**
* Read Unicode String from buffer based on an unsigned short length prefix
*
* @param byteBuf Byte Buffer
* @return String or empty when buffer does not contain sufficient bytes
*/
protected Optional<String> readUnicodeString(final ByteBuf byteBuf) {
final String unicodeString;
if (byteBuf.readableBytes() >= SHORT_LENGTH) {
final int length = byteBuf.readUnsignedShort();
if (length > maxLength) {
throw new IllegalArgumentException(String.format("Maximum Operation Length [%d] exceeded [%d]", maxLength, length));
}
if (byteBuf.readableBytes() >= length) {
unicodeString = byteBuf.readCharSequence(length, StandardCharsets.UTF_8).toString();
} else {
unicodeString = null;
}
} else {
unicodeString = null;
}
return Optional.ofNullable(unicodeString);
}
/**
* Read Integer from buffer
*
* @param byteBuf Byte Buffer
* @return Integer or empty when buffer does not contain sufficient bytes
*/
protected OptionalInt readInt(final ByteBuf byteBuf) {
final Integer integer;
final int readableBytes = byteBuf.readableBytes();
if (readableBytes >= INT_LENGTH) {
integer = byteBuf.readInt();
if (integer > maxLength) {
throw new IllegalArgumentException(String.format("Maximum Length [%d] exceeded [%d]", maxLength, integer));
}
} else {
integer = null;
}
return integer == null ? OptionalInt.empty() : OptionalInt.of(integer);
}
protected OptionalLong readLong(final ByteBuf byteBuf) {
final int readableBytes = byteBuf.readableBytes();
return readableBytes >= LONG_LENGTH ? OptionalLong.of(byteBuf.readLong()) : OptionalLong.empty();
}
private byte[] readBytes(final ByteBuf byteBuf, final int length) {
final byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
return bytes;
}
private Optional<CacheOperation> readOperation(final ByteBuf byteBuf) {
final Optional<String> clientOperation = readUnicodeString(byteBuf);
return clientOperation.map(operation -> Arrays.stream(supportedOperations)
.filter(supportedOperation -> supportedOperation.value().contentEquals(operation))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Cache Operation not supported [%d]", operation.length())))
);
}
private void readHeader(final ByteBuf byteBuf, final SocketAddress remoteAddress) {
if (byteBuf.readableBytes() >= HEADER_LENGTH) {
byteBuf.readBytes(HEADER_LENGTH);
headerReceived.getAndSet(true);
log.debug("Header Received [{}]", remoteAddress);
}
}
}

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.server.protocol.CacheVersionRequest;
import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.VersionNegotiator;
import java.util.Map;
import java.util.Objects;
/**
* Handler for Cache Version Requests responsible for negotiating supported version
*/
public class CacheVersionRequestHandler extends SimpleChannelInboundHandler<CacheVersionRequest> {
private final ComponentLog log;
private final VersionNegotiator versionNegotiator;
public CacheVersionRequestHandler(
final ComponentLog log,
final VersionNegotiator versionNegotiator
) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.versionNegotiator = Objects.requireNonNull(versionNegotiator, "Version Negotiator required");
}
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheVersionRequest cacheVersionRequest) {
for (final Map.Entry<String, ChannelHandler> entry : channelHandlerContext.channel().pipeline()) {
final ChannelHandler channelHandler = entry.getValue();
if (channelHandler instanceof CacheRequestDecoder) {
final CacheRequestDecoder cacheRequestDecoder = (CacheRequestDecoder) channelHandler;
final int requestedVersion = cacheVersionRequest.getVersion();
final CacheVersionResponse cacheVersionResponse = handleVersion(cacheRequestDecoder, requestedVersion);
channelHandlerContext.writeAndFlush(cacheVersionResponse);
}
}
}
private CacheVersionResponse handleVersion(final CacheRequestDecoder cacheRequestDecoder, final int requestedVersion) {
final CacheVersionResponse cacheVersionResponse;
if (versionNegotiator.isVersionSupported(requestedVersion)) {
log.debug("Cache Version Supported [{}]", requestedVersion);
cacheRequestDecoder.setProtocolVersion(requestedVersion);
cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.RESOURCE_OK, requestedVersion);
} else {
final Integer preferredVersion = versionNegotiator.getPreferredVersion(requestedVersion);
if (preferredVersion == null) {
log.debug("Cache Version Rejected [{}]", requestedVersion);
cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.ABORT, requestedVersion);
} else {
log.debug("Cache Version Rejected [{}] Preferred [{}]", requestedVersion, preferredVersion);
cacheVersionResponse = new CacheVersionResponse(ProtocolHandshake.DIFFERENT_RESOURCE_VERSION, preferredVersion);
}
}
return cacheVersionResponse;
}
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.server.protocol.CacheVersionResponse;
/**
* Message Encoder for Cache Version Responses
*/
@ChannelHandler.Sharable
public class CacheVersionResponseEncoder extends MessageToByteEncoder<CacheVersionResponse> {
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final CacheVersionResponse cacheVersionResponse, final ByteBuf byteBuf) {
final int statusCode = cacheVersionResponse.getStatusCode();
byteBuf.writeByte(statusCode);
if (ProtocolHandshake.DIFFERENT_RESOURCE_VERSION == statusCode) {
final int version = cacheVersionResponse.getVersion();
byteBuf.writeInt(version);
}
}
}

View File

@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
import org.apache.nifi.logging.ComponentLog;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
/**
* Map Cache Request Decoder processes bytes and decodes map cache operation requests
*/
public class MapCacheRequestDecoder extends CacheRequestDecoder {
public MapCacheRequestDecoder(
final ComponentLog log,
final int maxLength,
final CacheOperation[] supportedOperations
) {
super(log, maxLength, supportedOperations);
}
@Override
protected Optional<Object> readRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final MapCacheRequest request;
if (MapOperation.CONTAINS_KEY == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.FETCH == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.GET == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation) {
request = readKeyValueRequest(cacheOperation, byteBuf);
} else if (MapOperation.KEYSET == cacheOperation) {
request = new MapCacheRequest(cacheOperation);
} else if (MapOperation.REMOVE == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
request = readPatternRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
request = readPatternRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.REPLACE == cacheOperation) {
request = readKeyRevisionValueRequest(cacheOperation, byteBuf);
} else if (MapOperation.SUBMAP == cacheOperation) {
request = readSubMapRequest(cacheOperation, byteBuf);
} else if (MapOperation.PUT == cacheOperation) {
request = readKeyValueRequest(cacheOperation, byteBuf);
} else if (MapOperation.PUT_IF_ABSENT == cacheOperation) {
request = readKeyValueRequest(cacheOperation, byteBuf);
} else {
request = new MapCacheRequest(cacheOperation);
}
return Optional.ofNullable(request);
}
private MapCacheRequest readKeyRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final Optional<byte[]> key = readBytes(byteBuf);
return key.map(bytes -> new MapCacheRequest(cacheOperation, bytes)).orElse(null);
}
private MapCacheRequest readKeyValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final MapCacheRequest mapCacheRequest;
final Optional<byte[]> key = readBytes(byteBuf);
if (key.isPresent()) {
final Optional<byte[]> value = readBytes(byteBuf);
mapCacheRequest = value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), valueBytes)).orElse(null);
} else {
mapCacheRequest = null;
}
return mapCacheRequest;
}
private MapCacheRequest readKeyRevisionValueRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final MapCacheRequest mapCacheRequest;
final Optional<byte[]> key = readBytes(byteBuf);
if (key.isPresent()) {
final OptionalLong revision = readLong(byteBuf);
if (revision.isPresent()) {
final Optional<byte[]> value = readBytes(byteBuf);
mapCacheRequest = value.map(valueBytes -> new MapCacheRequest(cacheOperation, key.get(), revision.getAsLong(), valueBytes)).orElse(null);
} else {
mapCacheRequest = null;
}
} else {
mapCacheRequest = null;
}
return mapCacheRequest;
}
private MapCacheRequest readPatternRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final Optional<String> pattern = readUnicodeString(byteBuf);
final Optional<MapCacheRequest> request = pattern.map(requestedPattern -> new MapCacheRequest(cacheOperation, requestedPattern));
return request.orElse(null);
}
private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final MapCacheRequest mapCacheRequest;
final OptionalInt keys = readInt(byteBuf);
if (keys.isPresent()) {
final List<byte[]> subMapKeys = new ArrayList<>();
for (int i = 0; i < keys.getAsInt(); i++) {
final Optional<byte[]> key = readBytes(byteBuf);
if (key.isPresent()) {
subMapKeys.add(key.get());
} else {
// Clear Map to return null and retry on subsequent invocations
subMapKeys.clear();
break;
}
}
mapCacheRequest = subMapKeys.isEmpty() ? null : new MapCacheRequest(cacheOperation, subMapKeys);
} else {
mapCacheRequest = null;
}
return mapCacheRequest;
}
}
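
For orientation, a hedged client-side sketch (not from the commit) of how a put request could be framed so that the length-prefixed reads above can decode it: the operation name as a short-length-prefixed UTF-8 string, then the key and value as int-length-prefixed byte arrays. The connection-level header and protocol version negotiation that precede the first request are omitted, and the helper class name is an assumption used only for illustration.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Illustrative framing of a put request matching the decoder's length-prefixed reads. */
public class PutRequestFraming {

    public static byte[] frame(final byte[] key, final byte[] value) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF("put");        // unsigned short length prefix followed by UTF-8 operation name
            out.writeInt(key.length);   // int length prefix for the key
            out.write(key);
            out.writeInt(value.length); // int length prefix for the value
            out.write(value);
        }
        return bytes.toByteArray();
    }

    public static void main(final String[] args) throws IOException {
        final byte[] framed = frame("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
        System.out.printf("Framed put request: %d bytes%n", framed.length);
    }
}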

View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.server.map.MapCache;
import org.apache.nifi.distributed.cache.server.map.MapCacheRecord;
import org.apache.nifi.distributed.cache.server.map.MapPutResult;
import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
import org.apache.nifi.logging.ComponentLog;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Handler for Map Cache Request operations that interacts with the Map Cache and writes Results
*/
@ChannelHandler.Sharable
public class MapCacheRequestHandler extends SimpleChannelInboundHandler<MapCacheRequest> {
private static final long REVISION_NOT_FOUND = -1;
private final ComponentLog log;
private final MapCache mapCache;
public MapCacheRequestHandler(
final ComponentLog log,
final MapCache mapCache
) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.mapCache = Objects.requireNonNull(mapCache, "Map Cache required");
}
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final MapCacheRequest mapCacheRequest) throws Exception {
final CacheOperation cacheOperation = mapCacheRequest.getCacheOperation();
if (MapOperation.CLOSE == cacheOperation) {
log.debug("Map Cache Operation [{}] received", cacheOperation);
channelHandlerContext.close();
} else if (MapOperation.CONTAINS_KEY == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final boolean success = mapCache.containsKey(key);
writeResult(channelHandlerContext, cacheOperation, success);
} else if (MapOperation.GET == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer cached = mapCache.get(key);
writeBytes(channelHandlerContext, cacheOperation, cached);
} else if (MapOperation.GET_AND_PUT_IF_ABSENT == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
final MapPutResult result = mapCache.putIfAbsent(key, value);
final ByteBuffer cached = result.isSuccessful() ? null : result.getExisting().getValue();
writeBytes(channelHandlerContext, cacheOperation, cached);
} else if (MapOperation.FETCH == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final MapCacheRecord mapCacheRecord = mapCache.fetch(key);
writeMapCacheRecord(channelHandlerContext, cacheOperation, mapCacheRecord);
} else if (MapOperation.KEYSET == cacheOperation) {
final Set<ByteBuffer> keySet = mapCache.keySet();
writeSize(channelHandlerContext, cacheOperation, keySet.size());
for (final ByteBuffer key : keySet) {
writeBytes(channelHandlerContext, cacheOperation, key);
}
} else if (MapOperation.PUT == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
final MapPutResult result = mapCache.put(key, value);
writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
} else if (MapOperation.PUT_IF_ABSENT == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
final MapPutResult result = mapCache.putIfAbsent(key, value);
writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
} else if (MapOperation.REMOVE == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer removed = mapCache.remove(key);
final boolean success = removed != null;
writeResult(channelHandlerContext, cacheOperation, success);
} else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer removed = mapCache.remove(key);
writeBytes(channelHandlerContext, cacheOperation, removed);
} else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
final String pattern = mapCacheRequest.getPattern();
final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
final int size = removed == null ? 0 : removed.size();
writeRemoved(channelHandlerContext, cacheOperation, size);
} else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
final String pattern = mapCacheRequest.getPattern();
final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
if (removed == null) {
writeRemoved(channelHandlerContext, cacheOperation, 0);
} else {
writeSize(channelHandlerContext, cacheOperation, removed.size());
for (final Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
writeBytes(channelHandlerContext, cacheOperation, entry.getKey());
writeBytes(channelHandlerContext, cacheOperation, entry.getValue());
}
}
} else if (MapOperation.REPLACE == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
final MapCacheRecord mapCacheRecord = new MapCacheRecord(key, value, mapCacheRequest.getRevision());
final MapPutResult result = mapCache.replace(mapCacheRecord);
writeResult(channelHandlerContext, cacheOperation, result.isSuccessful());
} else if (MapOperation.SUBMAP == cacheOperation) {
final List<byte[]> keys = mapCacheRequest.getKeys();
for (final byte[] key : keys) {
final ByteBuffer requestedKey = ByteBuffer.wrap(key);
final ByteBuffer value = mapCache.get(requestedKey);
writeBytes(channelHandlerContext, cacheOperation, value);
}
} else {
log.warn("Map Cache Operation [{}] not supported", cacheOperation);
}
}
private void writeResult(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final boolean success) {
log.debug("Map Cache Operation [{}] Success [{}]", cacheOperation, success);
final CacheOperationResult cacheOperationResult = new CacheOperationResult(success);
channelHandlerContext.writeAndFlush(cacheOperationResult);
}
private void writeRemoved(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final long size) {
final MapRemoveResponse mapRemoveResponse = new MapRemoveResponse(size);
log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
channelHandlerContext.writeAndFlush(mapRemoveResponse);
}
private void writeSize(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final int size) {
final MapSizeResponse mapSizeResponse = new MapSizeResponse(size);
log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
channelHandlerContext.writeAndFlush(mapSizeResponse);
}
private void writeBytes(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final ByteBuffer buffer) {
final byte[] bytes = buffer == null ? null : buffer.array();
final int length = bytes == null ? 0 : bytes.length;
final MapValueResponse mapValueResponse = new MapValueResponse(length, bytes);
log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, length);
channelHandlerContext.writeAndFlush(mapValueResponse);
}
private void writeMapCacheRecord(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final MapCacheRecord mapCacheRecord) {
final long revision = mapCacheRecord == null ? REVISION_NOT_FOUND : mapCacheRecord.getRevision();
final byte[] value = mapCacheRecord == null ? null : mapCacheRecord.getValue().array();
final int length = value == null ? 0 : value.length;
final MapValueResponse mapValueResponse = new MapValueResponse(length, value, revision);
log.debug("Map Cache Operation [{}] Length [{}]", cacheOperation, length);
channelHandlerContext.writeAndFlush(mapValueResponse);
}
}
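
Because the handler is a plain SimpleChannelInboundHandler, it can be exercised without opening a socket. Below is a minimal sketch using Netty's EmbeddedChannel together with the SimpleMapCache and the Mockito mocking already used elsewhere in this change; it is illustrative only and not one of the committed tests.

import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestHandler;
import org.apache.nifi.distributed.cache.server.map.SimpleMapCache;
import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
import org.apache.nifi.logging.ComponentLog;
import org.mockito.Mockito;

import java.nio.charset.StandardCharsets;

class MapCacheRequestHandlerSketch {
    public static void main(final String[] args) {
        final SimpleMapCache cache = new SimpleMapCache("sketch", 16, EvictionPolicy.FIFO);
        final ComponentLog log = Mockito.mock(ComponentLog.class);
        final EmbeddedChannel channel = new EmbeddedChannel(new MapCacheRequestHandler(log, cache));

        final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        final byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        channel.writeInbound(new MapCacheRequest(MapOperation.PUT, key, value));

        // Without encoders in the pipeline the outbound message is the CacheOperationResult itself
        final CacheOperationResult result = channel.readOutbound();
        System.out.printf("PUT success: %b%n", result.isSuccess());
    }
}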


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
/**
* Message Encoder for Map Remove Responses
*/
@ChannelHandler.Sharable
public class MapRemoveResponseEncoder extends MessageToByteEncoder<MapRemoveResponse> {
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final MapRemoveResponse mapRemoveResponse, final ByteBuf byteBuf) {
byteBuf.writeLong(mapRemoveResponse.getSize());
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
/**
* Message Encoder for Map Size Responses
*/
@ChannelHandler.Sharable
public class MapSizeResponseEncoder extends MessageToByteEncoder<MapSizeResponse> {
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final MapSizeResponse mapSizeResponse, final ByteBuf byteBuf) {
byteBuf.writeInt(mapSizeResponse.getSize());
}
}


@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
/**
* Message Encoder for Map Value Responses
*/
@ChannelHandler.Sharable
public class MapValueResponseEncoder extends MessageToByteEncoder<MapValueResponse> {
@Override
protected void encode(final ChannelHandlerContext channelHandlerContext, final MapValueResponse mapValueResponse, final ByteBuf byteBuf) {
final Long revision = mapValueResponse.getRevision();
if (revision != null) {
byteBuf.writeLong(revision);
}
byteBuf.writeInt(mapValueResponse.getLength());
final byte[] value = mapValueResponse.getValue();
if (value != null) {
byteBuf.writeBytes(value);
}
}
}
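
On the wire a value response is therefore a four-byte length followed by the value bytes, and FETCH responses are additionally prefixed with an eight-byte revision. A hedged client-side sketch of reading those frames with a DataInputStream, mirroring the legacy stream protocol: a zero length means no value was cached and a negative revision means the key was not found.

import java.io.DataInputStream;
import java.io.IOException;

class MapValueResponseReaderSketch {
    // Hypothetical client-side sketch: reads the frames produced by MapValueResponseEncoder

    // GET-style response: length prefix followed by value bytes
    static byte[] readValue(final DataInputStream dataInputStream) throws IOException {
        final int length = dataInputStream.readInt();
        final byte[] value = new byte[length];
        dataInputStream.readFully(value);
        return value;
    }

    // FETCH response: revision precedes the length-prefixed value
    static long readRevision(final DataInputStream dataInputStream) throws IOException {
        final long revision = dataInputStream.readLong();
        readValue(dataInputStream);
        return revision;
    }
}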


@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.codec;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import org.apache.nifi.distributed.cache.operations.SetOperation;
import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
import org.apache.nifi.distributed.cache.server.protocol.CacheRequest;
import org.apache.nifi.distributed.cache.server.set.SetCache;
import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
import org.apache.nifi.logging.ComponentLog;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* Handler for Set Cache Request operations that interacts with the Set Cache and writes Results
*/
@ChannelHandler.Sharable
public class SetCacheRequestHandler extends SimpleChannelInboundHandler<CacheRequest> {
private final ComponentLog log;
private final SetCache setCache;
public SetCacheRequestHandler(
final ComponentLog log,
final SetCache setCache
) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.setCache = Objects.requireNonNull(setCache, "Set Cache required");
}
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final CacheRequest cacheRequest) throws Exception {
final CacheOperation cacheOperation = cacheRequest.getCacheOperation();
final ByteBuffer body = ByteBuffer.wrap(cacheRequest.getBody());
final CacheOperationResult result;
if (SetOperation.ADD_IF_ABSENT == cacheOperation) {
result = getCacheOperationResult(setCache.addIfAbsent(body));
} else if (SetOperation.CONTAINS == cacheOperation) {
result = getCacheOperationResult(setCache.contains(body));
} else if (SetOperation.REMOVE == cacheOperation) {
result = getCacheOperationResult(setCache.remove(body));
} else {
result = null;
}
if (SetOperation.CLOSE == cacheOperation) {
log.debug("Set Cache Operation [{}] received", cacheOperation);
channelHandlerContext.close();
} else if (result == null) {
log.warn("Set Cache Operation [{}] not supported", cacheOperation);
} else {
log.debug("Set Cache Operation [{}] Success [{}]", cacheOperation, result.isSuccess());
channelHandlerContext.writeAndFlush(result);
}
}
private CacheOperationResult getCacheOperationResult(final SetCacheResult setCacheResult) {
return new CacheOperationResult(setCacheResult.getResult());
}
}
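
The corresponding client-side exchange is a UTF-encoded operation name followed by a length-prefixed body, answered with what the new tests read back as a single status byte. A hedged sketch of one ADD_IF_ABSENT round trip after the version handshake, assuming SetOperation exposes the same value() accessor that MapOperation does:

import org.apache.nifi.distributed.cache.operations.SetOperation;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

class SetCacheClientSketch {
    // Hypothetical client-side sketch: one ADD_IF_ABSENT exchange over an established connection
    static boolean addIfAbsent(final DataOutputStream dataOutputStream, final InputStream inputStream, final byte[] value) throws IOException {
        dataOutputStream.writeUTF(SetOperation.ADD_IF_ABSENT.value()); // operation name read by CacheRequestDecoder
        dataOutputStream.writeInt(value.length);                       // length prefix for the request body
        dataOutputStream.write(value);                                 // body bytes wrapped into a CacheRequest
        dataOutputStream.flush();
        return inputStream.read() == 1;                                // CacheOperationResult read as a single byte
    }
}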


@ -75,10 +75,23 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
}
}
protected MapCacheServer createMapCacheServer(
final int port, final int maxSize, final SSLContext sslContext, final EvictionPolicy evictionPolicy,
final File persistenceDir, final int maxReadSize) throws IOException {
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize);
protected CacheServer createMapCacheServer(
final int port,
final int maxSize,
final SSLContext sslContext,
final EvictionPolicy evictionPolicy,
final File persistenceDir,
final int maxReadSize
) throws IOException {
return new StandardMapCacheServer(
getLogger(),
getIdentifier(),
sslContext,
port,
maxSize,
evictionPolicy,
persistenceDir,
maxReadSize
);
}
}


@ -1,252 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
public class MapCacheServer extends AbstractCacheServer {
private final MapCache cache;
public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize,
final EvictionPolicy evictionPolicy, final File persistencePath, final int maxReadSize) throws IOException {
super(identifier, sslContext, port, maxReadSize);
final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy);
if (persistencePath == null) {
this.cache = simpleCache;
} else {
final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
persistentCache.restore();
this.cache = persistentCache;
}
}
/**
* Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(3, 2, 1);
}
@Override
protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
final String action = dis.readUTF();
try {
switch (action) {
case "close": {
return false;
}
case "putIfAbsent": {
final byte[] key = readValue(dis);
final byte[] value = readValue(dis);
final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
dos.writeBoolean(putResult.isSuccessful());
break;
}
case "put": {
final byte[] key = readValue(dis);
final byte[] value = readValue(dis);
cache.put(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
dos.writeBoolean(true);
break;
}
case "containsKey": {
final byte[] key = readValue(dis);
final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
dos.writeBoolean(contains);
break;
}
case "getAndPutIfAbsent": {
final byte[] key = readValue(dis);
final byte[] value = readValue(dis);
final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
if (putResult.isSuccessful()) {
// Put was successful. There was no old value to get.
dos.writeInt(0);
} else {
// we didn't put. Write back the previous value
final byte[] byteArray = putResult.getExisting().getValue().array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "get": {
final byte[] key = readValue(dis);
final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
if (existingValue == null) {
// there was no existing value.
dos.writeInt(0);
} else {
// a value already existed.
final byte[] byteArray = existingValue.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "subMap": {
final int numKeys = validateSize(dis.readInt());
for(int i=0;i<numKeys;i++) {
final byte[] key = readValue(dis);
final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
if (existingValue == null) {
// there was no existing value.
dos.writeInt(0);
} else {
// a value already existed.
final byte[] byteArray = existingValue.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
}
break;
}
case "remove": {
final byte[] key = readValue(dis);
final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
dos.writeBoolean(removed);
break;
}
case "removeAndGet": {
final byte[] key = readValue(dis);
final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key));
if (removed == null) {
// there was no value removed
dos.writeInt(0);
} else {
// reply with the value that was removed
final byte[] byteArray = removed.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "removeByPattern": {
final String pattern = dis.readUTF();
final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
dos.writeLong(removed == null ? 0 : removed.size());
break;
}
case "removeByPatternAndGet": {
final String pattern = dis.readUTF();
final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
if (removed == null || removed.size() == 0) {
dos.writeLong(0);
} else {
// write the map size
dos.writeInt(removed.size());
for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
// write map entry key
final byte[] key = entry.getKey().array();
dos.writeInt(key.length);
dos.write(key);
// write map entry value
final byte[] value = entry.getValue().array();
dos.writeInt(value.length);
dos.write(value);
}
}
break;
}
case "fetch": {
final byte[] key = readValue(dis);
final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
if (existing == null) {
// there was no existing value.
dos.writeLong(-1);
dos.writeInt(0);
} else {
// a value already existed.
dos.writeLong(existing.getRevision());
final byte[] byteArray = existing.getValue().array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "replace": {
final byte[] key = readValue(dis);
final long revision = dis.readLong();
final byte[] value = readValue(dis);
final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
dos.writeBoolean(result.isSuccessful());
break;
}
case "keySet": {
final Set<ByteBuffer> result = cache.keySet();
// write the set size
dos.writeInt(result.size());
// write each key in the set
for (ByteBuffer bb : result) {
final byte[] byteArray = bb.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
default: {
throw new IOException("Illegal Request");
}
}
} finally {
dos.flush();
}
return true;
}
@Override
public void stop() throws IOException {
try {
super.stop();
} finally {
cache.shutdown();
}
}
@Override
protected void finalize() throws Throwable {
if (!stopped) {
stop();
}
}
}


@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.EventCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestHandler;
import org.apache.nifi.distributed.cache.server.codec.MapRemoveResponseEncoder;
import org.apache.nifi.distributed.cache.server.codec.MapSizeResponseEncoder;
import org.apache.nifi.distributed.cache.server.codec.MapValueResponseEncoder;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
/**
* Standard Map Cache Server implemented using Netty
*/
public class StandardMapCacheServer extends EventCacheServer {
private final EventServerFactory eventServerFactory;
private final MapCache cache;
public StandardMapCacheServer(
final ComponentLog log,
final String identifier,
final SSLContext sslContext,
final int port,
final int maxCacheEntries,
final EvictionPolicy evictionPolicy,
final File persistencePath,
final int maxReadLength
) throws IOException {
super(log, port);
final MapCache simpleCache = new SimpleMapCache(identifier, maxCacheEntries, evictionPolicy);
if (persistencePath == null) {
this.cache = simpleCache;
} else {
final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache);
persistentCache.restore();
this.cache = persistentCache;
}
final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
// Create Sharable Handlers to avoid unnecessary instantiation for each connection
final MapCacheRequestHandler mapCacheRequestHandler = new MapCacheRequestHandler(log, cache);
final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
final MapRemoveResponseEncoder mapRemoveResponseEncoder = new MapRemoveResponseEncoder();
final MapSizeResponseEncoder mapSizeResponseEncoder = new MapSizeResponseEncoder();
final MapValueResponseEncoder mapValueResponseEncoder = new MapValueResponseEncoder();
final VersionNegotiator versionNegotiator = createVersionNegotiator();
nettyEventServerFactory.setHandlerSupplier(() ->
Arrays.asList(
cacheVersionResponseEncoder,
cacheOperationResultEncoder,
mapRemoveResponseEncoder,
mapSizeResponseEncoder,
mapValueResponseEncoder,
new MapCacheRequestDecoder(log, maxReadLength, MapOperation.values()),
mapCacheRequestHandler,
new CacheVersionRequestHandler(log, versionNegotiator)
)
);
this.eventServerFactory = nettyEventServerFactory;
}
@Override
public void stop() {
try {
cache.shutdown();
} catch (final IOException e) {
throw new UncheckedIOException("Cache Shutdown Failed", e);
} finally {
super.stop();
}
}
@Override
protected EventServer createEventServer() {
return eventServerFactory.getEventServer();
}
protected VersionNegotiator createVersionNegotiator() {
return new StandardVersionNegotiator(
ProtocolVersion.V3.value(),
ProtocolVersion.V2.value(),
ProtocolVersion.V1.value()
);
}
}
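
Outside of the DistributedMapCacheServer controller service, the server can also be started directly, as the new StandardMapCacheServerTest does. A minimal sketch, assuming a null SSLContext (plaintext) and no persistence directory:

import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.distributed.cache.server.map.StandardMapCacheServer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.mockito.Mockito;

import java.io.IOException;

class StandardMapCacheServerSketch {
    public static void main(final String[] args) throws IOException {
        final ComponentLog log = Mockito.mock(ComponentLog.class); // any ComponentLog implementation works here
        final StandardMapCacheServer server = new StandardMapCacheServer(
                log,
                "map-cache-sketch",                  // identifier
                null,                                // SSLContext: null disables TLS
                NetworkUtils.getAvailableTcpPort(),  // port
                32,                                  // maximum cache entries
                EvictionPolicy.FIFO,
                null,                                // persistence directory: null keeps the cache in memory
                4096                                 // maximum read length in bytes
        );
        server.start();
        try {
            System.out.printf("Map Cache Server listening on port %d%n", server.getPort());
        } finally {
            server.stop();
        }
    }
}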


@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
public class CacheOperationResult {
private final boolean success;
public CacheOperationResult(
final boolean success
) {
this.success = success;
}
public boolean isSuccess() {
return success;
}
}


@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import java.util.Objects;
/**
* Cache Request Packet
*/
public class CacheRequest {
private final CacheOperation cacheOperation;
private final byte[] body;
public CacheRequest(
final CacheOperation cacheOperation,
final byte[] body
) {
this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
this.body = Objects.requireNonNull(body, "Body required");
}
public CacheOperation getCacheOperation() {
return cacheOperation;
}
public byte[] getBody() {
return body;
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
/**
* Cache Version Request contains the protocol version which the peer requested
*/
public class CacheVersionRequest {
private final int version;
public CacheVersionRequest(
final int version
) {
this.version = version;
}
public int getVersion() {
return version;
}
}


@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
/**
* Cache Version Response contains the status code and optional requested version
*/
public class CacheVersionResponse {
private final int statusCode;
private final int version;
public CacheVersionResponse(
final int statusCode,
final int version
) {
this.statusCode = statusCode;
this.version = version;
}
public int getStatusCode() {
return statusCode;
}
public int getVersion() {
return version;
}
}
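
These two classes model the version handshake that precedes every cache operation: the client sends the four-byte header and its proposed protocol version, and the server answers with a status code and, per CacheVersionResponse, an optional version. A minimal client-side sketch of the happy path, based on the sendHeaderVersion helper in the new StandardMapCacheServerTest:

import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

class CacheVersionHandshakeSketch {
    private static final byte[] HEADER = new byte[]{'N', 'i', 'F', 'i'};

    // Hypothetical client-side sketch: only the accepted-version path is handled
    static int negotiate(final DataOutputStream dataOutputStream, final InputStream inputStream) throws IOException {
        dataOutputStream.write(HEADER);                         // magic header expected by the server
        dataOutputStream.writeInt(ProtocolVersion.V3.value());  // becomes a CacheVersionRequest on the server side
        dataOutputStream.flush();
        final int statusCode = inputStream.read();              // status code of the CacheVersionResponse
        if (statusCode != ProtocolHandshake.RESOURCE_OK) {
            throw new IOException("Protocol version not accepted: status " + statusCode);
        }
        return ProtocolVersion.V3.value();
    }
}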


@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
import org.apache.nifi.distributed.cache.operations.CacheOperation;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* Map Cache Request with operation and other optional properties
*/
public class MapCacheRequest {
private final CacheOperation cacheOperation;
private byte[] key;
private byte[] value;
private String pattern;
private long revision;
private List<byte[]> keys = Collections.emptyList();
public MapCacheRequest(
final CacheOperation cacheOperation
) {
this.cacheOperation = Objects.requireNonNull(cacheOperation, "Cache Operation required");
}
public MapCacheRequest(
final CacheOperation cacheOperation,
final byte[] key
) {
this(cacheOperation);
this.key = Objects.requireNonNull(key, "Key required");
}
public MapCacheRequest(
final CacheOperation cacheOperation,
final byte[] key,
final byte[] value
) {
this(cacheOperation, key);
this.value = Objects.requireNonNull(value, "Value required");
}
public MapCacheRequest(
final CacheOperation cacheOperation,
final byte[] key,
final long revision,
final byte[] value
) {
this(cacheOperation, key, value);
this.revision = revision;
}
public MapCacheRequest(
final CacheOperation cacheOperation,
final String pattern
) {
this(cacheOperation);
this.pattern = Objects.requireNonNull(pattern, "Pattern required");
}
public MapCacheRequest(
final CacheOperation cacheOperation,
final List<byte[]> keys
) {
this(cacheOperation);
this.keys = Objects.requireNonNull(keys, "Keys required");
}
public CacheOperation getCacheOperation() {
return cacheOperation;
}
public byte[] getKey() {
return key;
}
public byte[] getValue() {
return value;
}
public String getPattern() {
return pattern;
}
public long getRevision() {
return revision;
}
public List<byte[]> getKeys() {
return keys;
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
/**
* Map Remove Response
*/
public class MapRemoveResponse {
private final long size;
public MapRemoveResponse(
final long size
) {
this.size = size;
}
public long getSize() {
return size;
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.protocol;
/**
* Map Size Response
*/
public class MapSizeResponse {
private final int size;
public MapSizeResponse(
final int size
) {
this.size = size;
}
public int getSize() {
return size;
}
}


@ -14,39 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.io.socket.ssl;
package org.apache.nifi.distributed.cache.server.protocol;
import java.io.IOException;
import java.io.OutputStream;
/**
* Map Value Response
*/
public class MapValueResponse {
private final int length;
public class SSLSocketChannelOutputStream extends OutputStream {
private final byte[] value;
private final SSLSocketChannel channel;
private Long revision;
public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
this.channel = channel;
public MapValueResponse(
final int length,
final byte[] value
) {
this.length = length;
this.value = value;
}
@Override
public void write(final int b) throws IOException {
channel.write(b);
public MapValueResponse(
final int length,
final byte[] value,
final Long revision
) {
this(length, value);
this.revision = revision;
}
@Override
public void write(byte[] b) throws IOException {
channel.write(b);
public int getLength() {
return length;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
channel.write(b, off, len);
public byte[] getValue() {
return value;
}
/**
* Closes the underlying SSLSocketChannel, which also will close the InputStream and the connection
*/
@Override
public void close() throws IOException {
channel.close();
public Long getRevision() {
return revision;
}
}


@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.set;
import org.apache.nifi.distributed.cache.operations.SetOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.EventCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.distributed.cache.server.codec.CacheOperationResultEncoder;
import org.apache.nifi.distributed.cache.server.codec.CacheRequestDecoder;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
import org.apache.nifi.distributed.cache.server.codec.SetCacheRequestHandler;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
/**
* Standard Set Cache Server implementation based on Netty
*/
public class StandardSetCacheServer extends EventCacheServer {
private final EventServerFactory eventServerFactory;
private final SetCache cache;
public StandardSetCacheServer(
final ComponentLog log,
final String identifier,
final SSLContext sslContext,
final int port,
final int maxCacheEntries,
final EvictionPolicy evictionPolicy,
final File persistencePath,
final int maxReadLength
) throws IOException {
super(log, port);
final SetCache simpleCache = new SimpleSetCache(identifier, maxCacheEntries, evictionPolicy);
if (persistencePath == null) {
this.cache = simpleCache;
} else {
final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache);
persistentCache.restore();
this.cache = persistentCache;
}
final NettyEventServerFactory nettyEventServerFactory = createEventServerFactory(identifier, sslContext);
// Create Sharable Handlers to avoid unnecessary instantiation for each connection
final SetCacheRequestHandler setCacheRequestHandler = new SetCacheRequestHandler(log, cache);
final CacheVersionResponseEncoder cacheVersionResponseEncoder = new CacheVersionResponseEncoder();
final CacheOperationResultEncoder cacheOperationResultEncoder = new CacheOperationResultEncoder();
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V1.value());
nettyEventServerFactory.setHandlerSupplier(() ->
Arrays.asList(
cacheVersionResponseEncoder,
cacheOperationResultEncoder,
new CacheRequestDecoder(log, maxReadLength, SetOperation.values()),
setCacheRequestHandler,
new CacheVersionRequestHandler(log, versionNegotiator)
)
);
this.eventServerFactory = nettyEventServerFactory;
}
@Override
public void stop() {
try {
cache.shutdown();
} catch (final IOException e) {
throw new UncheckedIOException("Cache Shutdown Failed", e);
} finally {
super.stop();
}
}
@Override
protected EventServer createEventServer() {
return eventServerFactory.getEventServer();
}
}


@ -26,9 +26,10 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import java.io.IOException;
@ -39,18 +40,12 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Verify basic functionality of {@link DistributedMapCacheClientService}.
* <p>
* This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
* implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
* is available.
*/
@Timeout(5)
public class DistributedMapCacheTest {
private static TestRunner runner = null;
@ -59,8 +54,8 @@ public class DistributedMapCacheTest {
private static final Serializer<String> serializer = new StringSerializer();
private static final Deserializer<String> deserializer = new StringDeserializer();
@BeforeClass
public static void beforeClass() throws Exception {
@BeforeAll
public static void startServices() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
@ -76,8 +71,8 @@ public class DistributedMapCacheTest {
runner.enableControllerService(client);
}
@AfterClass
public static void afterClass() {
@AfterAll
public static void shutdownServices() {
runner.disableControllerService(client);
runner.removeControllerService(client);


@ -29,9 +29,9 @@ import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
@ -40,18 +40,10 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Verify basic functionality of {@link DistributedMapCacheClientService}, in the context of a TLS authenticated
* socket session.
* <p>
* This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
* implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
* is available.
*/
public class DistributedMapCacheTlsTest {
private static TestRunner runner = null;
@ -61,8 +53,8 @@ public class DistributedMapCacheTlsTest {
private static final Serializer<String> serializer = new StringSerializer();
private static final Deserializer<String> deserializer = new StringDeserializer();
@BeforeClass
public static void beforeClass() throws Exception {
@BeforeAll
public static void setServices() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
sslContextService = createSslContextService();
@ -83,8 +75,8 @@ public class DistributedMapCacheTlsTest {
runner.enableControllerService(client);
}
@AfterClass
public static void afterClass() {
@AfterAll
public static void shutdown() {
runner.disableControllerService(client);
runner.removeControllerService(client);


@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.net.ssl.SSLContext;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(5)
@ExtendWith(MockitoExtension.class)
class StandardMapCacheServerTest {
private static final String IDENTIFIER = StandardMapCacheServer.class.getSimpleName();
private static final SSLContext SSL_CONTEXT_DISABLED = null;
private static final int MAX_CACHE_ENTRIES = 32;
private static final EvictionPolicy EVICTION_POLICY = EvictionPolicy.FIFO;
private static final File PERSISTENCE_PATH_DISABLED = null;
private static final int MAX_READ_LENGTH = 4096;
private static final String LOCALHOST = "127.0.0.1";
private static final byte[] HEADER = new byte[]{'N', 'i', 'F', 'i'};
private static final byte[] KEY = String.class.getSimpleName().getBytes(StandardCharsets.UTF_8);
private static final int KEY_NOT_FOUND = 0;
private static final int PUT_COMPLETED = 1;
@Mock
ComponentLog log;
StandardMapCacheServer server;
@BeforeEach
void setServer() throws IOException {
final int port = NetworkUtils.getAvailableTcpPort();
server = new StandardMapCacheServer(
log,
IDENTIFIER,
SSL_CONTEXT_DISABLED,
port,
MAX_CACHE_ENTRIES,
EVICTION_POLICY,
PERSISTENCE_PATH_DISABLED,
MAX_READ_LENGTH
);
server.start();
}
@AfterEach
void stopServer() {
server.stop();
}
@Test
void testSocketContainsKeyValueDelayed() throws IOException, InterruptedException {
try (
final Socket socket = new Socket(LOCALHOST, server.getPort());
final InputStream inputStream = socket.getInputStream();
final OutputStream outputStream = socket.getOutputStream();
final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
) {
sendHeaderVersion(dataOutputStream, inputStream);
dataOutputStream.writeUTF(MapOperation.CONTAINS_KEY.value());
// Delay writing key to simulate slow network connection
TimeUnit.MILLISECONDS.sleep(200);
dataOutputStream.writeInt(KEY.length);
dataOutputStream.write(KEY);
final int read = inputStream.read();
assertEquals(KEY_NOT_FOUND, read);
}
}
@Test
void testSocketPutGetMaxLength() throws IOException {
try (
final Socket socket = new Socket(LOCALHOST, server.getPort());
final InputStream inputStream = socket.getInputStream();
final DataInputStream dataInputStream = new DataInputStream(inputStream);
final OutputStream outputStream = socket.getOutputStream();
final DataOutputStream dataOutputStream = new DataOutputStream(outputStream)
) {
sendHeaderVersion(dataOutputStream, inputStream);
dataOutputStream.writeUTF(MapOperation.PUT.value());
dataOutputStream.writeInt(KEY.length);
dataOutputStream.write(KEY);
final byte[] value = getValue();
dataOutputStream.writeInt(value.length);
dataOutputStream.write(value);
final int putStatus = inputStream.read();
assertEquals(PUT_COMPLETED, putStatus);
dataOutputStream.writeUTF(MapOperation.GET.value());
dataOutputStream.writeInt(KEY.length);
dataOutputStream.write(KEY);
final int valueLength = dataInputStream.readInt();
assertEquals(MAX_READ_LENGTH, valueLength);
final byte[] cachedValue = new byte[valueLength];
final int cachedValueLength = dataInputStream.read(cachedValue);
assertEquals(MAX_READ_LENGTH, cachedValueLength);
assertArrayEquals(value, cachedValue);
}
}
private void sendHeaderVersion(final DataOutputStream dataOutputStream, final InputStream inputStream) throws IOException {
dataOutputStream.write(HEADER);
dataOutputStream.writeInt(ProtocolVersion.V3.value());
final int protocolResponse = inputStream.read();
assertEquals(ProtocolHandshake.RESOURCE_OK, protocolResponse);
}
private byte[] getValue() {
final SecureRandom secureRandom = new SecureRandom();
final byte[] value = new byte[MAX_READ_LENGTH];
secureRandom.nextBytes(value);
return value;
}
}


@ -25,6 +25,8 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
import org.apache.nifi.distributed.cache.server.CacheServer;
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.processor.DataUnit;
@ -224,11 +226,11 @@ public class TestDistributedMapServerAndClient {
// Create a server that only supports protocol version 1.
server = new DistributedMapCacheServer() {
@Override
protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
@Override
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(1);
protected StandardVersionNegotiator createVersionNegotiator() {
return new StandardVersionNegotiator(ProtocolVersion.V1.value());
}
};
}


@ -24,25 +24,18 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Verify basic functionality of {@link DistributedSetCacheClientService}.
* <p>
* This test instantiates both the server and client {@link org.apache.nifi.controller.ControllerService} objects
* implementing the distributed cache protocol. It assumes that the default distributed cache port (4557)
* is available.
*/
public class DistributedSetCacheTest {
private static TestRunner runner = null;
@ -50,8 +43,8 @@ public class DistributedSetCacheTest {
private static DistributedSetCacheClientService client = null;
private static final Serializer<String> serializer = new StringSerializer();
@BeforeClass
public static void beforeClass() throws Exception {
@BeforeAll
public static void setRunner() throws Exception {
final String port = Integer.toString(NetworkUtils.getAvailableTcpPort());
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
@ -67,8 +60,8 @@ public class DistributedSetCacheTest {
runner.enableControllerService(client);
}
@AfterClass
public static void afterClass() {
@AfterAll
public static void shutdown() {
runner.disableControllerService(client);
runner.removeControllerService(client);