This commit is contained in:
joewitt 2015-04-27 13:43:35 -04:00
parent 6a706458d0
commit 9a3b6bed62
34 changed files with 460 additions and 434 deletions

View File

@@ -27,20 +27,20 @@ import javax.net.ssl.SSLContext;
public interface CommsSession extends Closeable {

    void setTimeout(final long value, final TimeUnit timeUnit);

    InputStream getInputStream() throws IOException;

    OutputStream getOutputStream() throws IOException;

    boolean isClosed();

    void interrupt();

    String getHostname();

    int getPort();

    long getTimeout(TimeUnit timeUnit);

    SSLContext getSSLContext();
}
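The interface above is the transport abstraction the cache client uses to talk to a cache server. A minimal sketch of how a caller might drive it is shown below; the helper class is purely illustrative (the real client also performs handshaking and protocol-version negotiation first), and it assumes the length-prefixed request format used by the cache servers later in this commit.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public final class CommsSessionExample {

    // Illustrative only: sends a single "containsKey" request over an already
    // established CommsSession and reads back the boolean reply. The action name
    // and the length-prefixed value encoding mirror what MapCacheServer expects.
    static boolean containsKey(final CommsSession session, final byte[] key) throws IOException {
        session.setTimeout(30, TimeUnit.SECONDS);

        final DataOutputStream out = new DataOutputStream(session.getOutputStream());
        final DataInputStream in = new DataInputStream(session.getInputStream());

        out.writeUTF("containsKey");
        out.writeInt(key.length);
        out.write(key);
        out.flush();

        return in.readBoolean();
    }
}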

View File

@@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
    + "between nodes in a NiFi cluster")
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
@@ -65,14 +65,14 @@ public class DistributedMapCacheClientService extends AbstractControllerService
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
        .name("SSL Context Service")
        .description("If specified, indicates the SSL Context Service that is used to communicate with the "
            + "remote server. If not specified, communications will not be encrypted")
        .required(false)
        .identifiesControllerService(SSLContextService.class)
        .build();

    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
        .name("Communications Timeout")
        .description("Specifies how long to wait when communicating with the remote server before determining that "
            + "there is a communications failure if data cannot be sent or received")
        .required(true)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .defaultValue("30 secs")
@@ -299,6 +299,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
    }

    private static interface CommsAction<T> {
        T execute(CommsSession commsSession) throws IOException;
    }
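For readers unfamiliar with how processors consume this controller service, a rough usage sketch follows. The Serializer/Deserializer shapes and the getAndPutIfAbsent signature are assumptions drawn from the DistributedMapCacheClient interface and may differ slightly between NiFi versions.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;

public class CacheLookupExample {

    // Simple String (de)serializers; the lambda shapes assume the single-method
    // Serializer/Deserializer interfaces from the client package.
    private static final Serializer<String> STRING_SERIALIZER =
        (value, out) -> out.write(value.getBytes(StandardCharsets.UTF_8));

    private static final Deserializer<String> STRING_DESERIALIZER =
        bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);

    // Returns the previously cached value for the key, or the new value if
    // nothing was cached yet (in which case the new value is stored).
    public String rememberValue(final DistributedMapCacheClient client, final String key, final String value) throws IOException {
        final String existing = client.getAndPutIfAbsent(key, value, STRING_SERIALIZER, STRING_SERIALIZER, STRING_DESERIALIZER);
        return existing == null ? value : existing;
    }
}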

View File

@@ -42,7 +42,7 @@ import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedSetCacheServer. This can be used in order to share a Set "
    + "between nodes in a NiFi cluster")
public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
@@ -65,14 +65,14 @@ public class DistributedSetCacheClientService extends AbstractControllerService
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
        .name("SSL Context Service")
        .description("If specified, indicates the SSL Context Service that is used to communicate with the "
            + "remote server. If not specified, communications will not be encrypted")
        .required(false)
        .identifiesControllerService(SSLContextService.class)
        .build();

    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
        .name("Communications Timeout")
        .description("Specifies how long to wait when communicating with the remote server before determining "
            + "that there is a communications failure if data cannot be sent or received")
        .required(true)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .defaultValue("30 secs")

View File

@@ -30,36 +30,37 @@ import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;

public class SSLCommsSession implements CommsSession {

    private final SSLSocketChannel sslSocketChannel;
    private final SSLContext sslContext;
    private final String hostname;
    private final int port;

    private final SSLSocketChannelInputStream in;
    private final BufferedInputStream bufferedIn;

    private final SSLSocketChannelOutputStream out;
    private final BufferedOutputStream bufferedOut;

    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
        sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);

        in = new SSLSocketChannelInputStream(sslSocketChannel);
        bufferedIn = new BufferedInputStream(in);

        out = new SSLSocketChannelOutputStream(sslSocketChannel);
        bufferedOut = new BufferedOutputStream(out);

        this.sslContext = sslContext;
        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public void interrupt() {
        sslSocketChannel.interrupt();
    }

    @Override
    public void close() throws IOException {
        sslSocketChannel.close();
@@ -84,23 +85,25 @@ public class SSLCommsSession implements CommsSession {
    public boolean isClosed() {
        return sslSocketChannel.isClosed();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public SSLContext getSSLContext() {
        return sslContext;
    }

    @Override
    public long getTimeout(final TimeUnit timeUnit) {
        return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
    }
}
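A small sketch of how the client side might open a secured session. The SSLContext would normally come from an SSLContextService, as the server classes later in this commit do; the helper method itself is illustrative, not part of the commit.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;

public final class SslSessionExample {

    // Sketch only: opens a TLS-protected CommsSession to the cache server using
    // the SSLCommsSession constructor shown above.
    static CommsSession open(final SSLContext sslContext, final String host, final int port) throws IOException {
        final CommsSession session = new SSLCommsSession(sslContext, host, port);
        session.setTimeout(30, TimeUnit.SECONDS);
        return session;
    }
}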

View File

@@ -33,6 +33,7 @@ import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;

public class StandardCommsSession implements CommsSession {

    private final SocketChannel socketChannel;
    private final String hostname;
    private final int port;

View File

@@ -1,35 +1,35 @@
<!DOCTYPE html>
<html lang="en">
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<head>
    <meta charset="utf-8" />
    <title>Distributed Map Cache Client Service</title>
    <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
    <p>
        Below is an example of how to create a client connection to your distributed map cache server.
        Note that the identifier in this example is <code>cache-client</code>. If you are using this template
        to create your own MapCacheClient service, replace the values in this template with values that are
        suitable for your system. The configurable options are <code>Server Hostname</code>, <code>Server Port</code>,
        <code>Communications Timeout</code>, and <span style="font-style: italic;">SSL Context Service</span>.
    </p>
    <pre>
&lt;?xml version="1.0" encoding="UTF-8" ?&gt;
&lt;services&gt;
    &lt;service&gt;
@@ -40,6 +40,6 @@
        &lt;property name="Communications Timeout"&gt;30 secs&lt;/property&gt;
    &lt;/service&gt;
&lt;/services&gt;
    </pre>
</body>
</html>

View File

@@ -51,7 +51,8 @@ public abstract class AbstractCacheServer implements CacheServer {
    private final int port;
    private final SSLContext sslContext;
    protected volatile boolean stopped = false;
    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();

    private volatile ServerSocketChannel serverSocketChannel;
@@ -75,7 +76,7 @@ public abstract class AbstractCacheServer implements CacheServer {
                final SocketChannel socketChannel;
                try {
                    socketChannel = serverSocketChannel.accept();
                    logger.debug("Connected to {}", new Object[]{socketChannel});
                } catch (final IOException e) {
                    if (!stopped) {
                        logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
@@ -104,7 +105,7 @@ public abstract class AbstractCacheServer implements CacheServer {
                        rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel);
                    }
                } catch (IOException e) {
                    logger.error("Cannot create input and/or output streams for {}", new Object[]{identifier}, e);
                    if (logger.isDebugEnabled()) {
                        logger.error("", e);
                    }
@@ -112,7 +113,7 @@ public abstract class AbstractCacheServer implements CacheServer {
                        socketChannel.close();
                    } catch (IOException swallow) {
                    }
                    return;
                }

                try (final InputStream in = new BufferedInputStream(rawInputStream);
@@ -127,12 +128,12 @@ public abstract class AbstractCacheServer implements CacheServer {
                        continueComms = listen(in, out, versionNegotiator.getVersion());
                    }
                    // client has issued 'close'
                    logger.debug("Client issued close on {}", new Object[]{socketChannel});
                } catch (final SocketTimeoutException e) {
                    logger.debug("30 sec timeout reached", e);
                } catch (final IOException | HandshakeException e) {
                    if (!stopped) {
                        logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, peer, e.toString()});
                        if (logger.isDebugEnabled()) {
                            logger.error("", e);
                        }
@@ -161,7 +162,7 @@ public abstract class AbstractCacheServer implements CacheServer {
    @Override
    public void stop() throws IOException {
        stopped = true;
        logger.info("Stopping CacheServer {}", new Object[]{this.identifier});

        if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
            serverSocketChannel.close();
@@ -188,12 +189,12 @@ public abstract class AbstractCacheServer implements CacheServer {
    /**
     * Listens for incoming data and communicates with the remote peer
     *
     * @param in the input stream from which requests are read
     * @param out the output stream to which responses are written
     * @param version the negotiated protocol version
     * @return <code>true</code> if communications should continue, <code>false</code> otherwise
     * @throws IOException if unable to read from or write to the streams
     */
    protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException;
}
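To make the listen() contract concrete, here is a hypothetical minimal subclass. The action names and length-prefixed encoding mirror SetCacheServer and MapCacheServer below, but the class itself (and the assumed constructor shape) is illustrative and not part of this commit.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.net.ssl.SSLContext;

// Hypothetical subclass used only to illustrate listen(): read one action per
// invocation, answer it, and report whether the session should stay open.
public class EchoCacheServer extends AbstractCacheServer {

    public EchoCacheServer(final String identifier, final SSLContext sslContext, final int port) {
        super(identifier, sslContext, port); // constructor shape assumed from the abstract class
    }

    @Override
    protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
        final DataInputStream dis = new DataInputStream(in);
        final DataOutputStream dos = new DataOutputStream(out);

        final String action = dis.readUTF();
        if ("close".equals(action)) {
            return false; // client is done; tear down the connection
        }

        // Echo back the length-prefixed payload that followed the action name.
        final byte[] value = new byte[dis.readInt()];
        dis.readFully(value);
        dos.writeInt(value.length);
        dos.write(value);
        dos.flush();
        return true;
    }
}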

View File

@@ -22,26 +22,26 @@ import java.util.concurrent.atomic.AtomicLong;
public class CacheRecord {

    private static final AtomicLong idGenerator = new AtomicLong(0L);

    private final long id;
    private final long entryDate;
    private volatile long lastHitDate;
    private final AtomicInteger hitCount = new AtomicInteger(0);

    public CacheRecord() {
        entryDate = System.currentTimeMillis();
        lastHitDate = entryDate;
        id = idGenerator.getAndIncrement();
    }

    public long getEntryDate() {
        return entryDate;
    }

    public long getLastHitDate() {
        return lastHitDate;
    }

    public int getHitCount() {
        return hitCount.get();
    }
@@ -50,7 +50,7 @@ public class CacheRecord {
        hitCount.getAndIncrement();
        lastHitDate = System.currentTimeMillis();
    }

    public long getId() {
        return id;
    }

View File

@@ -21,6 +21,7 @@ import java.io.IOException;
public interface CacheServer {

    void start() throws IOException;

    void stop() throws IOException;
}

View File

@@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

public abstract class DistributedCacheServer extends AbstractControllerService {

    public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
    public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
    public static final String EVICTION_STRATEGY_FIFO = "First In, First Out";
@@ -43,7 +44,7 @@ public abstract class DistributedCacheServer extends AbstractControllerService {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
        .name("SSL Context Service")
        .description("If specified, this service will be used to create an SSL Context that will be used "
            + "to secure communications; if not specified, communications will not be secure")
        .required(false)
        .identifiesControllerService(SSLContextService.class)
        .build();

View File

@@ -25,6 +25,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;

@Tags({"distributed", "set", "distinct", "cache", "server"})
@CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. "
    + "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.")
@@ -37,14 +38,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();

        final SSLContext sslContext;
        if (sslContextService == null) {
            sslContext = null;
        } else {
            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
        }

        final EvictionPolicy evictionPolicy;
        switch (evictionPolicyName) {
            case EVICTION_STRATEGY_FIFO:
@@ -59,14 +60,14 @@ public class DistributedSetCacheServer extends DistributedCacheServer {
            default:
                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
        }

        try {
            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);

            return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@@ -19,37 +19,40 @@ package org.apache.nifi.distributed.cache.server;
import java.util.Comparator;

public enum EvictionPolicy {

    LFU(new LFUComparator()),
    LRU(new LRUComparator()),
    FIFO(new FIFOComparator());

    private final Comparator<CacheRecord> comparator;

    private EvictionPolicy(final Comparator<CacheRecord> comparator) {
        this.comparator = comparator;
    }

    public Comparator<CacheRecord> getComparator() {
        return comparator;
    }

    public static class LFUComparator implements Comparator<CacheRecord> {

        @Override
        public int compare(final CacheRecord o1, final CacheRecord o2) {
            if (o1.equals(o2)) {
                return 0;
            }

            final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount());
            final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison;
            return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison);
        }
    }

    public static class LRUComparator implements Comparator<CacheRecord> {

        @Override
        public int compare(final CacheRecord o1, final CacheRecord o2) {
            if (o1.equals(o2)) {
                return 0;
            }
@@ -57,11 +60,12 @@ public enum EvictionPolicy {
            return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison);
        }
    }

    public static class FIFOComparator implements Comparator<CacheRecord> {

        @Override
        public int compare(final CacheRecord o1, final CacheRecord o2) {
            if (o1.equals(o2)) {
                return 0;
            }
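How these comparators are meant to be used is shown by SimpleMapCache further down in this commit; a condensed, hypothetical sketch of the same idea follows. It assumes CacheRecord exposes the hit() method whose body appears in the CacheRecord hunk above.

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentSkipListMap;

public final class EvictionExample {

    // Sketch only: records sorted by the policy's comparator, so firstKey() is
    // always the next record to evict (fewest hits for LFU, oldest hit for LRU,
    // oldest entry for FIFO).
    public static void main(final String[] args) {
        final ConcurrentSkipListMap<CacheRecord, ByteBuffer> inverse =
            new ConcurrentSkipListMap<>(EvictionPolicy.LFU.getComparator());

        final CacheRecord a = new CacheRecord();
        final CacheRecord b = new CacheRecord();
        b.hit(); // b now has a higher hit count than a (hit() assumed public)

        inverse.put(a, ByteBuffer.wrap(new byte[] {1}));
        inverse.put(b, ByteBuffer.wrap(new byte[] {2}));

        // Under LFU, 'a' (fewer hits) sorts first and would be evicted first.
        System.out.println(inverse.firstKey() == a);
    }
}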

View File

@@ -67,17 +67,17 @@ public class SetCacheServer extends AbstractCacheServer {
        final SetCacheResult response;
        switch (action) {
            case "addIfAbsent":
                response = cache.addIfAbsent(valueBuffer);
                break;
            case "contains":
                response = cache.contains(valueBuffer);
                break;
            case "remove":
                response = cache.remove(valueBuffer);
                break;
            default:
                throw new IOException("IllegalRequest");
        }

        dos.writeBoolean(response.getResult());
@@ -97,8 +97,9 @@ public class SetCacheServer extends AbstractCacheServer {
    @Override
    protected void finalize() throws Throwable {
        if (!stopped) {
            stop();
        }
    }
}

View File

@@ -33,7 +33,7 @@ import org.apache.nifi.ssl.SSLContextService.ClientAuth;
@Tags({"distributed", "cluster", "map", "cache", "server", "key/value"})
@CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service"
    + " is typically accomplished via a DistributedMapCacheClient service.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"})
public class DistributedMapCacheServer extends DistributedCacheServer {

    @Override
@@ -43,14 +43,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
        final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue();

        final SSLContext sslContext;
        if (sslContextService == null) {
            sslContext = null;
        } else {
            sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
        }

        final EvictionPolicy evictionPolicy;
        switch (evictionPolicyName) {
            case EVICTION_STRATEGY_FIFO:
@@ -65,10 +65,10 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
            default:
                throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName);
        }

        try {
            final File persistenceDir = persistencePath == null ? null : new File(persistencePath);

            return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
        } catch (final Exception e) {
            throw new RuntimeException(e);

View File

@@ -22,8 +22,12 @@ import java.nio.ByteBuffer;
public interface MapCache {

    MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException;

    boolean containsKey(ByteBuffer key) throws IOException;

    ByteBuffer get(ByteBuffer key) throws IOException;

    ByteBuffer remove(ByteBuffer key) throws IOException;

    void shutdown() throws IOException;
}
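MapCache is the abstraction that both SimpleMapCache and PersistentMapCache implement. A quick sketch of driving it directly is shown below; the SimpleMapCache constructor arguments are taken from the SimpleMapCache hunk later in this commit.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class MapCacheExample {

    public static void main(final String[] args) throws IOException {
        // SimpleMapCache(serviceIdentifier, maxSize, evictionPolicy), as shown below.
        final MapCache cache = new SimpleMapCache("example", 100, EvictionPolicy.LRU);

        final ByteBuffer key = ByteBuffer.wrap("fruit".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer value = ByteBuffer.wrap("apple".getBytes(StandardCharsets.UTF_8));

        final MapPutResult first = cache.putIfAbsent(key, value);   // succeeds: key was absent
        final MapPutResult second = cache.putIfAbsent(key, value);  // fails: key already present

        System.out.println(first.isSuccessful());    // true
        System.out.println(second.isSuccessful());   // false
        System.out.println(cache.containsKey(key));  // true

        cache.shutdown();
    }
}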

View File

@@ -21,38 +21,39 @@ import java.nio.ByteBuffer;
import org.apache.nifi.distributed.cache.server.CacheRecord;

public class MapCacheRecord extends CacheRecord {

    private final ByteBuffer key;
    private final ByteBuffer value;

    public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
        this.key = key;
        this.value = value;
    }

    public ByteBuffer getKey() {
        return key;
    }

    public ByteBuffer getValue() {
        return value;
    }

    @Override
    public int hashCode() {
        return 2938476 + key.hashCode() * value.hashCode();
    }

    @Override
    public boolean equals(final Object obj) {
        if (obj == this) {
            return true;
        }

        if (obj instanceof MapCacheRecord) {
            final MapCacheRecord that = ((MapCacheRecord) obj);
            return key.equals(that.key) && value.equals(that.value);
        }

        return false;
    }
}

View File

@@ -55,63 +55,63 @@ public class MapCacheServer extends AbstractCacheServer {
        final String action = dis.readUTF();
        try {
            switch (action) {
                case "close": {
                    return false;
                }
                case "putIfAbsent": {
                    final byte[] key = readValue(dis);
                    final byte[] value = readValue(dis);
                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
                    dos.writeBoolean(putResult.isSuccessful());
                    break;
                }
                case "containsKey": {
                    final byte[] key = readValue(dis);
                    final boolean contains = cache.containsKey(ByteBuffer.wrap(key));
                    dos.writeBoolean(contains);
                    break;
                }
                case "getAndPutIfAbsent": {
                    final byte[] key = readValue(dis);
                    final byte[] value = readValue(dis);

                    final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
                    if (putResult.isSuccessful()) {
                        // Put was successful. There was no old value to get.
                        dos.writeInt(0);
                    } else {
                        // we didn't put. Write back the previous value
                        final byte[] byteArray = putResult.getExistingValue().array();
                        dos.writeInt(byteArray.length);
                        dos.write(byteArray);
                    }

                    break;
                }
                case "get": {
                    final byte[] key = readValue(dis);
                    final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
                    if (existingValue == null) {
                        // no value was cached for this key
                        dos.writeInt(0);
                    } else {
                        // write back the cached value
                        final byte[] byteArray = existingValue.array();
                        dos.writeInt(byteArray.length);
                        dos.write(byteArray);
                    }

                    break;
                }
                case "remove": {
                    final byte[] key = readValue(dis);
                    final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
                    dos.writeBoolean(removed);
                    break;
                }
                default: {
                    throw new IOException("Illegal Request");
                }
            }
        } finally {
            dos.flush();
@@ -131,8 +131,9 @@ public class MapCacheServer extends AbstractCacheServer {
    @Override
    protected void finalize() throws Throwable {
        if (!stopped) {
            stop();
        }
    }

    private byte[] readValue(final DataInputStream dis) throws IOException {

View File

@@ -19,11 +19,12 @@ package org.apache.nifi.distributed.cache.server.map;
import java.nio.ByteBuffer;

public class MapPutResult {

    private final boolean successful;
    private final ByteBuffer key, value;
    private final ByteBuffer existingValue;
    private final ByteBuffer evictedKey, evictedValue;

    public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
        this.successful = successful;
        this.key = key;
@@ -44,7 +45,7 @@ public class MapPutResult {
    public ByteBuffer getValue() {
        return value;
    }

    public ByteBuffer getExistingValue() {
        return existingValue;
    }

View File

@@ -38,9 +38,9 @@ public class PersistentMapCache implements MapCache {
    private final MapCache wrapped;
    private final WriteAheadRepository<MapWaliRecord> wali;

    private final AtomicLong modifications = new AtomicLong(0L);

    public PersistentMapCache(final String serviceIdentifier, final File persistencePath, final MapCache cacheToWrap) throws IOException {
        wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
        wrapped = cacheToWrap;
@@ -48,8 +48,8 @@ public class PersistentMapCache implements MapCache {
    synchronized void restore() throws IOException {
        final Collection<MapWaliRecord> recovered = wali.recoverRecords();
        for (final MapWaliRecord record : recovered) {
            if (record.getUpdateType() == UpdateType.CREATE) {
                wrapped.putIfAbsent(record.getKey(), record.getValue());
            }
        }
@@ -58,24 +58,24 @@ public class PersistentMapCache implements MapCache {
    @Override
    public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
        final MapPutResult putResult = wrapped.putIfAbsent(key, value);
        if (putResult.isSuccessful()) {
            // The put was successful.
            final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
            final List<MapWaliRecord> records = new ArrayList<>();
            records.add(record);

            if (putResult.getEvictedKey() != null) {
                records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
            }

            wali.update(Collections.singletonList(record), false);

            final long modCount = modifications.getAndIncrement();
            if (modCount > 0 && modCount % 100000 == 0) {
                wali.checkpoint();
            }
        }

        return putResult;
    }
@@ -92,65 +92,64 @@ public class PersistentMapCache implements MapCache {
    @Override
    public ByteBuffer remove(ByteBuffer key) throws IOException {
        final ByteBuffer removeResult = wrapped.remove(key);
        if (removeResult != null) {
            final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
            final List<MapWaliRecord> records = new ArrayList<>(1);
            records.add(record);
            wali.update(records, false);

            final long modCount = modifications.getAndIncrement();
            if (modCount > 0 && modCount % 1000 == 0) {
                wali.checkpoint();
            }
        }

        return removeResult;
    }

    @Override
    public void shutdown() throws IOException {
        wali.shutdown();
    }

    private static class MapWaliRecord {

        private final UpdateType updateType;
        private final ByteBuffer key;
        private final ByteBuffer value;

        public MapWaliRecord(final UpdateType updateType, final ByteBuffer key, final ByteBuffer value) {
            this.updateType = updateType;
            this.key = key;
            this.value = value;
        }

        public UpdateType getUpdateType() {
            return updateType;
        }

        public ByteBuffer getKey() {
            return key;
        }

        public ByteBuffer getValue() {
            return value;
        }
    }

    private static class Serde implements SerDe<MapWaliRecord> {

        @Override
        public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
            final UpdateType updateType = newRecordState.getUpdateType();
            if (updateType == UpdateType.DELETE) {
                out.write(0);
            } else {
                out.write(1);
            }

            final byte[] key = newRecordState.getKey().array();
            final byte[] value = newRecordState.getValue().array();

            out.writeInt(key.length);
            out.write(key);
            out.writeInt(value.length);
@@ -165,12 +164,12 @@ public class PersistentMapCache implements MapCache {
        @Override
        public MapWaliRecord deserializeEdit(final DataInputStream in, final Map<Object, MapWaliRecord> currentRecordStates, final int version) throws IOException {
            final int updateTypeValue = in.read();
            if (updateTypeValue < 0) {
                throw new EOFException();
            }

            final UpdateType updateType = (updateTypeValue == 0 ? UpdateType.DELETE : UpdateType.CREATE);

            final int keySize = in.readInt();
            final byte[] key = new byte[keySize];
            in.readFully(key);
@@ -207,4 +206,4 @@ public class PersistentMapCache implements MapCache {
            return 1;
        }
    }
}
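PersistentMapCache wraps an in-memory MapCache and records every successful mutation in a write-ahead log so the contents survive a restart. A brief sketch of that wrap-and-replay pattern is below; it is illustrative only and would need to live in the same package, since restore() is package-private.

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch of wrapping a SimpleMapCache with PersistentMapCache, using the
// constructors shown in this commit.
public final class PersistentCacheExample {

    static MapCache openPersistentCache(final File persistenceDir) throws IOException {
        final MapCache inMemory = new SimpleMapCache("example", 10000, EvictionPolicy.LFU);
        final PersistentMapCache persistent = new PersistentMapCache("example", persistenceDir, inMemory);

        // Replay the write-ahead log so previously stored entries are visible again.
        persistent.restore();
        return persistent;
    }

    public static void main(final String[] args) throws IOException {
        final MapCache cache = openPersistentCache(new File("target/cache-wal"));
        cache.putIfAbsent(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}));
        cache.shutdown();
    }
}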

View File

@@ -33,46 +33,47 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleMapCache implements MapCache {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);

    private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
    private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    private final String serviceIdentifier;

    private final int maxSize;

    public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
        // need to change to ConcurrentMap as this is modified when only the readLock is held
        inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
        this.serviceIdentifier = serviceIdentifier;
        this.maxSize = maxSize;
    }

    @Override
    public String toString() {
        return "SimpleSetCache[service id=" + serviceIdentifier + "]";
    }

    // don't need synchronized because this method is only called when the writeLock is held, and all
    // public methods obtain either the read or write lock
    private MapCacheRecord evict() {
        if (cache.size() < maxSize) {
            return null;
        }

        final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
        final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
        cache.remove(valueToEvict);

        if (logger.isDebugEnabled()) {
            logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
        }

        return recordToEvict;
    }
@@ -81,44 +82,44 @@ public class SimpleMapCache implements MapCache {
        writeLock.lock();
        try {
            final MapCacheRecord record = cache.get(key);
            if (record == null) {
                // Record is null. We will add.
                final MapCacheRecord evicted = evict();
                final MapCacheRecord newRecord = new MapCacheRecord(key, value);
                cache.put(key, newRecord);
                inverseCacheMap.put(newRecord, key);

                if (evicted == null) {
                    return new MapPutResult(true, key, value, null, null, null);
                } else {
                    return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
                }
            }

            // Record is not null. Increment hit count and return result indicating that record was not added.
            inverseCacheMap.remove(record);
            record.hit();
            inverseCacheMap.put(record, key);

            return new MapPutResult(false, key, value, record.getValue(), null, null);
        } finally {
            writeLock.unlock();
        }
    }

    @Override
    public boolean containsKey(final ByteBuffer key) {
        readLock.lock();
        try {
            final MapCacheRecord record = cache.get(key);
            if (record == null) {
                return false;
            }

            inverseCacheMap.remove(record);
            record.hit();
            inverseCacheMap.put(record, key);

            return true;
        } finally {
            readLock.unlock();
@@ -130,14 +131,14 @@ public class SimpleMapCache implements MapCache {
        readLock.lock();
        try {
            final MapCacheRecord record = cache.get(key);
            if (record == null) {
                return null;
            }

            inverseCacheMap.remove(record);
            record.hit();
            inverseCacheMap.put(record, key);
            return record.getValue();
        } finally {
            readLock.unlock();

View File

@ -38,34 +38,34 @@ public class PersistentSetCache implements SetCache {
private final SetCache wrapped; private final SetCache wrapped;
private final WriteAheadRepository<SetRecord> wali; private final WriteAheadRepository<SetRecord> wali;
private final AtomicLong modifications = new AtomicLong(0L); private final AtomicLong modifications = new AtomicLong(0L);
public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException { public PersistentSetCache(final String serviceIdentifier, final File persistencePath, final SetCache cacheToWrap) throws IOException {
wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null); wali = new MinimalLockingWriteAheadLog<>(persistencePath.toPath(), 1, new Serde(), null);
wrapped = cacheToWrap; wrapped = cacheToWrap;
} }
public synchronized void restore() throws IOException { public synchronized void restore() throws IOException {
final Collection<SetRecord> recovered = wali.recoverRecords(); final Collection<SetRecord> recovered = wali.recoverRecords();
for ( final SetRecord record : recovered ) { for (final SetRecord record : recovered) {
if ( record.getUpdateType() == UpdateType.CREATE ) { if (record.getUpdateType() == UpdateType.CREATE) {
addIfAbsent(record.getBuffer()); addIfAbsent(record.getBuffer());
} }
} }
} }
@Override @Override
public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException { public synchronized SetCacheResult remove(final ByteBuffer value) throws IOException {
final SetCacheResult removeResult = wrapped.remove(value); final SetCacheResult removeResult = wrapped.remove(value);
if ( removeResult.getResult() ) { if (removeResult.getResult()) {
final SetRecord record = new SetRecord(UpdateType.DELETE, value); final SetRecord record = new SetRecord(UpdateType.DELETE, value);
final List<SetRecord> records = new ArrayList<>(); final List<SetRecord> records = new ArrayList<>();
records.add(record); records.add(record);
wali.update(records, false); wali.update(records, false);
final long modCount = modifications.getAndIncrement(); final long modCount = modifications.getAndIncrement();
if ( modCount > 0 && modCount % 1000 == 0 ) { if (modCount > 0 && modCount % 1000 == 0) {
wali.checkpoint(); wali.checkpoint();
} }
} }
@ -76,24 +76,24 @@ public class PersistentSetCache implements SetCache {
@Override @Override
public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException { public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) throws IOException {
final SetCacheResult addResult = wrapped.addIfAbsent(value); final SetCacheResult addResult = wrapped.addIfAbsent(value);
if ( addResult.getResult() ) { if (addResult.getResult()) {
final SetRecord record = new SetRecord(UpdateType.CREATE, value); final SetRecord record = new SetRecord(UpdateType.CREATE, value);
final List<SetRecord> records = new ArrayList<>(); final List<SetRecord> records = new ArrayList<>();
records.add(record); records.add(record);
final SetCacheRecord evictedRecord = addResult.getEvictedRecord(); final SetCacheRecord evictedRecord = addResult.getEvictedRecord();
if ( evictedRecord != null ) { if (evictedRecord != null) {
records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue())); records.add(new SetRecord(UpdateType.DELETE, evictedRecord.getValue()));
} }
wali.update(records, false); wali.update(records, false);
final long modCount = modifications.getAndIncrement(); final long modCount = modifications.getAndIncrement();
if ( modCount > 0 && modCount % 1000 == 0 ) { if (modCount > 0 && modCount % 1000 == 0) {
wali.checkpoint(); wali.checkpoint();
} }
} }
return addResult; return addResult;
} }
@ -101,45 +101,46 @@ public class PersistentSetCache implements SetCache {
public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException { public synchronized SetCacheResult contains(final ByteBuffer value) throws IOException {
return wrapped.contains(value); return wrapped.contains(value);
} }
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
wali.shutdown(); wali.shutdown();
} }
private static class SetRecord { private static class SetRecord {
private final UpdateType updateType; private final UpdateType updateType;
private final ByteBuffer value; private final ByteBuffer value;
public SetRecord(final UpdateType updateType, final ByteBuffer value) { public SetRecord(final UpdateType updateType, final ByteBuffer value) {
this.updateType = updateType; this.updateType = updateType;
this.value = value; this.value = value;
} }
public UpdateType getUpdateType() { public UpdateType getUpdateType() {
return updateType; return updateType;
} }
public ByteBuffer getBuffer() { public ByteBuffer getBuffer() {
return value; return value;
} }
public byte[] getData() { public byte[] getData() {
return value.array(); return value.array();
} }
} }
private static class Serde implements SerDe<SetRecord> { private static class Serde implements SerDe<SetRecord> {
@Override @Override
public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException { public void serializeEdit(final SetRecord previousRecordState, final SetRecord newRecordState, final DataOutputStream out) throws IOException {
final UpdateType updateType = newRecordState.getUpdateType(); final UpdateType updateType = newRecordState.getUpdateType();
if ( updateType == UpdateType.DELETE ) { if (updateType == UpdateType.DELETE) {
out.write(0); out.write(0);
} else { } else {
out.write(1); out.write(1);
} }
final byte[] data = newRecordState.getData(); final byte[] data = newRecordState.getData();
out.writeInt(data.length); out.writeInt(data.length);
out.write(newRecordState.getData()); out.write(newRecordState.getData());
@ -153,16 +154,16 @@ public class PersistentSetCache implements SetCache {
@Override @Override
public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException { public SetRecord deserializeEdit(final DataInputStream in, final Map<Object, SetRecord> currentRecordStates, final int version) throws IOException {
final int value = in.read(); final int value = in.read();
if ( value < 0 ) { if (value < 0) {
throw new EOFException(); throw new EOFException();
} }
final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE); final UpdateType updateType = (value == 0 ? UpdateType.DELETE : UpdateType.CREATE);
final int size = in.readInt(); final int size = in.readInt();
final byte[] data = new byte[size]; final byte[] data = new byte[size];
in.readFully(data); in.readFully(data);
return new SetRecord(updateType, ByteBuffer.wrap(data)); return new SetRecord(updateType, ByteBuffer.wrap(data));
} }
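The Serde above frames every edit as a one-byte type tag (0 = DELETE, 1 = CREATE) followed by a four-byte length and the raw value bytes, and deserializeEdit reads the same fields back in order. A self-contained round-trip sketch of that framing using plain java.io streams (the NiFi stream wrappers used by the real class behave the same way for these calls):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EditFramingSketch {
    public static void main(String[] args) throws IOException {
        final byte[] value = "some cached value".getBytes(StandardCharsets.UTF_8);

        // serialize: type tag, then length-prefixed payload
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.write(1);                 // 1 = CREATE, 0 = DELETE
            out.writeInt(value.length);   // payload length
            out.write(value);             // payload bytes
        }

        // deserialize: read the same fields back in the same order
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            final int tag = in.read();
            if (tag < 0) {
                throw new EOFException();
            }
            final boolean create = tag != 0;
            final byte[] data = new byte[in.readInt()];
            in.readFully(data);
            System.out.println((create ? "CREATE " : "DELETE ") + new String(data, StandardCharsets.UTF_8));
        }
    }
}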

View File

@ -22,8 +22,11 @@ import java.nio.ByteBuffer;
public interface SetCache { public interface SetCache {
SetCacheResult remove(ByteBuffer value) throws IOException; SetCacheResult remove(ByteBuffer value) throws IOException;
SetCacheResult addIfAbsent(ByteBuffer value) throws IOException; SetCacheResult addIfAbsent(ByteBuffer value) throws IOException;
SetCacheResult contains(ByteBuffer value) throws IOException; SetCacheResult contains(ByteBuffer value) throws IOException;
void shutdown() throws IOException; void shutdown() throws IOException;
} }

View File

@ -21,33 +21,34 @@ import java.nio.ByteBuffer;
import org.apache.nifi.distributed.cache.server.CacheRecord; import org.apache.nifi.distributed.cache.server.CacheRecord;
public class SetCacheRecord extends CacheRecord { public class SetCacheRecord extends CacheRecord {
private final ByteBuffer value; private final ByteBuffer value;
public SetCacheRecord(final ByteBuffer value) { public SetCacheRecord(final ByteBuffer value) {
this.value = value; this.value = value;
} }
public ByteBuffer getValue() { public ByteBuffer getValue() {
return value; return value;
} }
@Override @Override
public int hashCode() { public int hashCode() {
return value.hashCode(); return value.hashCode();
} }
@Override @Override
public boolean equals(final Object obj) { public boolean equals(final Object obj) {
if ( this == obj ) { if (this == obj) {
return true; return true;
} }
if (obj instanceof SetCacheRecord) { if (obj instanceof SetCacheRecord) {
return value.equals(((SetCacheRecord) obj).value); return value.equals(((SetCacheRecord) obj).value);
} }
return false; return false;
} }
@Override @Override
public String toString() { public String toString() {
return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]"; return "SetCacheRecord[value=" + new String(value.array()) + ", hitCount=" + getHitCount() + "]";

View File

@ -16,27 +16,26 @@
*/ */
package org.apache.nifi.distributed.cache.server.set; package org.apache.nifi.distributed.cache.server.set;
public class SetCacheResult { public class SetCacheResult {
private final boolean result; private final boolean result;
private final SetCacheRecord stats; private final SetCacheRecord stats;
private final SetCacheRecord evictedRecord; private final SetCacheRecord evictedRecord;
public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) { public SetCacheResult(final boolean result, final SetCacheRecord stats, final SetCacheRecord evictedRecord) {
this.result = result; this.result = result;
this.stats = stats; this.stats = stats;
this.evictedRecord = evictedRecord; this.evictedRecord = evictedRecord;
} }
public boolean getResult() { public boolean getResult() {
return result; return result;
} }
public SetCacheRecord getRecord() { public SetCacheRecord getRecord() {
return stats; return stats;
} }
public SetCacheRecord getEvictedRecord() { public SetCacheRecord getEvictedRecord() {
return evictedRecord; return evictedRecord;
} }

View File

@ -30,41 +30,42 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class SimpleSetCache implements SetCache { public class SimpleSetCache implements SetCache {
private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class); private static final Logger logger = LoggerFactory.getLogger(SimpleSetCache.class);
private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>(); private final Map<ByteBuffer, SetCacheRecord> cache = new HashMap<>();
private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap; private final SortedMap<SetCacheRecord, ByteBuffer> inverseCacheMap;
private final String serviceIdentifier; private final String serviceIdentifier;
private final int maxSize; private final int maxSize;
public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) { public SimpleSetCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator()); inverseCacheMap = new TreeMap<>(evictionPolicy.getComparator());
this.serviceIdentifier = serviceIdentifier; this.serviceIdentifier = serviceIdentifier;
this.maxSize = maxSize; this.maxSize = maxSize;
} }
private synchronized SetCacheRecord evict() { private synchronized SetCacheRecord evict() {
if ( cache.size() < maxSize ) { if (cache.size() < maxSize) {
return null; return null;
} }
final SetCacheRecord recordToEvict = inverseCacheMap.firstKey(); final SetCacheRecord recordToEvict = inverseCacheMap.firstKey();
final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict); final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
cache.remove(valueToEvict); cache.remove(valueToEvict);
if ( logger.isDebugEnabled() ) { if (logger.isDebugEnabled()) {
logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8)); logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
} }
return recordToEvict; return recordToEvict;
} }
@Override @Override
public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) { public synchronized SetCacheResult addIfAbsent(final ByteBuffer value) {
final SetCacheRecord record = cache.get(value); final SetCacheRecord record = cache.get(value);
if ( record == null ) { if (record == null) {
final SetCacheRecord evicted = evict(); final SetCacheRecord evicted = evict();
final SetCacheRecord newRecord = new SetCacheRecord(value); final SetCacheRecord newRecord = new SetCacheRecord(value);
cache.put(value, newRecord); cache.put(value, newRecord);
@ -75,42 +76,42 @@ public class SimpleSetCache implements SetCache {
inverseCacheMap.remove(record); inverseCacheMap.remove(record);
record.hit(); record.hit();
inverseCacheMap.put(record, value); inverseCacheMap.put(record, value);
return new SetCacheResult(false, record, null); return new SetCacheResult(false, record, null);
} }
} }
@Override @Override
public synchronized SetCacheResult contains(final ByteBuffer value) { public synchronized SetCacheResult contains(final ByteBuffer value) {
final SetCacheRecord record = cache.get(value); final SetCacheRecord record = cache.get(value);
if ( record == null ) { if (record == null) {
return new SetCacheResult(false, null, null); return new SetCacheResult(false, null, null);
} else { } else {
// We have to remove the record and add it again in order to cause the Map to stay sorted // We have to remove the record and add it again in order to cause the Map to stay sorted
inverseCacheMap.remove(record); inverseCacheMap.remove(record);
record.hit(); record.hit();
inverseCacheMap.put(record, value); inverseCacheMap.put(record, value);
return new SetCacheResult(true, record, null); return new SetCacheResult(true, record, null);
} }
} }
@Override @Override
public synchronized SetCacheResult remove(final ByteBuffer value) { public synchronized SetCacheResult remove(final ByteBuffer value) {
final SetCacheRecord record = cache.remove(value); final SetCacheRecord record = cache.remove(value);
if ( record == null ) { if (record == null) {
return new SetCacheResult(false, null, null); return new SetCacheResult(false, null, null);
} else { } else {
inverseCacheMap.remove(record); inverseCacheMap.remove(record);
return new SetCacheResult(true, record, null); return new SetCacheResult(true, record, null);
} }
} }
@Override @Override
public String toString() { public String toString() {
return "SimpleSetCache[service id=" + serviceIdentifier + "]"; return "SimpleSetCache[service id=" + serviceIdentifier + "]";
} }
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
} }
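A brief usage sketch for SimpleSetCache as shown above. Only the constructor and methods visible in this diff are relied on; the EvictionPolicy import location and its LFU constant are assumptions based on the Eviction Strategy options mentioned elsewhere in this commit.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.distributed.cache.server.EvictionPolicy;        // assumed package
import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;

public class SimpleSetCacheSketch {
    public static void main(String[] args) {
        // maxSize of 2 so the third distinct value forces an eviction
        final SimpleSetCache cache = new SimpleSetCache("example-service", 2, EvictionPolicy.LFU);

        final ByteBuffer a = ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer b = ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8));
        final ByteBuffer c = ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8));

        System.out.println(cache.addIfAbsent(a).getResult()); // true: newly added
        System.out.println(cache.addIfAbsent(a).getResult()); // false: already present, hit count bumped
        System.out.println(cache.addIfAbsent(b).getResult()); // true
        System.out.println(cache.contains(a).getResult());    // true, and counts as another hit on "a"

        // third distinct value: cache size has reached maxSize, so the
        // eviction policy picks a record to drop and reports it on the result
        final SetCacheResult result = cache.addIfAbsent(c);
        System.out.println(result.getEvictedRecord());         // expected to be the record for "b" under LFU
    }
}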

View File

@ -1,36 +1,36 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<!-- <!--
Licensed to the Apache Software Foundation (ASF) under one or more Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership. this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0 The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
--> -->
<head> <head>
<meta charset="utf-8" /> <meta charset="utf-8" />
<title>Distributed Map Cache Client Service</title> <title>Distributed Map Cache Client Service</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head> </head>
<body> <body>
<p> <p>
Below is an example of how to create a distributed map cache server for clients to connect to. Below is an example of how to create a distributed map cache server for clients to connect to.
Note that the identifier in this example is <code>cache-server</code>. If you are using this template Note that the identifier in this example is <code>cache-server</code>. If you are using this template
to create your own DistributedMapCache server, replace the values in this template with values that are to create your own DistributedMapCache server, replace the values in this template with values that are
suitable for your system. The properties you may need to adjust include <code>Port</code>, <code>Maximum Cache Entries</code>, suitable for your system. The properties you may need to adjust include <code>Port</code>, <code>Maximum Cache Entries</code>,
<code>Eviction Strategy</code>, <span style="font-style: italic;">SSL Context Service</span>, and <code>Eviction Strategy</code>, <span style="font-style: italic;">SSL Context Service</span>, and
<span style="font-style: italic;">Persistence Directory</span>. <span style="font-style: italic;">Persistence Directory</span>.
</p> </p>
<pre> <pre>
&lt;?xml version="1.0" encoding="UTF-8" ?&gt; &lt;?xml version="1.0" encoding="UTF-8" ?&gt;
&lt;services&gt; &lt;services&gt;
&lt;service&gt; &lt;service&gt;
@ -41,6 +41,6 @@
&lt;property name="Eviction Strategy"&gt;Least Recently Used&lt;/property&gt; &lt;property name="Eviction Strategy"&gt;Least Recently Used&lt;/property&gt;
&lt;/service&gt; &lt;/service&gt;
&lt;/services&gt; &lt;/services&gt;
</pre> </pre>
</body> </body>
</html> </html>
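The template above covers the server side. Below is a hedged client-side sketch of how a processor might talk to that server through the DistributedMapCacheClient interface; the putIfAbsent/get method shapes and the Serializer/Deserializer packages are assumptions modeled on the 0.x client API and on the StringSerializer/StringDeserializer in TestServerAndClient later in this commit, not a reference implementation.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;

public class CacheClientSketch {

    // same shapes as the test's StringSerializer/StringDeserializer
    private static final Serializer<String> STRING_SERIALIZER =
            (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
    private static final Deserializer<String> STRING_DESERIALIZER =
            input -> input.length == 0 ? null : new String(input, StandardCharsets.UTF_8);

    // 'client' would normally be obtained from the processor's property context,
    // e.g. context.getProperty(CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class)
    public static void rememberIfNew(final DistributedMapCacheClient client, final String key) throws IOException {
        // returns false when the key was already present in the cache
        final boolean added = client.putIfAbsent(key, "seen", STRING_SERIALIZER, STRING_SERIALIZER);
        if (!added) {
            final String existing = client.get(key, STRING_SERIALIZER, STRING_DESERIALIZER);
            System.out.println(key + " was already cached with value " + existing);
        }
    }
}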

View File

@ -152,7 +152,7 @@ public class TestServerAndClient {
newServer.shutdownServer(); newServer.shutdownServer();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
@ -215,7 +215,7 @@ public class TestServerAndClient {
newServer.shutdownServer(); newServer.shutdownServer();
} }
@Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel")
@Test @Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
@ -374,8 +374,7 @@ public class TestServerAndClient {
public void testClientTermination() throws InitializationException, IOException, InterruptedException { public void testClientTermination() throws InitializationException, IOException, InterruptedException {
/** /**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437
* See: https://issues.apache.org/jira/browse/NIFI-437
*/ */
Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
@ -509,6 +508,7 @@ public class TestServerAndClient {
} }
private static class StringSerializer implements Serializer<String> { private static class StringSerializer implements Serializer<String> {
@Override @Override
public void serialize(final String value, final OutputStream output) throws SerializationException, IOException { public void serialize(final String value, final OutputStream output) throws SerializationException, IOException {
output.write(value.getBytes(StandardCharsets.UTF_8)); output.write(value.getBytes(StandardCharsets.UTF_8));
@ -516,6 +516,7 @@ public class TestServerAndClient {
} }
private static class StringDeserializer implements Deserializer<String> { private static class StringDeserializer implements Deserializer<String> {
@Override @Override
public String deserialize(final byte[] input) throws DeserializationException, IOException { public String deserialize(final byte[] input) throws DeserializationException, IOException {
return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8); return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8);

View File

@ -14,24 +14,24 @@
limitations under the License. limitations under the License.
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId> <artifactId>nifi-standard-services</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.1.0-incubating-SNAPSHOT</version>
</parent> </parent>
<artifactId>nifi-http-context-map-api</artifactId> <artifactId>nifi-http-context-map-api</artifactId>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId> <artifactId>javax.servlet-api</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -22,51 +22,48 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
/** /**
* <p> * <p>
* An interface that provides the capability of receiving an HTTP servlet request in one component * An interface that provides the capability of receiving an HTTP servlet request in one component and responding to that request in another component.
* and responding to that request in another component.
* </p> * </p>
* *
* <p> * <p>
* The intended flow is for the component receiving the HTTP request to register the request, response, * The intended flow is for the component receiving the HTTP request to register the request, response, and AsyncContext with a particular identifier via the
* and AsyncContext with a particular identifier via the * {@link #register(String, HttpServletRequest, HttpServletResponse, AsyncContext)} method. Another component is then able to obtain the response by providing that identifier to the
* {@link #register(String, HttpServletRequest, HttpServletResponse, AsyncContext)} * {@link #getResponse(String)} method. After writing to the HttpServletResponse, the transaction is to then be completed via the {@link #complete(String)} method.
* method. Another component is then able to obtain the response
* by providing that identifier to the {@link #getResponse(String)} method. After writing to the
* HttpServletResponse, the transaction is to then be completed via the {@link #complete(String)} method.
* </p> * </p>
*/ */
public interface HttpContextMap extends ControllerService { public interface HttpContextMap extends ControllerService {
/** /**
* Registers an HttpServletRequest, HttpServletResponse, and the AsyncContext for a given identifier * Registers an HttpServletRequest, HttpServletResponse, and the AsyncContext for a given identifier
* *
* @param identifier * @param identifier identifier
* @param request * @param request request
* @param response * @param response response
* @param context * @param context context
* *
* @return true if register is successful, false if the context map is too full because too many requests have already been received and not processed * @return true if register is successful, false if the context map is too full because too many requests have already been received and not processed
* *
* @throws IllegalStateException if the identifier is already registered * @throws IllegalStateException if the identifier is already registered
*/ */
boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context); boolean register(String identifier, HttpServletRequest request, HttpServletResponse response, AsyncContext context);
/** /**
* Retrieves the HttpServletResponse for the given identifier, if it exists * Retrieves the HttpServletResponse for the given identifier, if it exists
* @param identifier *
* @param identifier identifier
* @return the HttpServletResponse for the given identifier, or {@code null} if it does not exist * @return the HttpServletResponse for the given identifier, or {@code null} if it does not exist
*/ */
HttpServletResponse getResponse(String identifier); HttpServletResponse getResponse(String identifier);
/** /**
* Marks the HTTP request/response for the given identifier as complete * Marks the HTTP request/response for the given identifier as complete
* @param identifier *
* * @param identifier identifier
*
* @throws IllegalStateException if the identifier is not registered to a valid AsyncContext * @throws IllegalStateException if the identifier is not registered to a valid AsyncContext
*/ */
void complete(String identifier); void complete(String identifier);
} }
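A minimal sketch of the flow the interface javadoc above describes: one component registers the request/response pair under an identifier, another looks the response up by that identifier, writes to it, and completes the exchange. Only the three methods declared here are used; the HttpContextMap package in the import is assumed, and the servlet objects would come from whatever container hosts the listening component.

import java.io.IOException;

import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.nifi.http.HttpContextMap;   // package assumed; this is the interface defined above

public class HttpContextMapFlowSketch {

    // receiving side: park the request under an identifier (e.g. a FlowFile UUID)
    public static boolean receive(final HttpContextMap contextMap, final String id,
                                  final HttpServletRequest request, final HttpServletResponse response) {
        final AsyncContext async = request.startAsync(request, response);
        return contextMap.register(id, request, response, async);   // false if the map is already full
    }

    // responding side: look the response up by the same identifier, answer, and complete
    public static void respond(final HttpContextMap contextMap, final String id) throws IOException {
        final HttpServletResponse response = contextMap.getResponse(id);
        if (response == null) {
            return;   // nothing registered (or already completed) under this identifier
        }
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().write("done");
        contextMap.complete(id);   // releases the AsyncContext registered above
    }
}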

View File

@ -24,21 +24,21 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-processor-utils</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId> <artifactId>nifi-http-context-map-api</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>javax.servlet</groupId> <groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId> <artifactId>javax.servlet-api</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -42,34 +42,35 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
@Tags({"http", "request", "response"}) @Tags({"http", "request", "response"})
@SeeAlso(classNames={ @SeeAlso(classNames = {
"org.apache.nifi.processors.standard.HandleHttpRequest", "org.apache.nifi.processors.standard.HandleHttpRequest",
"org.apache.nifi.processors.standard.HandleHttpResponse"}) "org.apache.nifi.processors.standard.HandleHttpResponse"})
@CapabilityDescription("Provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that " @CapabilityDescription("Provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that "
+ "multiple Processors can interact with the same HTTP request.") + "multiple Processors can interact with the same HTTP request.")
public class StandardHttpContextMap extends AbstractControllerService implements HttpContextMap { public class StandardHttpContextMap extends AbstractControllerService implements HttpContextMap {
public static final PropertyDescriptor MAX_OUTSTANDING_REQUESTS = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_OUTSTANDING_REQUESTS = new PropertyDescriptor.Builder()
.name("Maximum Outstanding Requests") .name("Maximum Outstanding Requests")
.description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error") .description("The maximum number of HTTP requests that can be outstanding at any one time. Any attempt to register an additional HTTP Request will cause an error")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5000") .defaultValue("5000")
.build(); .build();
public static final PropertyDescriptor REQUEST_EXPIRATION = new PropertyDescriptor.Builder() public static final PropertyDescriptor REQUEST_EXPIRATION = new PropertyDescriptor.Builder()
.name("Request Expiration") .name("Request Expiration")
.description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code") .description("Specifies how long an HTTP Request should be left unanswered before being evicted from the cache and being responded to with a Service Unavailable status code")
.required(true) .required(true)
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.defaultValue("1 min") .defaultValue("1 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build(); .build();
private final ConcurrentMap<String, Wrapper> wrapperMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Wrapper> wrapperMap = new ConcurrentHashMap<>();
private volatile int maxSize = 5000; private volatile int maxSize = 5000;
private volatile long maxRequestNanos; private volatile long maxRequestNanos;
private volatile ScheduledExecutorService executor; private volatile ScheduledExecutorService executor;
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(2); final List<PropertyDescriptor> properties = new ArrayList<>(2);
@ -77,67 +78,68 @@ public class StandardHttpContextMap extends AbstractControllerService implements
properties.add(REQUEST_EXPIRATION); properties.add(REQUEST_EXPIRATION);
return properties; return properties;
} }
@OnEnabled @OnEnabled
public void onConfigured(final ConfigurationContext context) { public void onConfigured(final ConfigurationContext context) {
maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger(); maxSize = context.getProperty(MAX_OUTSTANDING_REQUESTS).asInteger();
executor = Executors.newSingleThreadScheduledExecutor(); executor = Executors.newSingleThreadScheduledExecutor();
maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); maxRequestNanos = context.getProperty(REQUEST_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS);
final long scheduleNanos = maxRequestNanos / 2; final long scheduleNanos = maxRequestNanos / 2;
executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS); executor.scheduleWithFixedDelay(new CleanupExpiredRequests(), scheduleNanos, scheduleNanos, TimeUnit.NANOSECONDS);
} }
@OnDisabled @OnDisabled
public void cleanup() { public void cleanup() {
if ( executor != null ) { if (executor != null) {
executor.shutdown(); executor.shutdown();
} }
} }
@Override @Override
public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) { public boolean register(final String identifier, final HttpServletRequest request, final HttpServletResponse response, final AsyncContext context) {
// fail if there are too many already. Maybe add a configuration property for how many // fail if there are too many already. Maybe add a configuration property for how many
// outstanding, with a default of say 5000 // outstanding, with a default of say 5000
if ( wrapperMap.size() >= maxSize ) { if (wrapperMap.size() >= maxSize) {
return false; return false;
} }
final Wrapper wrapper = new Wrapper(request, response, context); final Wrapper wrapper = new Wrapper(request, response, context);
final Wrapper existing = wrapperMap.putIfAbsent(identifier, wrapper); final Wrapper existing = wrapperMap.putIfAbsent(identifier, wrapper);
if ( existing != null ) { if (existing != null) {
throw new IllegalStateException("HTTP Request already registered with identifier " + identifier); throw new IllegalStateException("HTTP Request already registered with identifier " + identifier);
} }
return true; return true;
} }
@Override @Override
public HttpServletResponse getResponse(final String identifier) { public HttpServletResponse getResponse(final String identifier) {
final Wrapper wrapper = wrapperMap.get(identifier); final Wrapper wrapper = wrapperMap.get(identifier);
if ( wrapper == null ) { if (wrapper == null) {
return null; return null;
} }
return wrapper.getResponse(); return wrapper.getResponse();
} }
@Override @Override
public void complete(final String identifier) { public void complete(final String identifier) {
final Wrapper wrapper = wrapperMap.remove(identifier); final Wrapper wrapper = wrapperMap.remove(identifier);
if ( wrapper == null ) { if (wrapper == null) {
throw new IllegalStateException("No HTTP Request registered with identifier " + identifier); throw new IllegalStateException("No HTTP Request registered with identifier " + identifier);
} }
wrapper.getAsync().complete(); wrapper.getAsync().complete();
} }
private static class Wrapper { private static class Wrapper {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private final HttpServletRequest request; private final HttpServletRequest request;
private final HttpServletResponse response; private final HttpServletResponse response;
private final AsyncContext async; private final AsyncContext async;
private final long nanoTimeAdded = System.nanoTime(); private final long nanoTimeAdded = System.nanoTime();
public Wrapper(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) { public Wrapper(final HttpServletRequest request, final HttpServletResponse response, final AsyncContext async) {
this.request = request; this.request = request;
this.response = response; this.response = response;
@ -151,24 +153,25 @@ public class StandardHttpContextMap extends AbstractControllerService implements
public AsyncContext getAsync() { public AsyncContext getAsync() {
return async; return async;
} }
public long getNanoTimeAdded() { public long getNanoTimeAdded() {
return nanoTimeAdded; return nanoTimeAdded;
} }
} }
private class CleanupExpiredRequests implements Runnable { private class CleanupExpiredRequests implements Runnable {
@Override @Override
public void run() { public void run() {
final long now = System.nanoTime(); final long now = System.nanoTime();
final long threshold = now - maxRequestNanos; final long threshold = now - maxRequestNanos;
final Iterator<Map.Entry<String, Wrapper>> itr = wrapperMap.entrySet().iterator(); final Iterator<Map.Entry<String, Wrapper>> itr = wrapperMap.entrySet().iterator();
while ( itr.hasNext() ) { while (itr.hasNext()) {
final Map.Entry<String, Wrapper> entry = itr.next(); final Map.Entry<String, Wrapper> entry = itr.next();
if ( entry.getValue().getNanoTimeAdded() < threshold ) { if (entry.getValue().getNanoTimeAdded() < threshold) {
itr.remove(); itr.remove();
// send SERVICE_UNAVAILABLE // send SERVICE_UNAVAILABLE
try { try {
final AsyncContext async = entry.getValue().getAsync(); final AsyncContext async = entry.getValue().getAsync();

View File

@ -22,15 +22,15 @@
</head> </head>
<body> <body>
<h2>Description:</h2> <h2>Description:</h2>
<p> <p>
This is the standard implementation of the HTTP Context Map. This service is used to provide This is the standard implementation of the HTTP Context Map. This service is used to provide
coordination between coordination between
<a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a> <a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a>
and and
<a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a> <a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a>
Processors. Processors.
</p> </p>
<!-- Service Documentation ================================================== --> <!-- Service Documentation ================================================== -->
<h2>Configuring the HTTP Context Map:</h2> <h2>Configuring the HTTP Context Map:</h2>
@ -40,9 +40,9 @@
</p> </p>
<p> <p>
This controller service exposes a single property named <code>Maximum Outstanding Requests</code>. This controller service exposes a single property named <code>Maximum Outstanding Requests</code>.
This property determines the maximum number of HTTP requests that can be outstanding at any one time. This property determines the maximum number of HTTP requests that can be outstanding at any one time.
Any attempt to register an additional HTTP Request will cause an error. The default value is 5000. Any attempt to register an additional HTTP Request will cause an error. The default value is 5000.
Below is an example of the template for a StandardHttpContextMap controller service. Below is an example of the template for a StandardHttpContextMap controller service.
</p> </p>
@ -56,12 +56,12 @@
&lt;/service&gt; &lt;/service&gt;
&lt;/services&gt; &lt;/services&gt;
</pre> </pre>
<p> <p>
<strong>See Also:</strong><br /> <strong>See Also:</strong><br />
<a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a><br /> <a href="../org.apache.nifi.processors.standard.HandleHttpRequest/index.html">HandleHttpRequest</a><br />
<a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a><br /> <a href="../org.apache.nifi.processors.standard.HandleHttpResponse/index.html">HandleHttpResponse</a><br />
</p> </p>
</body> </body>
</html> </html>

View File

@ -205,7 +205,7 @@ public class StandardSSLContextService extends AbstractControllerService impleme
} }
return results; return results;
} }
private void verifySslConfig(final ValidationContext validationContext) throws ProcessException { private void verifySslConfig(final ValidationContext validationContext) throws ProcessException {
try { try {
final String keystoreFile = validationContext.getProperty(KEYSTORE).getValue(); final String keystoreFile = validationContext.getProperty(KEYSTORE).getValue();
@ -237,7 +237,6 @@ public class StandardSSLContextService extends AbstractControllerService impleme
throw new ProcessException(e); throw new ProcessException(e);
} }
} }
@Override @Override
public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException { public SSLContext createSSLContext(final ClientAuth clientAuth) throws ProcessException {

View File

@ -73,7 +73,7 @@ public class SSLContextServiceTest {
properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "wrongpassword"); properties.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "wrongpassword");
properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
runner.addControllerService("test-bad4", service, properties); runner.addControllerService("test-bad4", service, properties);
runner.assertNotValid(service); runner.assertNotValid(service);
} }
@ -126,7 +126,7 @@ public class SSLContextServiceTest {
properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); properties.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
runner.addControllerService("test-good2", service, properties); runner.addControllerService("test-good2", service, properties);
runner.enableControllerService(service); runner.enableControllerService(service);
runner.setProperty("SSL Context Svc ID", "test-good2"); runner.setProperty("SSL Context Svc ID", "test-good2");
runner.assertValid(); runner.assertValid();
Assert.assertNotNull(service); Assert.assertNotNull(service);