mirror of https://github.com/apache/nifi.git
NIFI-13596 Renamed Distributed Cache Server and Client Services
- Renamed DistributedMapCacheServer to MapCacheServer - Renamed DistributedSetCacheServer to SetCacheServer - Renamed DistributedMapCacheClientService to MapCacheClientService - Renamed DistributedSetCacheClientService to SetCacheClientService Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #9398.
This commit is contained in:
parent
31d867ffbd
commit
d23bf2ce4f
|
@ -217,7 +217,7 @@ public class TestAbstractListProcessor {
|
||||||
// Require a cache service.
|
// Require a cache service.
|
||||||
runner.assertNotValid();
|
runner.assertNotValid();
|
||||||
|
|
||||||
final DistributedCache trackingCache = new DistributedCache();
|
final EphemeralMapCacheClientService trackingCache = new EphemeralMapCacheClientService();
|
||||||
runner.addControllerService("tracking-cache", trackingCache);
|
runner.addControllerService("tracking-cache", trackingCache);
|
||||||
runner.enableControllerService(trackingCache);
|
runner.enableControllerService(trackingCache);
|
||||||
|
|
||||||
|
@ -361,7 +361,7 @@ public class TestAbstractListProcessor {
|
||||||
String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()));
|
String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()));
|
||||||
}
|
}
|
||||||
|
|
||||||
static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
|
static class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
|
||||||
private final Map<Object, Object> stored = new HashMap<>();
|
private final Map<Object, Object> stored = new HashMap<>();
|
||||||
private int fetchCount = 0;
|
private int fetchCount = 0;
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
|
import org.apache.nifi.distributed.cache.client.MapCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -42,9 +42,9 @@ public class TestDetectDuplicate {
|
||||||
@Test
|
@Test
|
||||||
public void testDuplicate() throws InitializationException {
|
public void testDuplicate() throws InitializationException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||||
final DistributedMapCacheClientImpl client = createClient();
|
final EphemeralMapCacheClientService client = createClient();
|
||||||
final Map<String, String> clientProperties = new HashMap<>();
|
final Map<String, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||||
runner.addControllerService("client", client, clientProperties);
|
runner.addControllerService("client", client, clientProperties);
|
||||||
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
||||||
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
||||||
|
@ -68,9 +68,9 @@ public class TestDetectDuplicate {
|
||||||
public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException {
|
public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException {
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||||
final DistributedMapCacheClientImpl client = createClient();
|
final EphemeralMapCacheClientService client = createClient();
|
||||||
final Map<String, String> clientProperties = new HashMap<>();
|
final Map<String, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||||
runner.addControllerService("client", client, clientProperties);
|
runner.addControllerService("client", client, clientProperties);
|
||||||
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
||||||
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
||||||
|
@ -92,9 +92,9 @@ public class TestDetectDuplicate {
|
||||||
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
|
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DistributedMapCacheClientImpl createClient() throws InitializationException {
|
private EphemeralMapCacheClientService createClient() throws InitializationException {
|
||||||
|
|
||||||
final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl();
|
final EphemeralMapCacheClientService client = new EphemeralMapCacheClientService();
|
||||||
final ComponentLog logger = new MockComponentLog("client", client);
|
final ComponentLog logger = new MockComponentLog("client", client);
|
||||||
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client));
|
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client));
|
||||||
client.initialize(clientInitContext);
|
client.initialize(clientInitContext);
|
||||||
|
@ -105,9 +105,9 @@ public class TestDetectDuplicate {
|
||||||
@Test
|
@Test
|
||||||
public void testDuplicateNoCache() throws InitializationException {
|
public void testDuplicateNoCache() throws InitializationException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||||
final DistributedMapCacheClientImpl client = createClient();
|
final EphemeralMapCacheClientService client = createClient();
|
||||||
final Map<String, String> clientProperties = new HashMap<>();
|
final Map<String, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||||
runner.addControllerService("client", client, clientProperties);
|
runner.addControllerService("client", client, clientProperties);
|
||||||
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
||||||
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
||||||
|
@ -141,9 +141,9 @@ public class TestDetectDuplicate {
|
||||||
public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException {
|
public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException {
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||||
final DistributedMapCacheClientImpl client = createClient();
|
final EphemeralMapCacheClientService client = createClient();
|
||||||
final Map<String, String> clientProperties = new HashMap<>();
|
final Map<String, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||||
runner.addControllerService("client", client, clientProperties);
|
runner.addControllerService("client", client, clientProperties);
|
||||||
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
||||||
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
||||||
|
@ -168,7 +168,7 @@ public class TestDetectDuplicate {
|
||||||
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
|
runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
|
static final class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
|
||||||
|
|
||||||
boolean exists = false;
|
boolean exists = false;
|
||||||
private Object cacheValue;
|
private Object cacheValue;
|
||||||
|
@ -180,10 +180,10 @@ public class TestDetectDuplicate {
|
||||||
@Override
|
@Override
|
||||||
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
props.add(DistributedMapCacheClientService.HOSTNAME);
|
props.add(MapCacheClientService.HOSTNAME);
|
||||||
props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
|
props.add(MapCacheClientService.COMMUNICATIONS_TIMEOUT);
|
||||||
props.add(DistributedMapCacheClientService.PORT);
|
props.add(MapCacheClientService.PORT);
|
||||||
props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE);
|
props.add(MapCacheClientService.SSL_CONTEXT_SERVICE);
|
||||||
return props;
|
return props;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,10 +29,9 @@ import org.apache.nifi.ssl.SSLContextService;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulate operations which may be performed using a {@link DistributedSetCacheClientService} or a
|
* Encapsulate operations which may be performed using a Cache Client Service
|
||||||
* {@link DistributedMapCacheClientService}.
|
|
||||||
*/
|
*/
|
||||||
public class DistributedCacheClient {
|
public class CacheClient {
|
||||||
|
|
||||||
private static final boolean DAEMON_THREAD_ENABLED = true;
|
private static final boolean DAEMON_THREAD_ENABLED = true;
|
||||||
|
|
||||||
|
@ -54,7 +53,7 @@ public class DistributedCacheClient {
|
||||||
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
|
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
|
||||||
* @param identifier uniquely identifies this client
|
* @param identifier uniquely identifies this client
|
||||||
*/
|
*/
|
||||||
protected DistributedCacheClient(final String hostname,
|
protected CacheClient(final String hostname,
|
||||||
final int port,
|
final int port,
|
||||||
final int timeoutMillis,
|
final int timeoutMillis,
|
||||||
final SSLContextService sslContextService,
|
final SSLContextService sslContextService,
|
|
@ -33,9 +33,7 @@ import javax.net.ssl.SSLContext;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service
|
* Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service methods.
|
||||||
* methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService}
|
|
||||||
* and {@link DistributedMapCacheClientService}.
|
|
||||||
*/
|
*/
|
||||||
class CacheClientChannelPoolFactory {
|
class CacheClientChannelPoolFactory {
|
||||||
|
|
||||||
|
|
|
@ -46,10 +46,10 @@ import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Tags({"distributed", "cache", "state", "map", "cluster"})
|
@Tags({"distributed", "cache", "state", "map", "cluster"})
|
||||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
|
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.MapCacheServer"})
|
||||||
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
|
@CapabilityDescription("Provides the ability to communicate with a MapCacheServer. This can be used in order to share a Map "
|
||||||
+ "between nodes in a NiFi cluster")
|
+ "between nodes in a NiFi cluster")
|
||||||
public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
|
public class MapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
|
||||||
|
|
||||||
private static final long DEFAULT_CACHE_REVISION = 0L;
|
private static final long DEFAULT_CACHE_REVISION = 0L;
|
||||||
|
|
||||||
|
@ -82,10 +82,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
|
||||||
.defaultValue("30 secs")
|
.defaultValue("30 secs")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
/**
|
private volatile NettyMapCacheClient cacheClient = null;
|
||||||
* The implementation of the business logic for {@link DistributedMapCacheClientService}.
|
|
||||||
*/
|
|
||||||
private volatile NettyDistributedMapCacheClient cacheClient = null;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creator of object used to broker the version of the distributed cache protocol with the service.
|
* Creator of object used to broker the version of the distributed cache protocol with the service.
|
||||||
|
@ -107,7 +104,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
|
||||||
getLogger().debug("Enabling Map Cache Client Service [{}]", context.getName());
|
getLogger().debug("Enabling Map Cache Client Service [{}]", context.getName());
|
||||||
this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(
|
this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(
|
||||||
ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), ProtocolVersion.V1.value());
|
ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), ProtocolVersion.V1.value());
|
||||||
this.cacheClient = new NettyDistributedMapCacheClient(
|
this.cacheClient = new NettyMapCacheClient(
|
||||||
context.getProperty(HOSTNAME).getValue(),
|
context.getProperty(HOSTNAME).getValue(),
|
||||||
context.getProperty(PORT).asInteger(),
|
context.getProperty(PORT).asInteger(),
|
||||||
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
|
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
|
|
@ -39,7 +39,7 @@ import java.util.Set;
|
||||||
* The implementation of the {@link DistributedMapCacheClient} using the netty library to provide the remote
|
* The implementation of the {@link DistributedMapCacheClient} using the netty library to provide the remote
|
||||||
* communication services.
|
* communication services.
|
||||||
*/
|
*/
|
||||||
public class NettyDistributedMapCacheClient extends DistributedCacheClient {
|
public class NettyMapCacheClient extends CacheClient {
|
||||||
private final ComponentLog log;
|
private final ComponentLog log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -54,7 +54,7 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient {
|
||||||
* @param identifier uniquely identifies this client
|
* @param identifier uniquely identifies this client
|
||||||
* @param log Component Log from instantiating Services
|
* @param log Component Log from instantiating Services
|
||||||
*/
|
*/
|
||||||
public NettyDistributedMapCacheClient(
|
public NettyMapCacheClient(
|
||||||
final String hostname,
|
final String hostname,
|
||||||
final int port,
|
final int port,
|
||||||
final int timeoutMillis,
|
final int timeoutMillis,
|
|
@ -26,10 +26,10 @@ import org.apache.nifi.ssl.SSLContextService;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The implementation of the {@link DistributedCacheClient} using the netty library to provide the remote
|
* The implementation of the {@link CacheClient} using the netty library to provide the remote
|
||||||
* communication services.
|
* communication services.
|
||||||
*/
|
*/
|
||||||
public class NettyDistributedSetCacheClient extends DistributedCacheClient {
|
public class NettySetCacheClient extends CacheClient {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
@ -42,7 +42,7 @@ public class NettyDistributedSetCacheClient extends DistributedCacheClient {
|
||||||
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
|
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
|
||||||
* @param identifier uniquely identifies this client
|
* @param identifier uniquely identifies this client
|
||||||
*/
|
*/
|
||||||
public NettyDistributedSetCacheClient(
|
public NettySetCacheClient(
|
||||||
final String hostname,
|
final String hostname,
|
||||||
final int port,
|
final int port,
|
||||||
final int timeoutMillis,
|
final int timeoutMillis,
|
|
@ -37,10 +37,10 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Tags({"distributed", "cache", "state", "set", "cluster"})
|
@Tags({"distributed", "cache", "state", "set", "cluster"})
|
||||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
|
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.SetCacheServer"})
|
||||||
@CapabilityDescription("Provides the ability to communicate with a DistributedSetCacheServer. This can be used in order to share a Set "
|
@CapabilityDescription("Provides the ability to communicate with a SetCacheServer. This can be used in order to share a Set "
|
||||||
+ "between nodes in a NiFi cluster")
|
+ "between nodes in a NiFi cluster")
|
||||||
public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
|
public class SetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
|
||||||
|
|
||||||
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||||
.name("Server Hostname")
|
.name("Server Hostname")
|
||||||
|
@ -71,10 +71,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
|
||||||
.defaultValue("30 secs")
|
.defaultValue("30 secs")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
/**
|
private volatile NettySetCacheClient cacheClient = null;
|
||||||
* The implementation of the business logic for {@link DistributedSetCacheClientService}.
|
|
||||||
*/
|
|
||||||
private volatile NettyDistributedSetCacheClient cacheClient = null;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creator of object used to broker the version of the distributed cache protocol with the service.
|
* Creator of object used to broker the version of the distributed cache protocol with the service.
|
||||||
|
@ -95,7 +92,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
|
||||||
public void onEnabled(final ConfigurationContext context) {
|
public void onEnabled(final ConfigurationContext context) {
|
||||||
getLogger().debug("Enabling Set Cache Client Service [{}]", context.getName());
|
getLogger().debug("Enabling Set Cache Client Service [{}]", context.getName());
|
||||||
this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(ProtocolVersion.V1.value());
|
this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(ProtocolVersion.V1.value());
|
||||||
this.cacheClient = new NettyDistributedSetCacheClient(
|
this.cacheClient = new NettySetCacheClient(
|
||||||
context.getProperty(HOSTNAME).getValue(),
|
context.getProperty(HOSTNAME).getValue(),
|
||||||
context.getProperty(PORT).asInteger(),
|
context.getProperty(PORT).asInteger(),
|
||||||
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
|
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
|
|
@ -12,5 +12,5 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
|
org.apache.nifi.distributed.cache.client.SetCacheClientService
|
||||||
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
|
org.apache.nifi.distributed.cache.client.MapCacheClientService
|
|
@ -29,7 +29,7 @@ import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||||
|
|
||||||
public abstract class DistributedCacheServer extends AbstractControllerService {
|
public abstract class AbstractCacheServer extends AbstractControllerService {
|
||||||
|
|
||||||
public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
|
public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used";
|
||||||
public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
|
public static final String EVICTION_STRATEGY_LRU = "Least Recently Used";
|
|
@ -28,7 +28,7 @@ import org.apache.nifi.ssl.SSLContextService;
|
||||||
@Tags({"distributed", "set", "distinct", "cache", "server"})
|
@Tags({"distributed", "set", "distinct", "cache", "server"})
|
||||||
@CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. "
|
@CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. "
|
||||||
+ "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.")
|
+ "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.")
|
||||||
public class DistributedSetCacheServer extends DistributedCacheServer {
|
public class SetCacheServer extends AbstractCacheServer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected CacheServer createCacheServer(final ConfigurationContext context) {
|
protected CacheServer createCacheServer(final ConfigurationContext context) {
|
|
@ -24,16 +24,16 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.distributed.cache.server.CacheServer;
|
import org.apache.nifi.distributed.cache.server.CacheServer;
|
||||||
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
|
import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
|
||||||
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
|
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
import org.apache.nifi.processor.DataUnit;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
|
|
||||||
@Tags({"distributed", "cluster", "map", "cache", "server", "key/value"})
|
@Tags({"distributed", "cluster", "map", "cache", "server", "key/value"})
|
||||||
@CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service"
|
@CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service"
|
||||||
+ " is typically accomplished via a DistributedMapCacheClient service.")
|
+ " is typically accomplished via a Map Cache Client Service.")
|
||||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"})
|
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.MapCacheClientService"})
|
||||||
public class DistributedMapCacheServer extends DistributedCacheServer {
|
public class MapCacheServer extends AbstractCacheServer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected CacheServer createCacheServer(final ConfigurationContext context) {
|
protected CacheServer createCacheServer(final ConfigurationContext context) {
|
|
@ -12,5 +12,5 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
org.apache.nifi.distributed.cache.server.DistributedSetCacheServer
|
org.apache.nifi.distributed.cache.server.SetCacheServer
|
||||||
org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer
|
org.apache.nifi.distributed.cache.server.map.MapCacheServer
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
|
||||||
|
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
|
import org.apache.nifi.distributed.cache.client.MapCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
||||||
import org.apache.nifi.processor.Processor;
|
import org.apache.nifi.processor.Processor;
|
||||||
|
@ -51,12 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class DistributedMapCacheTlsTest {
|
public class MapCacheServiceTlsTest {
|
||||||
|
|
||||||
private static TestRunner runner = null;
|
private static TestRunner runner = null;
|
||||||
private static SSLContextService sslContextService = null;
|
private static SSLContextService sslContextService = null;
|
||||||
private static DistributedMapCacheServer server = null;
|
private static MapCacheServer server = null;
|
||||||
private static DistributedMapCacheClientService client = null;
|
private static MapCacheClientService client = null;
|
||||||
private static final Serializer<String> serializer = new StringSerializer();
|
private static final Serializer<String> serializer = new StringSerializer();
|
||||||
private static final Deserializer<String> deserializer = new StringDeserializer();
|
private static final Deserializer<String> deserializer = new StringDeserializer();
|
||||||
|
|
||||||
|
@ -67,18 +67,18 @@ public class DistributedMapCacheTlsTest {
|
||||||
runner.addControllerService(sslContextService.getIdentifier(), sslContextService);
|
runner.addControllerService(sslContextService.getIdentifier(), sslContextService);
|
||||||
runner.enableControllerService(sslContextService);
|
runner.enableControllerService(sslContextService);
|
||||||
|
|
||||||
server = new DistributedMapCacheServer();
|
server = new MapCacheServer();
|
||||||
runner.addControllerService(server.getClass().getName(), server);
|
runner.addControllerService(server.getClass().getName(), server);
|
||||||
runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
|
runner.setProperty(server, MapCacheServer.PORT, "0");
|
||||||
runner.setProperty(server, DistributedMapCacheServer.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier());
|
runner.setProperty(server, MapCacheServer.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier());
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
final int listeningPort = server.getPort();
|
final int listeningPort = server.getPort();
|
||||||
|
|
||||||
client = new DistributedMapCacheClientService();
|
client = new MapCacheClientService();
|
||||||
runner.addControllerService(client.getClass().getName(), client);
|
runner.addControllerService(client.getClass().getName(), client);
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
|
runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost");
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(listeningPort));
|
runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(listeningPort));
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier());
|
runner.setProperty(client, MapCacheClientService.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier());
|
||||||
runner.enableControllerService(client);
|
runner.enableControllerService(client);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.server.map;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
|
import org.apache.nifi.distributed.cache.client.MapCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
||||||
import org.apache.nifi.processor.Processor;
|
import org.apache.nifi.processor.Processor;
|
||||||
|
@ -45,11 +45,11 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@Timeout(5)
|
@Timeout(5)
|
||||||
public class DistributedMapCacheTest {
|
public class MapCacheTest {
|
||||||
|
|
||||||
private static TestRunner runner = null;
|
private static TestRunner runner = null;
|
||||||
private static DistributedMapCacheServer server = null;
|
private static MapCacheServer server = null;
|
||||||
private static DistributedMapCacheClientService client = null;
|
private static MapCacheClientService client = null;
|
||||||
private static final Serializer<String> serializer = new StringSerializer();
|
private static final Serializer<String> serializer = new StringSerializer();
|
||||||
private static final Deserializer<String> deserializer = new StringDeserializer();
|
private static final Deserializer<String> deserializer = new StringDeserializer();
|
||||||
|
|
||||||
|
@ -57,16 +57,16 @@ public class DistributedMapCacheTest {
|
||||||
public static void startServices() throws Exception {
|
public static void startServices() throws Exception {
|
||||||
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
||||||
|
|
||||||
server = new DistributedMapCacheServer();
|
server = new MapCacheServer();
|
||||||
runner.addControllerService(server.getClass().getName(), server);
|
runner.addControllerService(server.getClass().getName(), server);
|
||||||
runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
|
runner.setProperty(server, MapCacheServer.PORT, "0");
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
final int port = server.getPort();
|
final int port = server.getPort();
|
||||||
|
|
||||||
client = new DistributedMapCacheClientService();
|
client = new MapCacheClientService();
|
||||||
runner.addControllerService(client.getClass().getName(), client);
|
runner.addControllerService(client.getClass().getName(), client);
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
|
runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost");
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(port));
|
runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(port));
|
||||||
runner.enableControllerService(client);
|
runner.enableControllerService(client);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,12 +22,12 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
|
||||||
import org.apache.nifi.distributed.cache.client.Deserializer;
|
import org.apache.nifi.distributed.cache.client.Deserializer;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
|
import org.apache.nifi.distributed.cache.client.MapCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
|
||||||
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
|
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
|
||||||
import org.apache.nifi.distributed.cache.server.CacheServer;
|
import org.apache.nifi.distributed.cache.server.CacheServer;
|
||||||
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
|
import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
|
||||||
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
|
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
|
||||||
import org.apache.nifi.event.transport.EventServer;
|
import org.apache.nifi.event.transport.EventServer;
|
||||||
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
|
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
|
||||||
|
@ -76,7 +76,7 @@ public class TestDistributedMapServerAndClient {
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
|
|
||||||
private DistributedMapCacheServer server;
|
private MapCacheServer server;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setRunner() throws InitializationException, IOException {
|
public void setRunner() throws InitializationException, IOException {
|
||||||
|
@ -86,10 +86,10 @@ public class TestDistributedMapServerAndClient {
|
||||||
|
|
||||||
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
||||||
|
|
||||||
server = new DistributedMapCacheServer();
|
server = new MapCacheServer();
|
||||||
runner.addControllerService("server", server);
|
runner.addControllerService("server", server);
|
||||||
|
|
||||||
runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
|
runner.setProperty(server, MapCacheServer.PORT, "0");
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
@ -101,11 +101,11 @@ public class TestDistributedMapServerAndClient {
|
||||||
public void testNonPersistentMapServerAndClient() throws InitializationException, IOException {
|
public void testNonPersistentMapServerAndClient() throws InitializationException, IOException {
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
DistributedMapCacheClientService client = new DistributedMapCacheClientService();
|
MapCacheClientService client = new MapCacheClientService();
|
||||||
try {
|
try {
|
||||||
runner.addControllerService("client", client);
|
runner.addControllerService("client", client);
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
|
runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost");
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
|
runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(server.getPort()));
|
||||||
runner.enableControllerService(client);
|
runner.enableControllerService(client);
|
||||||
|
|
||||||
final Serializer<String> valueSerializer = new StringSerializer();
|
final Serializer<String> valueSerializer = new StringSerializer();
|
||||||
|
@ -150,18 +150,18 @@ public class TestDistributedMapServerAndClient {
|
||||||
public void testOptimisticLock() throws Exception {
|
public void testOptimisticLock() throws Exception {
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
|
MapCacheClientService client1 = new MapCacheClientService();
|
||||||
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
|
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
|
||||||
client1.initialize(clientInitContext1);
|
client1.initialize(clientInitContext1);
|
||||||
|
|
||||||
DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
|
MapCacheClientService client2 = new MapCacheClientService();
|
||||||
MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
|
MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
|
||||||
client2.initialize(clientInitContext2);
|
client2.initialize(clientInitContext2);
|
||||||
|
|
||||||
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
|
||||||
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
|
clientProperties.put(MapCacheClientService.PORT, String.valueOf(server.getPort()));
|
||||||
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
|
clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
|
||||||
|
|
||||||
MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null);
|
MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null);
|
||||||
client1.onEnabled(clientContext1);
|
client1.onEnabled(clientContext1);
|
||||||
|
@ -221,7 +221,7 @@ public class TestDistributedMapServerAndClient {
|
||||||
@Test
|
@Test
|
||||||
public void testBackwardCompatibility() throws Exception {
|
public void testBackwardCompatibility() throws Exception {
|
||||||
// Create a server that only supports protocol version 1.
|
// Create a server that only supports protocol version 1.
|
||||||
server = new DistributedMapCacheServer() {
|
server = new MapCacheServer() {
|
||||||
@Override
|
@Override
|
||||||
protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
|
protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException {
|
||||||
return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
|
return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) {
|
||||||
|
@ -233,17 +233,17 @@ public class TestDistributedMapServerAndClient {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
runner.addControllerService("server", server);
|
runner.addControllerService("server", server);
|
||||||
runner.setProperty(server, DistributedMapCacheServer.PORT, "0");
|
runner.setProperty(server, MapCacheServer.PORT, "0");
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
DistributedMapCacheClientService client = new DistributedMapCacheClientService();
|
MapCacheClientService client = new MapCacheClientService();
|
||||||
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
|
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
|
||||||
client.initialize(clientInitContext1);
|
client.initialize(clientInitContext1);
|
||||||
|
|
||||||
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
|
||||||
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
|
clientProperties.put(MapCacheClientService.PORT, String.valueOf(server.getPort()));
|
||||||
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
|
clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
|
||||||
|
|
||||||
MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null);
|
MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null);
|
||||||
client.onEnabled(clientContext);
|
client.onEnabled(clientContext);
|
||||||
|
@ -276,12 +276,12 @@ public class TestDistributedMapServerAndClient {
|
||||||
public void testLimitServiceReadSize() throws InitializationException, IOException {
|
public void testLimitServiceReadSize() throws InitializationException, IOException {
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedMapCacheClientService client = createClient(server.getPort());
|
final MapCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
|
|
||||||
final String value = "value";
|
final String value = "value";
|
||||||
final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
|
final int maxReadSize = new MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
|
||||||
final int belowThreshold = maxReadSize / value.length();
|
final int belowThreshold = maxReadSize / value.length();
|
||||||
final int aboveThreshold = belowThreshold + 1;
|
final int aboveThreshold = belowThreshold + 1;
|
||||||
final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
|
final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
|
||||||
|
@ -300,12 +300,12 @@ public class TestDistributedMapServerAndClient {
|
||||||
final NettyEventServerFactory serverFactory = getEventServerFactory(0, messages);
|
final NettyEventServerFactory serverFactory = getEventServerFactory(0, messages);
|
||||||
final EventServer eventServer = serverFactory.getEventServer();
|
final EventServer eventServer = serverFactory.getEventServer();
|
||||||
|
|
||||||
DistributedMapCacheClientService client = new DistributedMapCacheClientService();
|
MapCacheClientService client = new MapCacheClientService();
|
||||||
|
|
||||||
runner.addControllerService("client", client);
|
runner.addControllerService("client", client);
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost");
|
runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost");
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(eventServer.getListeningPort()));
|
runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(eventServer.getListeningPort()));
|
||||||
runner.setProperty(client, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
|
runner.setProperty(client, MapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
|
||||||
runner.enableControllerService(client);
|
runner.enableControllerService(client);
|
||||||
|
|
||||||
final Serializer<String> valueSerializer = new StringSerializer();
|
final Serializer<String> valueSerializer = new StringSerializer();
|
||||||
|
@ -328,14 +328,14 @@ public class TestDistributedMapServerAndClient {
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DistributedMapCacheClientService createClient(final int port) throws InitializationException {
|
private MapCacheClientService createClient(final int port) throws InitializationException {
|
||||||
final DistributedMapCacheClientService client = new DistributedMapCacheClientService();
|
final MapCacheClientService client = new MapCacheClientService();
|
||||||
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
|
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
|
||||||
client.initialize(clientInitContext);
|
client.initialize(clientInitContext);
|
||||||
|
|
||||||
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
|
clientProperties.put(MapCacheClientService.HOSTNAME, "localhost");
|
||||||
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port));
|
clientProperties.put(MapCacheClientService.PORT, String.valueOf(port));
|
||||||
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null);
|
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null);
|
||||||
client.onEnabled(clientContext);
|
client.onEnabled(clientContext);
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.distributed.cache.server.map;
|
package org.apache.nifi.distributed.cache.server.map;
|
||||||
|
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
|
import org.apache.nifi.distributed.cache.client.MapCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.operations.MapOperation;
|
import org.apache.nifi.distributed.cache.operations.MapOperation;
|
||||||
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
|
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
|
||||||
|
@ -48,7 +48,7 @@ import java.util.Arrays;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
public class TestDistributedMapCacheClientService {
|
public class TestMapCacheClientService {
|
||||||
private static final String LOCALHOST = "127.0.0.1";
|
private static final String LOCALHOST = "127.0.0.1";
|
||||||
|
|
||||||
private static final int MAX_REQUEST_LENGTH = 64;
|
private static final int MAX_REQUEST_LENGTH = 64;
|
||||||
|
@ -91,13 +91,13 @@ public class TestDistributedMapCacheClientService {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testClientTimeoutOnServerNetworkFailure() throws InitializationException {
|
public void testClientTimeoutOnServerNetworkFailure() throws InitializationException {
|
||||||
final String clientId = DistributedMapCacheClientService.class.getSimpleName();
|
final String clientId = MapCacheClientService.class.getSimpleName();
|
||||||
final DistributedMapCacheClientService clientService = new DistributedMapCacheClientService();
|
final MapCacheClientService clientService = new MapCacheClientService();
|
||||||
|
|
||||||
runner.addControllerService(clientId, clientService);
|
runner.addControllerService(clientId, clientService);
|
||||||
runner.setProperty(clientService, DistributedMapCacheClientService.HOSTNAME, LOCALHOST);
|
runner.setProperty(clientService, MapCacheClientService.HOSTNAME, LOCALHOST);
|
||||||
runner.setProperty(clientService, DistributedMapCacheClientService.PORT, String.valueOf(port));
|
runner.setProperty(clientService, MapCacheClientService.PORT, String.valueOf(port));
|
||||||
runner.setProperty(clientService, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
|
runner.setProperty(clientService, MapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
|
||||||
runner.enableControllerService(clientService);
|
runner.enableControllerService(clientService);
|
||||||
runner.assertValid();
|
runner.assertValid();
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
package org.apache.nifi.distributed.cache.server.set;
|
package org.apache.nifi.distributed.cache.server.set;
|
||||||
|
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
|
import org.apache.nifi.distributed.cache.client.SetCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
|
import org.apache.nifi.distributed.cache.server.SetCacheServer;
|
||||||
import org.apache.nifi.processor.Processor;
|
import org.apache.nifi.processor.Processor;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -35,27 +35,27 @@ import java.nio.charset.StandardCharsets;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class DistributedSetCacheTest {
|
public class SetCacheServiceTest {
|
||||||
|
|
||||||
private static TestRunner runner = null;
|
private static TestRunner runner = null;
|
||||||
private static DistributedSetCacheServer server = null;
|
private static SetCacheServer server = null;
|
||||||
private static DistributedSetCacheClientService client = null;
|
private static SetCacheClientService client = null;
|
||||||
private static final Serializer<String> serializer = new StringSerializer();
|
private static final Serializer<String> serializer = new StringSerializer();
|
||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void setRunner() throws Exception {
|
public static void setRunner() throws Exception {
|
||||||
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
||||||
|
|
||||||
server = new DistributedSetCacheServer();
|
server = new SetCacheServer();
|
||||||
runner.addControllerService(server.getClass().getName(), server);
|
runner.addControllerService(server.getClass().getName(), server);
|
||||||
runner.setProperty(server, DistributedSetCacheServer.PORT, "0");
|
runner.setProperty(server, SetCacheServer.PORT, "0");
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
final int port = server.getPort();
|
final int port = server.getPort();
|
||||||
|
|
||||||
client = new DistributedSetCacheClientService();
|
client = new SetCacheClientService();
|
||||||
runner.addControllerService(client.getClass().getName(), client);
|
runner.addControllerService(client.getClass().getName(), client);
|
||||||
runner.setProperty(client, DistributedSetCacheClientService.HOSTNAME, "localhost");
|
runner.setProperty(client, SetCacheClientService.HOSTNAME, "localhost");
|
||||||
runner.setProperty(client, DistributedSetCacheClientService.PORT, String.valueOf(port));
|
runner.setProperty(client, SetCacheClientService.PORT, String.valueOf(port));
|
||||||
runner.enableControllerService(client);
|
runner.enableControllerService(client);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,10 @@ import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
|
import org.apache.nifi.distributed.cache.client.SetCacheClientService;
|
||||||
import org.apache.nifi.distributed.cache.client.Serializer;
|
import org.apache.nifi.distributed.cache.client.Serializer;
|
||||||
import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
|
import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
|
||||||
import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer;
|
import org.apache.nifi.distributed.cache.server.SetCacheServer;
|
||||||
import org.apache.nifi.processor.DataUnit;
|
import org.apache.nifi.processor.DataUnit;
|
||||||
import org.apache.nifi.processor.Processor;
|
import org.apache.nifi.processor.Processor;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -55,7 +55,7 @@ public class TestDistributedSetServerAndClient {
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
|
|
||||||
private DistributedSetCacheServer server;
|
private SetCacheServer server;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setRunner() throws InitializationException, IOException {
|
public void setRunner() throws InitializationException, IOException {
|
||||||
|
@ -65,10 +65,10 @@ public class TestDistributedSetServerAndClient {
|
||||||
|
|
||||||
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
|
||||||
|
|
||||||
server = new DistributedSetCacheServer();
|
server = new SetCacheServer();
|
||||||
runner.addControllerService("server", server);
|
runner.addControllerService("server", server);
|
||||||
|
|
||||||
runner.setProperty(server, DistributedSetCacheServer.PORT, "0");
|
runner.setProperty(server, SetCacheServer.PORT, "0");
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
@ -80,7 +80,7 @@ public class TestDistributedSetServerAndClient {
|
||||||
public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
|
public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedSetCacheClientService client = createClient(server.getPort());
|
final SetCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
final boolean added = client.addIfAbsent("test", serializer);
|
final boolean added = client.addIfAbsent("test", serializer);
|
||||||
|
@ -104,10 +104,10 @@ public class TestDistributedSetServerAndClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistentSetServerAndClient() throws InitializationException, IOException {
|
public void testPersistentSetServerAndClient() throws InitializationException, IOException {
|
||||||
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedSetCacheClientService client = createClient(server.getPort());
|
final SetCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
final boolean added = client.addIfAbsent("test", serializer);
|
final boolean added = client.addIfAbsent("test", serializer);
|
||||||
|
@ -135,12 +135,12 @@ public class TestDistributedSetServerAndClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
|
public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
|
||||||
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
||||||
runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
|
runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3");
|
||||||
runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU);
|
runner.setProperty(server, SetCacheServer.EVICTION_POLICY, SetCacheServer.EVICTION_STRATEGY_LFU);
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedSetCacheClientService client = createClient(server.getPort());
|
final SetCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
final boolean added = client.addIfAbsent("test", serializer);
|
final boolean added = client.addIfAbsent("test", serializer);
|
||||||
|
@ -173,12 +173,12 @@ public class TestDistributedSetServerAndClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
|
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
|
||||||
runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath());
|
||||||
runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3");
|
runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3");
|
||||||
runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO);
|
runner.setProperty(server, SetCacheServer.EVICTION_POLICY, SetCacheServer.EVICTION_STRATEGY_FIFO);
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedSetCacheClientService client = createClient(server.getPort());
|
final SetCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
|
|
||||||
|
@ -218,12 +218,12 @@ public class TestDistributedSetServerAndClient {
|
||||||
public void testLimitServiceReadSize() throws InitializationException, IOException {
|
public void testLimitServiceReadSize() throws InitializationException, IOException {
|
||||||
runner.enableControllerService(server);
|
runner.enableControllerService(server);
|
||||||
|
|
||||||
final DistributedSetCacheClientService client = createClient(server.getPort());
|
final SetCacheClientService client = createClient(server.getPort());
|
||||||
try {
|
try {
|
||||||
final Serializer<String> serializer = new StringSerializer();
|
final Serializer<String> serializer = new StringSerializer();
|
||||||
|
|
||||||
final String value = "value";
|
final String value = "value";
|
||||||
final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
|
final int maxReadSize = new MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
|
||||||
final int belowThreshold = maxReadSize / value.length();
|
final int belowThreshold = maxReadSize / value.length();
|
||||||
final int aboveThreshold = belowThreshold + 1;
|
final int aboveThreshold = belowThreshold + 1;
|
||||||
final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
|
final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold);
|
||||||
|
@ -243,14 +243,14 @@ public class TestDistributedSetServerAndClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DistributedSetCacheClientService createClient(final int port) throws InitializationException {
|
private SetCacheClientService createClient(final int port) throws InitializationException {
|
||||||
final DistributedSetCacheClientService client = new DistributedSetCacheClientService();
|
final SetCacheClientService client = new SetCacheClientService();
|
||||||
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
|
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client");
|
||||||
client.initialize(clientInitContext);
|
client.initialize(clientInitContext);
|
||||||
|
|
||||||
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
|
||||||
clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost");
|
clientProperties.put(SetCacheClientService.HOSTNAME, "localhost");
|
||||||
clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port));
|
clientProperties.put(SetCacheClientService.PORT, String.valueOf(port));
|
||||||
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null);
|
final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null);
|
||||||
client.onEnabled(clientContext);
|
client.onEnabled(clientContext);
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class TestDistributedMapCacheLookupService {
|
||||||
public void testDistributedMapCacheLookupService() throws InitializationException {
|
public void testDistributedMapCacheLookupService() throws InitializationException {
|
||||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||||
final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService();
|
final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService();
|
||||||
final DistributedMapCacheClient client = new DistributedMapCacheClientImpl();
|
final DistributedMapCacheClient client = new EphemeralMapCacheClientService();
|
||||||
|
|
||||||
runner.addControllerService("client", client);
|
runner.addControllerService("client", client);
|
||||||
runner.addControllerService("lookup-service", service);
|
runner.addControllerService("lookup-service", service);
|
||||||
|
@ -62,7 +62,7 @@ public class TestDistributedMapCacheLookupService {
|
||||||
assertEquals(EMPTY_STRING, absent);
|
assertEquals(EMPTY_STRING, absent);
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
|
static final class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
|
||||||
|
|
||||||
private Map<String, String> map = new HashMap<String, String>();
|
private Map<String, String> map = new HashMap<String, String>();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue