diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java index f838c2f0d7..c035485b31 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java @@ -27,20 +27,20 @@ import javax.net.ssl.SSLContext; public interface CommsSession extends Closeable { void setTimeout(final long value, final TimeUnit timeUnit); - + InputStream getInputStream() throws IOException; - + OutputStream getOutputStream() throws IOException; - + boolean isClosed(); - + void interrupt(); - + String getHostname(); - + int getPort(); - + long getTimeout(TimeUnit timeUnit); - + SSLContext getSSLContext(); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 92bda8f72c..51138b978f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " + "between nodes in a NiFi cluster") public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { @@ -65,14 +65,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, indicates the SSL Context Service that is used to communicate with the " - + "remote server. If not specified, communications will not be encrypted") + + "remote server. If not specified, communications will not be encrypted") .required(false) .identifiesControllerService(SSLContextService.class) .build(); public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .description("Specifies how long to wait when communicating with the remote server before determining that " - + "there is a communications failure if data cannot be sent or received") + + "there is a communications failure if data cannot be sent or received") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("30 secs") @@ -299,6 +299,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService } private static interface CommsAction { + T execute(CommsSession commsSession) throws IOException; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java index 2de4ccb254..63d59cabf8 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedSetCacheServer. This can be used in order to share a Set " + "between nodes in a NiFi cluster") public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient { @@ -65,14 +65,14 @@ public class DistributedSetCacheClientService extends AbstractControllerService public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, indicates the SSL Context Service that is used to communicate with the " - + "remote server. If not specified, communications will not be encrypted") + + "remote server. If not specified, communications will not be encrypted") .required(false) .identifiesControllerService(SSLContextService.class) .build(); public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .description("Specifices how long to wait when communicating with the remote server before determining " - + "that there is a communications failure if data cannot be sent or received") + + "that there is a communications failure if data cannot be sent or received") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("30 secs") diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java index 9b4b656063..3d400bb123 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -30,36 +30,37 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; public class SSLCommsSession implements CommsSession { + private final SSLSocketChannel sslSocketChannel; private final SSLContext sslContext; private final String hostname; private final int port; - + private final SSLSocketChannelInputStream in; private final BufferedInputStream bufferedIn; - + private final SSLSocketChannelOutputStream out; private final BufferedOutputStream bufferedOut; - public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { + public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true); - + in = new SSLSocketChannelInputStream(sslSocketChannel); bufferedIn = new BufferedInputStream(in); - + out = new SSLSocketChannelOutputStream(sslSocketChannel); bufferedOut = new BufferedOutputStream(out); - + this.sslContext = sslContext; this.hostname = hostname; this.port = port; } - + @Override public void interrupt() { sslSocketChannel.interrupt(); } - + @Override public void close() throws IOException { sslSocketChannel.close(); @@ -84,23 +85,25 @@ public class SSLCommsSession implements CommsSession { public boolean isClosed() { return sslSocketChannel.isClosed(); } - + @Override public String getHostname() { return hostname; } - + @Override public int getPort() { return port; } + @Override public SSLContext getSSLContext() { return sslContext; } + @Override public long getTimeout(final TimeUnit timeUnit) { return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS); } - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java index 1f1ff7e48a..b2a5c1d338 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -33,6 +33,7 @@ import org.apache.nifi.remote.io.socket.SocketChannelInputStream; import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; public class StandardCommsSession implements CommsSession { + private final SocketChannel socketChannel; private final String hostname; private final int port; diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html index 4cde8c620a..15686356e0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/additionalDetails.html @@ -1,35 +1,35 @@ - - - -Distributed Map Cache Client Service - - + + + + Distributed Map Cache Client Service + + - -

- Below is an example of how to create a client connection to your distributed map cache server. - Note that the identifier in this example is cache-client. If you are using this template - to create your own MapCacheClient service, replace the values in this template with values that are - suitable for your system. Possible options for Server Hostname, Server Port, - Communications Timeout, and SSL Context Service. -

+ +

+ Below is an example of how to create a client connection to your distributed map cache server. + Note that the identifier in this example is cache-client. If you are using this template + to create your own MapCacheClient service, replace the values in this template with values that are + suitable for your system. Possible options for Server Hostname, Server Port, + Communications Timeout, and SSL Context Service. +

-
+        
 <?xml version="1.0" encoding="UTF-8" ?>
 <services>
     <service>
@@ -40,6 +40,6 @@
         <property name="Communications Timeout">30 secs</property>
     </service>
 </services>
-	
- +
+ diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java index a6a2458911..10f53b225f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -51,7 +51,8 @@ public abstract class AbstractCacheServer implements CacheServer { private final int port; private final SSLContext sslContext; protected volatile boolean stopped = false; - private final Set processInputThreads = new CopyOnWriteArraySet<>();; + private final Set processInputThreads = new CopyOnWriteArraySet<>(); + ; private volatile ServerSocketChannel serverSocketChannel; @@ -75,7 +76,7 @@ public abstract class AbstractCacheServer implements CacheServer { final SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); - logger.debug("Connected to {}", new Object[] { socketChannel }); + logger.debug("Connected to {}", new Object[]{socketChannel}); } catch (final IOException e) { if (!stopped) { logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString()); @@ -104,7 +105,7 @@ public abstract class AbstractCacheServer implements CacheServer { rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel); } } catch (IOException e) { - logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e); + logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e); if (logger.isDebugEnabled()) { logger.error("", e); } @@ -112,7 +113,7 @@ public abstract class AbstractCacheServer implements CacheServer { socketChannel.close(); } catch (IOException swallow) { } - + return; } try (final InputStream in = new BufferedInputStream(rawInputStream); @@ -127,12 +128,12 @@ public abstract class AbstractCacheServer implements CacheServer { continueComms = listen(in, out, versionNegotiator.getVersion()); } // client has issued 'close' - logger.debug("Client issued close on {}", new Object[] { socketChannel }); + logger.debug("Client issued close on {}", new Object[]{socketChannel}); } catch (final SocketTimeoutException e) { logger.debug("30 sec timeout reached", e); } catch (final IOException | HandshakeException e) { if (!stopped) { - logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() }); + logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()}); if (logger.isDebugEnabled()) { logger.error("", e); } @@ -161,7 +162,7 @@ public abstract class AbstractCacheServer implements CacheServer { @Override public void stop() throws IOException { stopped = true; - logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); + logger.info("Stopping CacheServer {}", new Object[]{this.identifier}); if (serverSocketChannel != null && serverSocketChannel.isOpen()) { serverSocketChannel.close(); @@ -188,12 +189,12 @@ public abstract class AbstractCacheServer implements CacheServer { /** * Listens for incoming data and communicates with remote peer - * - * @param in - * @param out - * @param version + * + * @param in in + * @param out out + * @param version version * @return true if communications should continue, false otherwise - * @throws IOException + * @throws IOException ex */ protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java index 71ac56d0d2..d7604cd320 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java @@ -22,26 +22,26 @@ import java.util.concurrent.atomic.AtomicLong; public class CacheRecord { private static final AtomicLong idGenerator = new AtomicLong(0L); - + private final long id; private final long entryDate; private volatile long lastHitDate; private final AtomicInteger hitCount = new AtomicInteger(0); - + public CacheRecord() { entryDate = System.currentTimeMillis(); lastHitDate = entryDate; id = idGenerator.getAndIncrement(); } - + public long getEntryDate() { return entryDate; } - + public long getLastHitDate() { return lastHitDate; } - + public int getHitCount() { return hitCount.get(); } @@ -50,7 +50,7 @@ public class CacheRecord { hitCount.getAndIncrement(); lastHitDate = System.currentTimeMillis(); } - + public long getId() { return id; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java index 2c85cd85da..fab8f13388 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java @@ -21,6 +21,7 @@ import java.io.IOException; public interface CacheServer { void start() throws IOException; + void stop() throws IOException; - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java index f2e848f392..5907f5072e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; public abstract class DistributedCacheServer extends AbstractControllerService { + public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; @@ -43,7 +44,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService { public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("If specified, this service will be used to create an SSL Context that will be used " - + "to secure communications; if not specified, communications will not be secure") + + "to secure communications; if not specified, communications will not be secure") .required(false) .identifiesControllerService(SSLContextService.class) .build(); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java index 70e86c46e6..799baa31c1 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; + @Tags({"distributed", "set", "distinct", "cache", "server"}) @CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. " + "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.") @@ -37,14 +38,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - + final SSLContext sslContext; - if ( sslContextService == null ) { + if (sslContextService == null) { sslContext = null; } else { sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - + final EvictionPolicy evictionPolicy; switch (evictionPolicyName) { case EVICTION_STRATEGY_FIFO: @@ -59,14 +60,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer { default: throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); } - + try { final File persistenceDir = persistencePath == null ? null : new File(persistencePath); - + return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); } catch (final Exception e) { throw new RuntimeException(e); } } - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java index 60bd2c19fd..e6d577d039 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java @@ -19,37 +19,40 @@ package org.apache.nifi.distributed.cache.server; import java.util.Comparator; public enum EvictionPolicy { + LFU(new LFUComparator()), LRU(new LRUComparator()), FIFO(new FIFOComparator()); - + private final Comparator comparator; - + private EvictionPolicy(final Comparator comparator) { this.comparator = comparator; } - + public Comparator getComparator() { return comparator; } - + public static class LFUComparator implements Comparator { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } - + final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount()); final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison; return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison); } } - + public static class LRUComparator implements Comparator { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } @@ -57,11 +60,12 @@ public enum EvictionPolicy { return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison); } } - + public static class FIFOComparator implements Comparator { + @Override public int compare(final CacheRecord o1, final CacheRecord o2) { - if ( o1.equals(o2) ) { + if (o1.equals(o2)) { return 0; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java index d0abe5cdcc..3dd224b3fb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -67,17 +67,17 @@ public class SetCacheServer extends AbstractCacheServer { final SetCacheResult response; switch (action) { - case "addIfAbsent": - response = cache.addIfAbsent(valueBuffer); - break; - case "contains": - response = cache.contains(valueBuffer); - break; - case "remove": - response = cache.remove(valueBuffer); - break; - default: - throw new IOException("IllegalRequest"); + case "addIfAbsent": + response = cache.addIfAbsent(valueBuffer); + break; + case "contains": + response = cache.contains(valueBuffer); + break; + case "remove": + response = cache.remove(valueBuffer); + break; + default: + throw new IOException("IllegalRequest"); } dos.writeBoolean(response.getResult()); @@ -97,8 +97,9 @@ public class SetCacheServer extends AbstractCacheServer { @Override protected void finalize() throws Throwable { - if (!stopped) + if (!stopped) { stop(); + } } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java index 0594dd4b27..dce7ccd488 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java @@ -33,7 +33,7 @@ import org.apache.nifi.ssl.SSLContextService.ClientAuth; @Tags({"distributed", "cluster", "map", "cache", "server", "key/value"}) @CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service" + " is typically accomplished via a DistributedMapCacheClient service.") -@SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"}) +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"}) public class DistributedMapCacheServer extends DistributedCacheServer { @Override @@ -43,14 +43,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer { final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); - + final SSLContext sslContext; - if ( sslContextService == null ) { + if (sslContextService == null) { sslContext = null; } else { sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); } - + final EvictionPolicy evictionPolicy; switch (evictionPolicyName) { case EVICTION_STRATEGY_FIFO: @@ -65,10 +65,10 @@ public class DistributedMapCacheServer extends DistributedCacheServer { default: throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); } - + try { final File persistenceDir = persistencePath == null ? null : new File(persistencePath); - + return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); } catch (final Exception e) { throw new RuntimeException(e); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 534cb0b204..fad0adbbca 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -22,8 +22,12 @@ import java.nio.ByteBuffer; public interface MapCache { MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + boolean containsKey(ByteBuffer key) throws IOException; + ByteBuffer get(ByteBuffer key) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; + void shutdown() throws IOException; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java index b0ab0c4003..ff032b19a7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java @@ -21,38 +21,39 @@ import java.nio.ByteBuffer; import org.apache.nifi.distributed.cache.server.CacheRecord; public class MapCacheRecord extends CacheRecord { + private final ByteBuffer key; private final ByteBuffer value; - + public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { this.key = key; this.value = value; } - + public ByteBuffer getKey() { return key; } - + public ByteBuffer getValue() { return value; } - + @Override public int hashCode() { return 2938476 + key.hashCode() * value.hashCode(); } - + @Override public boolean equals(final Object obj) { - if ( obj == this ) { + if (obj == this) { return true; } - - if ( obj instanceof MapCacheRecord ) { + + if (obj instanceof MapCacheRecord) { final MapCacheRecord that = ((MapCacheRecord) obj); return key.equals(that.key) && value.equals(that.value); } - + return false; } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index e4a600e3fe..943d6aab78 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -55,63 +55,63 @@ public class MapCacheServer extends AbstractCacheServer { final String action = dis.readUTF(); try { switch (action) { - case "close": { - return false; - } - case "putIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - dos.writeBoolean(putResult.isSuccessful()); - break; - } - case "containsKey": { - final byte[] key = readValue(dis); - final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); - dos.writeBoolean(contains); - break; - } - case "getAndPutIfAbsent": { - final byte[] key = readValue(dis); - final byte[] value = readValue(dis); - - final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); - if (putResult.isSuccessful()) { - // Put was successful. There was no old value to get. - dos.writeInt(0); - } else { - // we didn't put. Write back the previous value - final byte[] byteArray = putResult.getExistingValue().array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); + case "close": { + return false; } - - break; - } - case "get": { - final byte[] key = readValue(dis); - final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); - if (existingValue == null) { - // there was no existing value; we did a "put". - dos.writeInt(0); - } else { - // a value already existed. we did not update the map - final byte[] byteArray = existingValue.array(); - dos.writeInt(byteArray.length); - dos.write(byteArray); + case "putIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(putResult.isSuccessful()); + break; } + case "containsKey": { + final byte[] key = readValue(dis); + final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); + dos.writeBoolean(contains); + break; + } + case "getAndPutIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); - break; - } - case "remove": { - final byte[] key = readValue(dis); - final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; - dos.writeBoolean(removed); - break; - } - default: { - throw new IOException("Illegal Request"); - } + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + if (putResult.isSuccessful()) { + // Put was successful. There was no old value to get. + dos.writeInt(0); + } else { + // we didn't put. Write back the previous value + final byte[] byteArray = putResult.getExistingValue().array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "get": { + final byte[] key = readValue(dis); + final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); + if (existingValue == null) { + // there was no existing value; we did a "put". + dos.writeInt(0); + } else { + // a value already existed. we did not update the map + final byte[] byteArray = existingValue.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "remove": { + final byte[] key = readValue(dis); + final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; + dos.writeBoolean(removed); + break; + } + default: { + throw new IOException("Illegal Request"); + } } } finally { dos.flush(); @@ -131,8 +131,9 @@ public class MapCacheServer extends AbstractCacheServer { @Override protected void finalize() throws Throwable { - if (!stopped) + if (!stopped) { stop(); + } } private byte[] readValue(final DataInputStream dis) throws IOException { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java index 29695eb32a..d0055f3ab6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java @@ -19,11 +19,12 @@ package org.apache.nifi.distributed.cache.server.map; import java.nio.ByteBuffer; public class MapPutResult { + private final boolean successful; private final ByteBuffer key, value; private final ByteBuffer existingValue; private final ByteBuffer evictedKey, evictedValue; - + public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) { this.successful = successful; this.key = key; @@ -44,7 +45,7 @@ public class MapPutResult { public ByteBuffer getValue() { return value; } - + public ByteBuffer getExistingValue() { return existingValue; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 77fb77db0c..e821fbfe4e 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -38,9 +38,9 @@ public class PersistentMapCache implements MapCache { private final MapCache wrapped; private final WriteAheadRepository wali; - + private final AtomicLong modifications = new AtomicLong(0L); - + public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException { wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); wrapped = cacheToWrap; @@ -48,8 +48,8 @@ public class PersistentMapCache implements MapCache { synchronized void restore() throws IOException { final Collection recovered = wali.recoverRecords(); - for ( final MapWaliRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { + for (final MapWaliRecord record : recovered) { + if (record.getUpdateType() == UpdateType.CREATE) { wrapped.putIfAbsent(record.getKey(), record.getValue()); } } @@ -58,24 +58,24 @@ public class PersistentMapCache implements MapCache { @Override public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { final MapPutResult putResult = wrapped.putIfAbsent(key, value); - if ( putResult.isSuccessful() ) { + if (putResult.isSuccessful()) { // The put was successful. final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); final List records = new ArrayList<>(); records.add(record); - if ( putResult.getEvictedKey() != null ) { + if (putResult.getEvictedKey() != null) { records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); } - + wali.update(Collections.singletonList(record), false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 100000 == 0 ) { + if (modCount > 0 && modCount % 100000 == 0) { wali.checkpoint(); } } - + return putResult; } @@ -92,65 +92,64 @@ public class PersistentMapCache implements MapCache { @Override public ByteBuffer remove(ByteBuffer key) throws IOException { final ByteBuffer removeResult = wrapped.remove(key); - if ( removeResult != null ) { + if (removeResult != null) { final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); final List records = new ArrayList<>(1); records.add(record); wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } return removeResult; } - @Override public void shutdown() throws IOException { wali.shutdown(); } - private static class MapWaliRecord { + private final UpdateType updateType; private final ByteBuffer key; private final ByteBuffer value; - + public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) { this.updateType = updateType; this.key = key; this.value = value; } - + public UpdateType getUpdateType() { return updateType; } - + public ByteBuffer getKey() { return key; } - + public ByteBuffer getValue() { return value; } } - + private static class Serde implements SerDe { @Override public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { + if (updateType == UpdateType.DELETE) { out.write(0); } else { out.write(1); } - + final byte[] key = newRecordState.getKey().array(); final byte[] value = newRecordState.getValue().array(); - + out.writeInt(key.length); out.write(key); out.writeInt(value.length); @@ -165,12 +164,12 @@ public class PersistentMapCache implements MapCache { @Override public MapWaliRecord deserializeEdit(final DataInputStream in, final Map currentRecordStates, final int version) throws IOException { final int updateTypeValue = in.read(); - if ( updateTypeValue < 0 ) { + if (updateTypeValue < 0) { throw new EOFException(); } final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE); - + final int keySize = in.readInt(); final byte[] key = new byte[keySize]; in.readFully(key); @@ -207,4 +206,4 @@ public class PersistentMapCache implements MapCache { return 1; } } -} \ No newline at end of file +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index 10139f149b..9e8bbd1063 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -33,46 +33,47 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleMapCache implements MapCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class); private final Map cache = new HashMap<>(); private final SortedMap inverseCacheMap; - + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - + private final String serviceIdentifier; - + private final int maxSize; - + public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { // need to change to ConcurrentMap as this is modified when only the readLock is held inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator()); this.serviceIdentifier = serviceIdentifier; this.maxSize = maxSize; } - + @Override public String toString() { return "SimpleSetCache[service id=" + serviceIdentifier + "]"; } - // don't need synchronized because this method is only called when the writeLock is held, and all + // don't need synchronized because this method is only called when the writeLock is held, and all // public methods obtain either the read or write lock private MapCacheRecord evict() { - if ( cache.size() < maxSize ) { + if (cache.size() < maxSize) { return null; } - + final MapCacheRecord recordToEvict = inverseCacheMap.firstKey(); final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); cache.remove(valueToEvict); - - if ( logger.isDebugEnabled() ) { + + if (logger.isDebugEnabled()) { logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); } - + return recordToEvict; } @@ -81,44 +82,44 @@ public class SimpleMapCache implements MapCache { writeLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { // Record is null. We will add. final MapCacheRecord evicted = evict(); final MapCacheRecord newRecord = new MapCacheRecord(key, value); cache.put(key, newRecord); inverseCacheMap.put(newRecord, key); - - if ( evicted == null ) { + + if (evicted == null) { return new MapPutResult(true, key, value, null, null, null); } else { return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue()); } } - + // Record is not null. Increment hit count and return result indicating that record was not added. inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return new MapPutResult(false, key, value, record.getValue(), null, null); } finally { writeLock.unlock(); } } - + @Override public boolean containsKey(final ByteBuffer key) { readLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { return false; } - + inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return true; } finally { readLock.unlock(); @@ -130,14 +131,14 @@ public class SimpleMapCache implements MapCache { readLock.lock(); try { final MapCacheRecord record = cache.get(key); - if ( record == null ) { + if (record == null) { return null; } - + inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, key); - + return record.getValue(); } finally { readLock.unlock(); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java index 4d75fc00d6..c2c3a4112a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/PersistentSetCache.java @@ -38,34 +38,34 @@ public class PersistentSetCache implements SetCache { private final SetCache wrapped; private final WriteAheadRepository wali; - + private final AtomicLong modifications = new AtomicLong(0L); - + public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); wrapped = cacheToWrap; } - + public synchronized void restore() throws IOException { final Collection recovered = wali.recoverRecords(); - for ( final SetRecord record : recovered ) { - if ( record.getUpdateType() == UpdateType.CREATE ) { + for (final SetRecord record : recovered) { + if (record.getUpdateType() == UpdateType.CREATE) { addIfAbsent(record.getBuffer()); } } } - + @Override public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { final SetCacheResult removeResult = wrapped.remove(value); - if ( removeResult.getResult() ) { + if (removeResult.getResult()) { final SetRecord record = new SetRecord(UpdateType.DELETE, value); final List records = new ArrayList<>(); records.add(record); wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } @@ -76,24 +76,24 @@ public class PersistentSetCache implements SetCache { @Override public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { final SetCacheResult addResult = wrapped.addIfAbsent(value); - if ( addResult.getResult() ) { + if (addResult.getResult()) { final SetRecord record = new SetRecord(UpdateType.CREATE, value); final List records = new ArrayList<>(); records.add(record); - + final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); - if ( evictedRecord != null ) { + if (evictedRecord != null) { records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); } - + wali.update(records, false); - + final long modCount = modifications.getAndIncrement(); - if ( modCount > 0 && modCount % 1000 == 0 ) { + if (modCount > 0 && modCount % 1000 == 0) { wali.checkpoint(); } } - + return addResult; } @@ -101,45 +101,46 @@ public class PersistentSetCache implements SetCache { public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { return wrapped.contains(value); } - + @Override public void shutdown() throws IOException { wali.shutdown(); } - + private static class SetRecord { + private final UpdateType updateType; private final ByteBuffer value; - + public SetRecord(final UpdateType updateType, final ByteBuffer value) { this.updateType = updateType; this.value = value; } - + public UpdateType getUpdateType() { return updateType; } - + public ByteBuffer getBuffer() { return value; } - + public byte[] getData() { return value.array(); } } - + private static class Serde implements SerDe { @Override public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { final UpdateType updateType = newRecordState.getUpdateType(); - if ( updateType == UpdateType.DELETE ) { + if (updateType == UpdateType.DELETE) { out.write(0); } else { out.write(1); } - + final byte[] data = newRecordState.getData(); out.writeInt(data.length); out.write(newRecordState.getData()); @@ -153,16 +154,16 @@ public class PersistentSetCache implements SetCache { @Override public SetRecord deserializeEdit(final DataInputStream in, final Map currentRecordStates, final int version) throws IOException { final int value = in.read(); - if ( value < 0 ) { + if (value < 0) { throw new EOFException(); } final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE); - + final int size = in.readInt(); final byte[] data = new byte[size]; in.readFully(data); - + return new SetRecord(updateType, ByteBuffer.wrap(data)); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java index bf6ae3e51c..dd37d0c07f 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCache.java @@ -22,8 +22,11 @@ import java.nio.ByteBuffer; public interface SetCache { SetCacheResult remove(ByteBuffer value) throws IOException; + SetCacheResult addIfAbsent(ByteBuffer value) throws IOException; + SetCacheResult contains(ByteBuffer value) throws IOException; + void shutdown() throws IOException; - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java index 20b6fae889..5a757752cd 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheRecord.java @@ -21,33 +21,34 @@ import java.nio.ByteBuffer; import org.apache.nifi.distributed.cache.server.CacheRecord; public class SetCacheRecord extends CacheRecord { + private final ByteBuffer value; - + public SetCacheRecord(final ByteBuffer value) { this.value = value; } - + public ByteBuffer getValue() { return value; } - + @Override public int hashCode() { return value.hashCode(); } - + @Override public boolean equals(final Object obj) { - if ( this == obj ) { + if (this == obj) { return true; } - + if (obj instanceof SetCacheRecord) { return value.equals(((SetCacheRecord) obj).value); } return false; } - + @Override public String toString() { return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java index 732c4f003b..7faceb64d5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SetCacheResult.java @@ -16,27 +16,26 @@ */ package org.apache.nifi.distributed.cache.server.set; - - public class SetCacheResult { + private final boolean result; private final SetCacheRecord stats; private final SetCacheRecord evictedRecord; - + public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { this.result = result; this.stats = stats; this.evictedRecord = evictedRecord; } - + public boolean getResult() { return result; } - + public SetCacheRecord getRecord() { return stats; } - + public SetCacheRecord getEvictedRecord() { return evictedRecord; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java index 77d648154a..bf69ba7529 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/set/SimpleSetCache.java @@ -30,41 +30,42 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleSetCache implements SetCache { + private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class); - + private final Map cache = new HashMap<>(); private final SortedMap inverseCacheMap; - + private final String serviceIdentifier; - + private final int maxSize; - + public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator()); this.serviceIdentifier = serviceIdentifier; this.maxSize = maxSize; } - + private synchronized SetCacheRecord evict() { - if ( cache.size() < maxSize ) { + if (cache.size() < maxSize) { return null; } - + final SetCacheRecord recordToEvict = inverseCacheMap.firstKey(); final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); cache.remove(valueToEvict); - - if ( logger.isDebugEnabled() ) { + + if (logger.isDebugEnabled()) { logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); } - + return recordToEvict; } - + @Override public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) { final SetCacheRecord record = cache.get(value); - if ( record == null ) { + if (record == null) { final SetCacheRecord evicted = evict(); final SetCacheRecord newRecord = new SetCacheRecord(value); cache.put(value, newRecord); @@ -75,42 +76,42 @@ public class SimpleSetCache implements SetCache { inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, value); - + return new SetCacheResult(false, record, null); } } - + @Override public synchronized SetCacheResult contains(final ByteBuffer value) { final SetCacheRecord record = cache.get(value); - if ( record == null ) { + if (record == null) { return new SetCacheResult(false, null, null); } else { // We have to remove the record and add it again in order to cause the Map to stay sorted inverseCacheMap.remove(record); record.hit(); inverseCacheMap.put(record, value); - + return new SetCacheResult(true, record, null); } } - + @Override public synchronized SetCacheResult remove(final ByteBuffer value) { final SetCacheRecord record = cache.remove(value); - if ( record == null ) { + if (record == null) { return new SetCacheResult(false, null, null); } else { inverseCacheMap.remove(record); return new SetCacheResult(true, record, null); } } - + @Override public String toString() { return "SimpleSetCache[service id=" + serviceIdentifier + "]"; } - + @Override public void shutdown() throws IOException { } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/additionalDetails.html index ad9822173f..740abecf23 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/docs/org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/additionalDetails.html @@ -1,36 +1,36 @@ - - - -Distributed Map Cache Client Service - - + + + + Distributed Map Cache Client Service + + - -

- Below is an example of how to create a distributed map cache server for clients to connect to. - Note that the identifier in this example is cache-server. If you are using this template - to create your own DistributedMapCache server, replace the values in this template with values that are - suitable for your system. Possible options for Port, Maximum Cache Entries, - Eviction Strategy, SSL Context Service, and - Persistence Directory -

+ +

+ Below is an example of how to create a distributed map cache server for clients to connect to. + Note that the identifier in this example is cache-server. If you are using this template + to create your own DistributedMapCache server, replace the values in this template with values that are + suitable for your system. Possible options for Port, Maximum Cache Entries, + Eviction Strategy, SSL Context Service, and + Persistence Directory +

-
+        
 <?xml version="1.0" encoding="UTF-8" ?>
 <services>
     <service>
@@ -41,6 +41,6 @@
         <property name="Eviction Strategy">Least Recently Used</property>
     </service>
 </services>
-	
- +
+ diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index dfad5a29b9..42698b8c8d 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -152,7 +152,7 @@ public class TestServerAndClient { newServer.shutdownServer(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") + @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); @@ -215,7 +215,7 @@ public class TestServerAndClient { newServer.shutdownServer(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") + @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); @@ -374,8 +374,7 @@ public class TestServerAndClient { public void testClientTermination() throws InitializationException, IOException, InterruptedException { /** - * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug - * See: https://issues.apache.org/jira/browse/NIFI-437 + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437 */ Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); @@ -509,6 +508,7 @@ public class TestServerAndClient { } private static class StringSerializer implements Serializer { + @Override public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { output.write(value.getBytes(StandardCharsets.UTF_8)); @@ -516,6 +516,7 @@ public class TestServerAndClient { } private static class StringDeserializer implements Deserializer { + @Override public String deserialize(final byte[] input) throws DeserializationException, IOException { return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml index bb3f3660cc..2f87d46ff3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/pom.xml @@ -14,24 +14,24 @@ limitations under the License. --> - 4.0.0 - - org.apache.nifi - nifi-standard-services - 0.1.0-incubating-SNAPSHOT - + 4.0.0 + + org.apache.nifi + nifi-standard-services + 0.1.0-incubating-SNAPSHOT + - nifi-http-context-map-api + nifi-http-context-map-api - - - org.apache.nifi - nifi-api - - - javax.servlet - javax.servlet-api - - + + + org.apache.nifi + nifi-api + + + javax.servlet + javax.servlet-api + + diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/src/main/java/org/apache/nifi/http/HttpContextMap.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/src/main/java/org/apache/nifi/http/HttpContextMap.java index 04ff6ce35e..0dcff031cc 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/src/main/java/org/apache/nifi/http/HttpContextMap.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-api/src/main/java/org/apache/nifi/http/HttpContextMap.java @@ -22,51 +22,48 @@ import javax.servlet.http.HttpServletResponse; import org.apache.nifi.controller.ControllerService; - /** *

- * An interface that provides the capability of receiving an HTTP servlet request in one component - * and responding to that request in another component. + * An interface that provides the capability of receiving an HTTP servlet request in one component and responding to that request in another component. *

- * + * *

- * The intended flow is for the component receiving the HTTP request to register the request, response, - * and AsyncContext with a particular identifier via the - * {@link #register(String, HttpServletRequest, HttpServletResponse, AsyncContext)} - * method. Another component is then able to obtain the response - * by providing that identifier to the {@link #getResponse(String)} method. After writing to the - * HttpServletResponse, the transaction is to then be completed via the {@link #complete(String)} method. + * The intended flow is for the component receiving the HTTP request to register the request, response, and AsyncContext with a particular identifier via the + * {@link #register(String, HttpServletRequest, HttpServletResponse, AsyncContext)} method. Another component is then able to obtain the response by providing that identifier to the + * {@link #getResponse(String)} method. After writing to the HttpServletResponse, the transaction is to then be completed via the {@link #complete(String)} method. *

*/ public interface HttpContextMap extends ControllerService { /** * Registers an HttpServletRequest, HttpServletResponse, and the AsyncContext for a given identifier - * - * @param identifier - * @param request - * @param response - * @param context - * - * @return true if register is successful, false if the context map is too full because too many requests have already been received and not processed - * + * + * @param identifier identifier + * @param request request + * @param response response + * @param context context + * + * @return true if register is successful, false if the context map is too full because too many requests have already been received and not processed + * * @throws IllegalStateException if the identifier is already registered */ boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context); - + /** * Retrieves the HttpServletResponse for the given identifier, if it exists - * @param identifier + * + * @param identifier identifier * @return the HttpServletResponse for the given identifier, or {@code null} if it does not exist */ HttpServletResponse getResponse(String identifier); - + /** * Marks the HTTP request/response for the given identifier as complete - * @param identifier - * + * + * @param identifier identifier + * * @throws IllegalStateException if the identifier is not registered to a valid AsyncContext */ void complete(String identifier); - + } diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/pom.xml index cf4b3cbc17..eb2abdd317 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/pom.xml @@ -24,21 +24,21 @@ jar - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-processor-utils - + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + org.apache.nifi nifi-http-context-map-api - javax.servlet - javax.servlet-api + javax.servlet + javax.servlet-api diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java index 5e312706bb..bd3e86696a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/java/org/apache/nifi/http/StandardHttpContextMap.java @@ -42,34 +42,35 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.processor.util.StandardValidators; @Tags({"http", "request", "response"}) -@SeeAlso(classNames={ - "org.apache.nifi.processors.standard.HandleHttpRequest", - "org.apache.nifi.processors.standard.HandleHttpResponse"}) +@SeeAlso(classNames = { + "org.apache.nifi.processors.standard.HandleHttpRequest", + "org.apache.nifi.processors.standard.HandleHttpResponse"}) @CapabilityDescription("Provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that " + "multiple Processors can interact with the same HTTP request.") public class StandardHttpContextMap extends AbstractControllerService implements HttpContextMap { + public static final PropertyDescriptor MAX_OUTSTANDING_REQUESTS = new PropertyDescriptor.Builder() - .name("Maximum Outstanding Requests") - .description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("5000") - .build(); + .name("Maximum Outstanding Requests") + .description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); public static final PropertyDescriptor REQUEST_EXPIRATION = new PropertyDescriptor.Builder() - .name("Request Expiration") - .description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code") - .required(true) - .expressionLanguageSupported(false) - .defaultValue("1 min") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - + .name("Request Expiration") + .description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code") + .required(true) + .expressionLanguageSupported(false) + .defaultValue("1 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + private final ConcurrentMap wrapperMap = new ConcurrentHashMap<>(); - + private volatile int maxSize = 5000; private volatile long maxRequestNanos; private volatile ScheduledExecutorService executor; - + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(2); @@ -77,67 +78,68 @@ public class StandardHttpContextMap extends AbstractControllerService implements properties.add(REQUEST_EXPIRATION); return properties; } - + @OnEnabled public void onConfigured(final ConfigurationContext context) { maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger(); executor = Executors.newSingleThreadScheduledExecutor(); - + maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); final long scheduleNanos = maxRequestNanos / 2; executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS); } - + @OnDisabled public void cleanup() { - if ( executor != null ) { + if (executor != null) { executor.shutdown(); } } - + @Override public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) { // fail if there are too many already. Maybe add a configuration property for how many // outstanding, with a default of say 5000 - if ( wrapperMap.size() >= maxSize ) { - return false; + if (wrapperMap.size() >= maxSize) { + return false; } final Wrapper wrapper = new Wrapper(request, response, context); final Wrapper existing = wrapperMap.putIfAbsent(identifier, wrapper); - if ( existing != null ) { + if (existing != null) { throw new IllegalStateException("HTTP Request already registered with identifier " + identifier); } - - return true; + + return true; } @Override public HttpServletResponse getResponse(final String identifier) { final Wrapper wrapper = wrapperMap.get(identifier); - if ( wrapper == null ) { + if (wrapper == null) { return null; } - + return wrapper.getResponse(); } @Override public void complete(final String identifier) { final Wrapper wrapper = wrapperMap.remove(identifier); - if ( wrapper == null ) { + if (wrapper == null) { throw new IllegalStateException("No HTTP Request registered with identifier " + identifier); } - + wrapper.getAsync().complete(); } private static class Wrapper { + @SuppressWarnings("unused") private final HttpServletRequest request; private final HttpServletResponse response; private final AsyncContext async; private final long nanoTimeAdded = System.nanoTime(); - + public Wrapper(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) { this.request = request; this.response = response; @@ -151,24 +153,25 @@ public class StandardHttpContextMap extends AbstractControllerService implements public AsyncContext getAsync() { return async; } - + public long getNanoTimeAdded() { return nanoTimeAdded; } } - + private class CleanupExpiredRequests implements Runnable { + @Override public void run() { final long now = System.nanoTime(); final long threshold = now - maxRequestNanos; - + final Iterator> itr = wrapperMap.entrySet().iterator(); - while ( itr.hasNext() ) { + while (itr.hasNext()) { final Map.Entry entry = itr.next(); - if ( entry.getValue().getNanoTimeAdded() < threshold ) { + if (entry.getValue().getNanoTimeAdded() < threshold) { itr.remove(); - + // send SERVICE_UNAVAILABLE try { final AsyncContext async = entry.getValue().getAsync(); diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/docs/org.apache.nifi.http.StandardHttpContextMap/index.html b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/docs/org.apache.nifi.http.StandardHttpContextMap/index.html index 5c8b83a5de..774c3d9edf 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/docs/org.apache.nifi.http.StandardHttpContextMap/index.html +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-http-context-map-bundle/nifi-http-context-map/src/main/resources/docs/org.apache.nifi.http.StandardHttpContextMap/index.html @@ -22,15 +22,15 @@ -

Description:

-

- This is the standard implementation of the SSL Context Map. This service is used to provide - coordination between - HandleHttpRequest - and - HandleHttpResponse - Processors. -

+

Description:

+

+ This is the standard implementation of the SSL Context Map. This service is used to provide + coordination between + HandleHttpRequest + and + HandleHttpResponse + Processors. +

Configuring the HTTP Context Map:

@@ -40,9 +40,9 @@

- This controller service exposes a single property named Maximum Outstanding Requests. - This property determines the maximum number of HTTP requests that can be outstanding at any one time. - Any attempt to register an additional HTTP Request will cause an error. The default value is 5000. + This controller service exposes a single property named Maximum Outstanding Requests. + This property determines the maximum number of HTTP requests that can be outstanding at any one time. + Any attempt to register an additional HTTP Request will cause an error. The default value is 5000. Below is an example of the template for a StandardHttpContextMap controller service.

@@ -56,12 +56,12 @@ </service> </services> - +

- See Also:
- HandleHttpRequest
- HandleHttpResponse
-

- + See Also:
+ HandleHttpRequest
+ HandleHttpResponse
+

+ diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java index 34f18441be..cde71da5ce 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/main/java/org/apache/nifi/ssl/StandardSSLContextService.java @@ -205,7 +205,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme } return results; } - + private void verifySslConfig(final ValidationContext validationContext) throws ProcessException { try { final String keystoreFile = validationContext.getProperty(KEYSTORE).getValue(); @@ -237,7 +237,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme throw new ProcessException(e); } } - @Override public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java index 7d191fb9ad..1e22deed81 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-ssl-context-bundle/nifi-ssl-context-service/src/test/java/org/apache/nifi/ssl/SSLContextServiceTest.java @@ -73,7 +73,7 @@ public class SSLContextServiceTest { properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "wrongpassword"); properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); runner.addControllerService("test-bad4", service, properties); - + runner.assertNotValid(service); } @@ -126,7 +126,7 @@ public class SSLContextServiceTest { properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); runner.addControllerService("test-good2", service, properties); runner.enableControllerService(service); - + runner.setProperty("SSL Context Svc ID", "test-good2"); runner.assertValid(); Assert.assertNotNull(service);