diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 68e8cb4be7..338f80dc56 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -217,7 +217,7 @@ public class TestAbstractListProcessor { // Require a cache service. runner.assertNotValid(); - final DistributedCache trackingCache = new DistributedCache(); + final EphemeralMapCacheClientService trackingCache = new EphemeralMapCacheClientService(); runner.addControllerService("tracking-cache", trackingCache); runner.enableControllerService(trackingCache); @@ -361,7 +361,7 @@ public class TestAbstractListProcessor { String.format("Expected verification result to match pattern [%s]. Actual explanation was: %s", expectedExplanationRegex, result.getExplanation())); } - static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { + static class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { private final Map stored = new HashMap<>(); private int fetchCount = 0; diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index f6b2c412ba..d8e1d2224d 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -20,7 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.MapCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; @@ -42,9 +42,9 @@ public class TestDetectDuplicate { @Test public void testDuplicate() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); - final DistributedMapCacheClientImpl client = createClient(); + final EphemeralMapCacheClientService client = createClient(); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost"); runner.addControllerService("client", client, clientProperties); runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); @@ -68,9 +68,9 @@ public class TestDetectDuplicate { public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); - final DistributedMapCacheClientImpl client = createClient(); + final EphemeralMapCacheClientService client = createClient(); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost"); runner.addControllerService("client", client, clientProperties); runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); @@ -92,9 +92,9 @@ public class TestDetectDuplicate { runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); } - private DistributedMapCacheClientImpl createClient() throws InitializationException { + private EphemeralMapCacheClientService createClient() throws InitializationException { - final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); + final EphemeralMapCacheClientService client = new EphemeralMapCacheClientService(); final ComponentLog logger = new MockComponentLog("client", client); final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client)); client.initialize(clientInitContext); @@ -105,9 +105,9 @@ public class TestDetectDuplicate { @Test public void testDuplicateNoCache() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); - final DistributedMapCacheClientImpl client = createClient(); + final EphemeralMapCacheClientService client = createClient(); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost"); runner.addControllerService("client", client, clientProperties); runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); @@ -141,9 +141,9 @@ public class TestDetectDuplicate { public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException { final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); - final DistributedMapCacheClientImpl client = createClient(); + final EphemeralMapCacheClientService client = createClient(); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + clientProperties.put(MapCacheClientService.HOSTNAME.getName(), "localhost"); runner.addControllerService("client", client, clientProperties); runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); @@ -168,7 +168,7 @@ public class TestDetectDuplicate { runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); } - static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { + static final class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { boolean exists = false; private Object cacheValue; @@ -180,10 +180,10 @@ public class TestDetectDuplicate { @Override protected java.util.List getSupportedPropertyDescriptors() { final List props = new ArrayList<>(); - props.add(DistributedMapCacheClientService.HOSTNAME); - props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT); - props.add(DistributedMapCacheClientService.PORT); - props.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE); + props.add(MapCacheClientService.HOSTNAME); + props.add(MapCacheClientService.COMMUNICATIONS_TIMEOUT); + props.add(MapCacheClientService.PORT); + props.add(MapCacheClientService.SSL_CONTEXT_SERVICE); return props; } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java similarity index 86% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java index 6a65e26ed1..c0372b462d 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClient.java @@ -29,10 +29,9 @@ import org.apache.nifi.ssl.SSLContextService; import java.io.IOException; /** - * Encapsulate operations which may be performed using a {@link DistributedSetCacheClientService} or a - * {@link DistributedMapCacheClientService}. + * Encapsulate operations which may be performed using a Cache Client Service */ -public class DistributedCacheClient { +public class CacheClient { private static final boolean DAEMON_THREAD_ENABLED = true; @@ -54,12 +53,12 @@ public class DistributedCacheClient { * @param factory creator of object used to broker the version of the distributed cache protocol with the service * @param identifier uniquely identifies this client */ - protected DistributedCacheClient(final String hostname, - final int port, - final int timeoutMillis, - final SSLContextService sslContextService, - final VersionNegotiatorFactory factory, - final String identifier) { + protected CacheClient(final String hostname, + final int port, + final int timeoutMillis, + final SSLContextService sslContextService, + final VersionNegotiatorFactory factory, + final String identifier) { final String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier); this.eventLoopGroup = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED)); this.channelPool = new CacheClientChannelPoolFactory().createChannelPool( diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java index 905de8dc52..c7a337f15e 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java @@ -33,9 +33,7 @@ import javax.net.ssl.SSLContext; import java.time.Duration; /** - * Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service - * methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService} - * and {@link DistributedMapCacheClientService}. + * Factory for construction of new {@link ChannelPool}, used by distributed cache clients to invoke service methods. */ class CacheClientChannelPoolFactory { diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java similarity index 95% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java index f92d4634f5..d44c999508 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/MapCacheClientService.java @@ -46,10 +46,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; @Tags({"distributed", "cache", "state", "map", "cluster"}) -@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) -@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.MapCacheServer"}) +@CapabilityDescription("Provides the ability to communicate with a MapCacheServer. This can be used in order to share a Map " + "between nodes in a NiFi cluster") -public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { +public class MapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { private static final long DEFAULT_CACHE_REVISION = 0L; @@ -82,10 +82,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService .defaultValue("30 secs") .build(); - /** - * The implementation of the business logic for {@link DistributedMapCacheClientService}. - */ - private volatile NettyDistributedMapCacheClient cacheClient = null; + private volatile NettyMapCacheClient cacheClient = null; /** * Creator of object used to broker the version of the distributed cache protocol with the service. @@ -107,7 +104,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService getLogger().debug("Enabling Map Cache Client Service [{}]", context.getName()); this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory( ProtocolVersion.V3.value(), ProtocolVersion.V2.value(), ProtocolVersion.V1.value()); - this.cacheClient = new NettyDistributedMapCacheClient( + this.cacheClient = new NettyMapCacheClient( context.getProperty(HOSTNAME).getValue(), context.getProperty(PORT).asInteger(), context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java similarity index 99% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java index 694d241549..a0b21390ba 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyMapCacheClient.java @@ -39,7 +39,7 @@ import java.util.Set; * The implementation of the {@link DistributedMapCacheClient} using the netty library to provide the remote * communication services. */ -public class NettyDistributedMapCacheClient extends DistributedCacheClient { +public class NettyMapCacheClient extends CacheClient { private final ComponentLog log; /** @@ -54,7 +54,7 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient { * @param identifier uniquely identifies this client * @param log Component Log from instantiating Services */ - public NettyDistributedMapCacheClient( + public NettyMapCacheClient( final String hostname, final int port, final int timeoutMillis, diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java similarity index 95% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java index 9bcc64a6d4..86a8947715 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedSetCacheClient.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettySetCacheClient.java @@ -26,10 +26,10 @@ import org.apache.nifi.ssl.SSLContextService; import java.io.IOException; /** - * The implementation of the {@link DistributedCacheClient} using the netty library to provide the remote + * The implementation of the {@link CacheClient} using the netty library to provide the remote * communication services. */ -public class NettyDistributedSetCacheClient extends DistributedCacheClient { +public class NettySetCacheClient extends CacheClient { /** * Constructor. @@ -42,7 +42,7 @@ public class NettyDistributedSetCacheClient extends DistributedCacheClient { * @param factory creator of object used to broker the version of the distributed cache protocol with the service * @param identifier uniquely identifies this client */ - public NettyDistributedSetCacheClient( + public NettySetCacheClient( final String hostname, final int port, final int timeoutMillis, diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java similarity index 92% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java index 19392ba51a..c706a7642c 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SetCacheClientService.java @@ -37,10 +37,10 @@ import java.util.List; import java.util.concurrent.TimeUnit; @Tags({"distributed", "cache", "state", "set", "cluster"}) -@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.DistributedSetCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) -@CapabilityDescription("Provides the ability to communicate with a DistributedSetCacheServer. This can be used in order to share a Set " +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.SetCacheServer"}) +@CapabilityDescription("Provides the ability to communicate with a SetCacheServer. This can be used in order to share a Set " + "between nodes in a NiFi cluster") -public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient { +public class SetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient { public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Server Hostname") @@ -71,10 +71,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService .defaultValue("30 secs") .build(); - /** - * The implementation of the business logic for {@link DistributedSetCacheClientService}. - */ - private volatile NettyDistributedSetCacheClient cacheClient = null; + private volatile NettySetCacheClient cacheClient = null; /** * Creator of object used to broker the version of the distributed cache protocol with the service. @@ -95,7 +92,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService public void onEnabled(final ConfigurationContext context) { getLogger().debug("Enabling Set Cache Client Service [{}]", context.getName()); this.versionNegotiatorFactory = new StandardVersionNegotiatorFactory(ProtocolVersion.V1.value()); - this.cacheClient = new NettyDistributedSetCacheClient( + this.cacheClient = new NettySetCacheClient( context.getProperty(HOSTNAME).getValue(), context.getProperty(PORT).asInteger(), context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index a91f7ee4f4..53b1aeb26f 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService -org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService \ No newline at end of file +org.apache.nifi.distributed.cache.client.SetCacheClientService +org.apache.nifi.distributed.cache.client.MapCacheClientService \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java similarity index 98% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java index 573d95454c..e674af8a4d 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -29,7 +29,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.RestrictedSSLContextService; -public abstract class DistributedCacheServer extends AbstractControllerService { +public abstract class AbstractCacheServer extends AbstractControllerService { public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java similarity index 97% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java index 201ef93455..8aa113e3cd 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -28,7 +28,7 @@ import org.apache.nifi.ssl.SSLContextService; @Tags({"distributed", "set", "distinct", "cache", "server"}) @CapabilityDescription("Provides a set (collection of unique values) cache that can be accessed over a socket. " + "Interaction with this service is typically accomplished via a DistributedSetCacheClient service.") -public class DistributedSetCacheServer extends DistributedCacheServer { +public class SetCacheServer extends AbstractCacheServer { @Override protected CacheServer createCacheServer(final ConfigurationContext context) { diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java similarity index 92% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index 8c3a83a485..a49f022409 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -24,16 +24,16 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.distributed.cache.server.CacheServer; -import org.apache.nifi.distributed.cache.server.DistributedCacheServer; +import org.apache.nifi.distributed.cache.server.AbstractCacheServer; import org.apache.nifi.distributed.cache.server.EvictionPolicy; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.ssl.SSLContextService; @Tags({"distributed", "cluster", "map", "cache", "server", "key/value"}) @CapabilityDescription("Provides a map (key/value) cache that can be accessed over a socket. Interaction with this service" - + " is typically accomplished via a DistributedMapCacheClient service.") -@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.ssl.StandardSSLContextService"}) -public class DistributedMapCacheServer extends DistributedCacheServer { + + " is typically accomplished via a Map Cache Client Service.") +@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.MapCacheClientService"}) +public class MapCacheServer extends AbstractCacheServer { @Override protected CacheServer createCacheServer(final ConfigurationContext context) { diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 0509c7cca1..035a36c0a6 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.distributed.cache.server.DistributedSetCacheServer -org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer \ No newline at end of file +org.apache.nifi.distributed.cache.server.SetCacheServer +org.apache.nifi.distributed.cache.server.map.MapCacheServer \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java similarity index 86% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java index bcad46f581..ae9ee5a2e8 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTlsTest.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheServiceTlsTest.java @@ -18,7 +18,7 @@ package org.apache.nifi.distributed.cache.server.map; import org.apache.commons.lang3.SerializationException; import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.MapCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.processor.Processor; @@ -51,12 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; -public class DistributedMapCacheTlsTest { +public class MapCacheServiceTlsTest { private static TestRunner runner = null; private static SSLContextService sslContextService = null; - private static DistributedMapCacheServer server = null; - private static DistributedMapCacheClientService client = null; + private static MapCacheServer server = null; + private static MapCacheClientService client = null; private static final Serializer serializer = new StringSerializer(); private static final Deserializer deserializer = new StringDeserializer(); @@ -67,18 +67,18 @@ public class DistributedMapCacheTlsTest { runner.addControllerService(sslContextService.getIdentifier(), sslContextService); runner.enableControllerService(sslContextService); - server = new DistributedMapCacheServer(); + server = new MapCacheServer(); runner.addControllerService(server.getClass().getName(), server); - runner.setProperty(server, DistributedMapCacheServer.PORT, "0"); - runner.setProperty(server, DistributedMapCacheServer.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier()); + runner.setProperty(server, MapCacheServer.PORT, "0"); + runner.setProperty(server, MapCacheServer.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier()); runner.enableControllerService(server); final int listeningPort = server.getPort(); - client = new DistributedMapCacheClientService(); + client = new MapCacheClientService(); runner.addControllerService(client.getClass().getName(), client); - runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); - runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(listeningPort)); - runner.setProperty(client, DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier()); + runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(listeningPort)); + runner.setProperty(client, MapCacheClientService.SSL_CONTEXT_SERVICE, sslContextService.getIdentifier()); runner.enableControllerService(client); } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java similarity index 92% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java index 8a0d5303d3..591e5707bc 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/MapCacheTest.java @@ -19,7 +19,7 @@ package org.apache.nifi.distributed.cache.server.map; import org.apache.commons.lang3.SerializationException; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.MapCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.processor.Processor; @@ -45,11 +45,11 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(5) -public class DistributedMapCacheTest { +public class MapCacheTest { private static TestRunner runner = null; - private static DistributedMapCacheServer server = null; - private static DistributedMapCacheClientService client = null; + private static MapCacheServer server = null; + private static MapCacheClientService client = null; private static final Serializer serializer = new StringSerializer(); private static final Deserializer deserializer = new StringDeserializer(); @@ -57,16 +57,16 @@ public class DistributedMapCacheTest { public static void startServices() throws Exception { runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); - server = new DistributedMapCacheServer(); + server = new MapCacheServer(); runner.addControllerService(server.getClass().getName(), server); - runner.setProperty(server, DistributedMapCacheServer.PORT, "0"); + runner.setProperty(server, MapCacheServer.PORT, "0"); runner.enableControllerService(server); final int port = server.getPort(); - client = new DistributedMapCacheClientService(); + client = new MapCacheClientService(); runner.addControllerService(client.getClass().getName(), client); - runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); - runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(port)); + runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(port)); runner.enableControllerService(client); } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java index 58c8506293..5e87d4e99d 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java @@ -22,12 +22,12 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.MapCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.protocol.ProtocolVersion; import org.apache.nifi.distributed.cache.server.CacheServer; -import org.apache.nifi.distributed.cache.server.DistributedCacheServer; +import org.apache.nifi.distributed.cache.server.AbstractCacheServer; import org.apache.nifi.distributed.cache.server.EvictionPolicy; import org.apache.nifi.event.transport.EventServer; import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod; @@ -76,7 +76,7 @@ public class TestDistributedMapServerAndClient { private TestRunner runner; - private DistributedMapCacheServer server; + private MapCacheServer server; @BeforeEach public void setRunner() throws InitializationException, IOException { @@ -86,10 +86,10 @@ public class TestDistributedMapServerAndClient { runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); - server = new DistributedMapCacheServer(); + server = new MapCacheServer(); runner.addControllerService("server", server); - runner.setProperty(server, DistributedMapCacheServer.PORT, "0"); + runner.setProperty(server, MapCacheServer.PORT, "0"); } @AfterEach @@ -101,11 +101,11 @@ public class TestDistributedMapServerAndClient { public void testNonPersistentMapServerAndClient() throws InitializationException, IOException { runner.enableControllerService(server); - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MapCacheClientService client = new MapCacheClientService(); try { runner.addControllerService("client", client); - runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); - runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); + runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(server.getPort())); runner.enableControllerService(client); final Serializer valueSerializer = new StringSerializer(); @@ -150,18 +150,18 @@ public class TestDistributedMapServerAndClient { public void testOptimisticLock() throws Exception { runner.enableControllerService(server); - DistributedMapCacheClientService client1 = new DistributedMapCacheClientService(); + MapCacheClientService client1 = new MapCacheClientService(); MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1"); client1.initialize(clientInitContext1); - DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); + MapCacheClientService client2 = new MapCacheClientService(); MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); client2.initialize(clientInitContext2); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); - clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); - clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); + clientProperties.put(MapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(MapCacheClientService.PORT, String.valueOf(server.getPort())); + clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null); client1.onEnabled(clientContext1); @@ -221,7 +221,7 @@ public class TestDistributedMapServerAndClient { @Test public void testBackwardCompatibility() throws Exception { // Create a server that only supports protocol version 1. - server = new DistributedMapCacheServer() { + server = new MapCacheServer() { @Override protected CacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir, int maxReadSize) throws IOException { return new StandardMapCacheServer(getLogger(), getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir, maxReadSize) { @@ -233,17 +233,17 @@ public class TestDistributedMapServerAndClient { } }; runner.addControllerService("server", server); - runner.setProperty(server, DistributedMapCacheServer.PORT, "0"); + runner.setProperty(server, MapCacheServer.PORT, "0"); runner.enableControllerService(server); - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MapCacheClientService client = new MapCacheClientService(); MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext1); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); - clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); - clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); + clientProperties.put(MapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(MapCacheClientService.PORT, String.valueOf(server.getPort())); + clientProperties.put(MapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup(), null); client.onEnabled(clientContext); @@ -276,12 +276,12 @@ public class TestDistributedMapServerAndClient { public void testLimitServiceReadSize() throws InitializationException, IOException { runner.enableControllerService(server); - final DistributedMapCacheClientService client = createClient(server.getPort()); + final MapCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); final String value = "value"; - final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue(); + final int maxReadSize = new MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue(); final int belowThreshold = maxReadSize / value.length(); final int aboveThreshold = belowThreshold + 1; final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold); @@ -300,12 +300,12 @@ public class TestDistributedMapServerAndClient { final NettyEventServerFactory serverFactory = getEventServerFactory(0, messages); final EventServer eventServer = serverFactory.getEventServer(); - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + MapCacheClientService client = new MapCacheClientService(); runner.addControllerService("client", client); - runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, "localhost"); - runner.setProperty(client, DistributedMapCacheClientService.PORT, String.valueOf(eventServer.getListeningPort())); - runner.setProperty(client, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms"); + runner.setProperty(client, MapCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, MapCacheClientService.PORT, String.valueOf(eventServer.getListeningPort())); + runner.setProperty(client, MapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms"); runner.enableControllerService(client); final Serializer valueSerializer = new StringSerializer(); @@ -328,14 +328,14 @@ public class TestDistributedMapServerAndClient { return factory; } - private DistributedMapCacheClientService createClient(final int port) throws InitializationException { - final DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + private MapCacheClientService createClient(final int port) throws InitializationException { + final MapCacheClientService client = new MapCacheClientService(); final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); - clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(port)); + clientProperties.put(MapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(MapCacheClientService.PORT, String.valueOf(port)); final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null); client.onEnabled(clientContext); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java similarity index 87% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java index 243cd8fc6b..06f9e788cd 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestMapCacheClientService.java @@ -16,7 +16,7 @@ */ package org.apache.nifi.distributed.cache.server.map; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.MapCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.operations.MapOperation; import org.apache.nifi.distributed.cache.protocol.ProtocolVersion; @@ -48,7 +48,7 @@ import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertThrows; -public class TestDistributedMapCacheClientService { +public class TestMapCacheClientService { private static final String LOCALHOST = "127.0.0.1"; private static final int MAX_REQUEST_LENGTH = 64; @@ -91,13 +91,13 @@ public class TestDistributedMapCacheClientService { */ @Test public void testClientTimeoutOnServerNetworkFailure() throws InitializationException { - final String clientId = DistributedMapCacheClientService.class.getSimpleName(); - final DistributedMapCacheClientService clientService = new DistributedMapCacheClientService(); + final String clientId = MapCacheClientService.class.getSimpleName(); + final MapCacheClientService clientService = new MapCacheClientService(); runner.addControllerService(clientId, clientService); - runner.setProperty(clientService, DistributedMapCacheClientService.HOSTNAME, LOCALHOST); - runner.setProperty(clientService, DistributedMapCacheClientService.PORT, String.valueOf(port)); - runner.setProperty(clientService, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms"); + runner.setProperty(clientService, MapCacheClientService.HOSTNAME, LOCALHOST); + runner.setProperty(clientService, MapCacheClientService.PORT, String.valueOf(port)); + runner.setProperty(clientService, MapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms"); runner.enableControllerService(clientService); runner.assertValid(); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java similarity index 81% rename from nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java rename to nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java index 401b298804..4634187a61 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/DistributedSetCacheTest.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/SetCacheServiceTest.java @@ -17,9 +17,9 @@ package org.apache.nifi.distributed.cache.server.set; import org.apache.commons.lang3.SerializationException; -import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; +import org.apache.nifi.distributed.cache.client.SetCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer; +import org.apache.nifi.distributed.cache.server.SetCacheServer; import org.apache.nifi.processor.Processor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -35,27 +35,27 @@ import java.nio.charset.StandardCharsets; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DistributedSetCacheTest { +public class SetCacheServiceTest { private static TestRunner runner = null; - private static DistributedSetCacheServer server = null; - private static DistributedSetCacheClientService client = null; + private static SetCacheServer server = null; + private static SetCacheClientService client = null; private static final Serializer serializer = new StringSerializer(); @BeforeAll public static void setRunner() throws Exception { runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); - server = new DistributedSetCacheServer(); + server = new SetCacheServer(); runner.addControllerService(server.getClass().getName(), server); - runner.setProperty(server, DistributedSetCacheServer.PORT, "0"); + runner.setProperty(server, SetCacheServer.PORT, "0"); runner.enableControllerService(server); final int port = server.getPort(); - client = new DistributedSetCacheClientService(); + client = new SetCacheClientService(); runner.addControllerService(client.getClass().getName(), client); - runner.setProperty(client, DistributedSetCacheClientService.HOSTNAME, "localhost"); - runner.setProperty(client, DistributedSetCacheClientService.PORT, String.valueOf(port)); + runner.setProperty(client, SetCacheClientService.HOSTNAME, "localhost"); + runner.setProperty(client, SetCacheClientService.PORT, String.valueOf(port)); runner.enableControllerService(client); } diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java index d45abbb51e..9627890dd5 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/set/TestDistributedSetServerAndClient.java @@ -20,10 +20,10 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; +import org.apache.nifi.distributed.cache.client.SetCacheClientService; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.server.DistributedCacheServer; -import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer; +import org.apache.nifi.distributed.cache.server.AbstractCacheServer; +import org.apache.nifi.distributed.cache.server.SetCacheServer; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.Processor; import org.apache.nifi.reporting.InitializationException; @@ -55,7 +55,7 @@ public class TestDistributedSetServerAndClient { private TestRunner runner; - private DistributedSetCacheServer server; + private SetCacheServer server; @BeforeEach public void setRunner() throws InitializationException, IOException { @@ -65,10 +65,10 @@ public class TestDistributedSetServerAndClient { runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); - server = new DistributedSetCacheServer(); + server = new SetCacheServer(); runner.addControllerService("server", server); - runner.setProperty(server, DistributedSetCacheServer.PORT, "0"); + runner.setProperty(server, SetCacheServer.PORT, "0"); } @AfterEach @@ -80,7 +80,7 @@ public class TestDistributedSetServerAndClient { public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(server.getPort()); + final SetCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); @@ -104,10 +104,10 @@ public class TestDistributedSetServerAndClient { @Test public void testPersistentSetServerAndClient() throws InitializationException, IOException { - runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(server.getPort()); + final SetCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); @@ -135,12 +135,12 @@ public class TestDistributedSetServerAndClient { @Test public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { - runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); - runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); - runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU); + runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(server, SetCacheServer.EVICTION_POLICY, SetCacheServer.EVICTION_STRATEGY_LFU); runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(server.getPort()); + final SetCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); @@ -173,12 +173,12 @@ public class TestDistributedSetServerAndClient { @Test public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { - runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); - runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); - runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); + runner.setProperty(server, SetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, SetCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(server, SetCacheServer.EVICTION_POLICY, SetCacheServer.EVICTION_STRATEGY_FIFO); runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(server.getPort()); + final SetCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); @@ -218,12 +218,12 @@ public class TestDistributedSetServerAndClient { public void testLimitServiceReadSize() throws InitializationException, IOException { runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(server.getPort()); + final SetCacheClientService client = createClient(server.getPort()); try { final Serializer serializer = new StringSerializer(); final String value = "value"; - final int maxReadSize = new MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue(); + final int maxReadSize = new MockPropertyValue(AbstractCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue(); final int belowThreshold = maxReadSize / value.length(); final int aboveThreshold = belowThreshold + 1; final String valueBelowThreshold = StringUtils.repeat(value, belowThreshold); @@ -243,14 +243,14 @@ public class TestDistributedSetServerAndClient { } } - private DistributedSetCacheClientService createClient(final int port) throws InitializationException { - final DistributedSetCacheClientService client = new DistributedSetCacheClientService(); + private SetCacheClientService createClient(final int port) throws InitializationException { + final SetCacheClientService client = new SetCacheClientService(); final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext); final Map clientProperties = new HashMap<>(); - clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost"); - clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port)); + clientProperties.put(SetCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(SetCacheClientService.PORT, String.valueOf(port)); final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup(), null); client.onEnabled(clientContext); diff --git a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java index 7f878ce2bd..0c49296cfe 100644 --- a/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java +++ b/nifi-extension-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java @@ -44,7 +44,7 @@ public class TestDistributedMapCacheLookupService { public void testDistributedMapCacheLookupService() throws InitializationException { final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService(); - final DistributedMapCacheClient client = new DistributedMapCacheClientImpl(); + final DistributedMapCacheClient client = new EphemeralMapCacheClientService(); runner.addControllerService("client", client); runner.addControllerService("lookup-service", service); @@ -62,7 +62,7 @@ public class TestDistributedMapCacheLookupService { assertEquals(EMPTY_STRING, absent); } - static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { + static final class EphemeralMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { private Map map = new HashMap();