From ebee094ff3ef02c0daea5247989b0d9d6a7337d4 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 29 Apr 2015 07:44:19 -0400 Subject: [PATCH] NIFI-533: Fixed checkstyle issues --- .../OnPrimaryNodeStateChange.java | 4 +- .../notification/PrimaryNodeState.java | 20 ++-- .../nifi/controller/FlowController.java | 28 +++--- .../nifi/processors/standard/GetHTTP.java | 31 +++---- .../standard/TestDetectDuplicate.java | 32 +++---- .../DistributedMapCacheClientService.java | 91 ++++++++++--------- .../cache/server/map/PersistentMapCache.java | 19 ++-- 7 files changed, 112 insertions(+), 113 deletions(-) diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java index e0736602d3..4ea2170a00 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/OnPrimaryNodeStateChange.java @@ -25,10 +25,10 @@ import java.lang.annotation.Target; /** *

- * Marker annotation that a component can use to indicate that a method should be + * Marker annotation that a component can use to indicate that a method should be * called whenever the state of the Primary Node in a cluster has changed. *

- * + * *

* Methods with this annotation should take either no arguments or one argument of type * {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java index 3a7245c39a..0d65b65d33 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/notification/PrimaryNodeState.java @@ -20,14 +20,14 @@ package org.apache.nifi.annotation.notification; * Represents a state change that occurred for the Primary Node of a NiFi cluster. */ public enum PrimaryNodeState { - /** - * The node receiving this state has been elected the Primary Node of the NiFi cluster. - */ - ELECTED_PRIMARY_NODE, - - /** - * The node receiving this state was the Primary Node but has now had its Primary Node - * role revoked. - */ - PRIMARY_NODE_REVOKED; + /** + * The node receiving this state has been elected the Primary Node of the NiFi cluster. + */ + ELECTED_PRIMARY_NODE, + + /** + * The node receiving this state was the Primary Node but has now had its Primary Node + * role revoked. + */ + PRIMARY_NODE_REVOKED; } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index e9eeaf2d1e..6c655cbbd0 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -296,7 +296,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ private final AtomicReference heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); - private AtomicReference nodeBulletinSubscriber; + private final AtomicReference nodeBulletinSubscriber; // guarded by rwLock /** @@ -449,7 +449,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.protocolSender = protocolSender; try { this.templateManager = new TemplateManager(properties.getTemplateDirectory()); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } @@ -794,7 +794,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if either arg is null * @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason */ - public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { + public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException { return createProcessor(type, id, true); } @@ -1508,7 +1508,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } if (config.getProperties() != null) { - for (Map.Entry entry : config.getProperties().entrySet()) { + for (final Map.Entry entry : config.getProperties().entrySet()) { if (entry.getValue() != null) { procNode.setProperty(entry.getKey(), entry.getValue()); } @@ -1661,7 +1661,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R Set remotePorts = null; if (ports != null) { remotePorts = new LinkedHashSet<>(ports.size()); - for (RemoteProcessGroupPortDTO port : ports) { + for (final RemoteProcessGroupPortDTO port : ports) { final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor(); descriptor.setId(port.getId()); descriptor.setName(port.getName()); @@ -3024,15 +3024,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED; final ProcessGroup rootGroup = getGroup(getRootGroupId()); for (final ProcessorNode procNode : rootGroup.findAllProcessors()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState); } for (final ControllerServiceNode serviceNode : getAllControllerServices()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState); } for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState); } - + // update primary this.primary = primary; eventDrivenWorkerQueue.setPrimary(primary); @@ -3092,7 +3092,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isInputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier())); - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -3101,7 +3101,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isOutputAvailable() { try { return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier())); - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -3401,7 +3401,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final NodeProtocolSender protocolSender; private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); - public BulletinsTask(NodeProtocolSender protocolSender) { + public BulletinsTask(final NodeProtocolSender protocolSender) { if (protocolSender == null) { throw new IllegalArgumentException("NodeProtocolSender may not be null."); } @@ -3557,7 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private class HeartbeatMessageGeneratorTask implements Runnable { - private AtomicReference heartbeatMessageRef = new AtomicReference<>(); + private final AtomicReference heartbeatMessageRef = new AtomicReference<>(); @Override public void run() { @@ -3624,7 +3624,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } @Override - public List getProvenanceEvents(long firstEventId, int maxRecords) throws IOException { + public List getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException { return new ArrayList(provenanceEventRepository.getEvents(firstEventId, maxRecords)); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index a1f57dab24..1654a4f602 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -185,7 +185,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { static final String LAST_MODIFIED = "LastModified"; static { - SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); + final SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); sdf.setTimeZone(TimeZone.getTimeZone("GMT")); UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L)); } @@ -221,13 +221,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { this.properties = Collections.unmodifiableList(properties); // load etag and lastModified from file - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileInputStream fis = new FileInputStream(httpCache)) { - Properties props = new Properties(); + final Properties props = new Properties(); props.load(fis); entityTagRef.set(props.getProperty(ETAG)); lastModifiedRef.set(props.getProperty(LAST_MODIFIED)); - } catch (IOException swallow) { + } catch (final IOException swallow) { } } @@ -242,20 +242,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { } @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { entityTagRef.set(""); lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE); } @OnShutdown public void onShutdown() { - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileOutputStream fos = new FileOutputStream(httpCache)) { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(ETAG, entityTagRef.get()); props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); props.store(fos, "GetHTTP file modification values"); - } catch (IOException swallow) { + } catch (final IOException swallow) { } } @@ -287,7 +287,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { keystore.load(in, service.getKeyStorePassword().toCharArray()); } - SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build(); + final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build(); return sslContext; } @@ -310,7 +310,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { try { uri = new URI(url); source = uri.getHost(); - } catch (URISyntaxException swallow) { + } catch (final URISyntaxException swallow) { // this won't happen as the url has already been validated } @@ -435,20 +435,19 @@ public class GetHTTP extends AbstractSessionFactoryProcessor { readLock.unlock(); writeLock.lock(); try { - if (timeToPersist < System.currentTimeMillis()) { + if (timeToPersist < System.currentTimeMillis()) { timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC; - File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); try (FileOutputStream fos = new FileOutputStream(httpCache)) { - Properties props = new Properties(); + final Properties props = new Properties(); props.setProperty(ETAG, entityTagRef.get()); props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); props.store(fos, "GetHTTP file modification values"); - } catch (IOException e) { + } catch (final IOException e) { getLogger().error("Failed to persist ETag and LastMod due to " + e, e); } } - } - finally { + } finally { readLock.lock(); writeLock.unlock(); } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 6e9fb1f8f9..4166d94657 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -57,7 +57,7 @@ public class TestDetectDuplicate { @Test public void testDuplicate() throws InitializationException { - TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); final DistributedMapCacheClientImpl client = createClient(); final Map clientProperties = new HashMap<>(); clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); @@ -65,7 +65,7 @@ public class TestDetectDuplicate { runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); - Map props = new HashMap<>(); + final Map props = new HashMap<>(); props.put("hash.value", "1000"); runner.enqueue(new byte[]{}, props); runner.enableControllerService(client); @@ -84,7 +84,7 @@ public class TestDetectDuplicate { @Test public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException { - TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); final DistributedMapCacheClientImpl client = createClient(); final Map clientProperties = new HashMap<>(); clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); @@ -94,7 +94,7 @@ public class TestDetectDuplicate { runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); runner.enableControllerService(client); - Map props = new HashMap<>(); + final Map props = new HashMap<>(); props.put("hash.value", "1000"); runner.enqueue(new byte[]{}, props); @@ -114,7 +114,7 @@ public class TestDetectDuplicate { final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); final ComponentLog logger = new MockProcessorLog("client", client); - MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger); + final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger); client.initialize(clientInitContext); return client; @@ -130,12 +130,12 @@ public class TestDetectDuplicate { } @Override - public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { } @Override protected java.util.List getSupportedPropertyDescriptors() { - List props = new ArrayList<>(); + final List props = new ArrayList<>(); props.add(DistributedMapCacheClientService.HOSTNAME); props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT); props.add(DistributedMapCacheClientService.PORT); @@ -144,7 +144,7 @@ public class TestDetectDuplicate { } @Override - public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { if (exists) { return false; } @@ -154,7 +154,8 @@ public class TestDetectDuplicate { } @Override - public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, + final Deserializer valueDeserializer) throws IOException { if (exists) { return (V) cacheValue; } @@ -163,25 +164,24 @@ public class TestDetectDuplicate { } @Override - public boolean containsKey(K key, Serializer keySerializer) throws IOException { + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { return exists; } @Override - public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { return null; } @Override - public boolean remove(K key, Serializer serializer) throws IOException { + public boolean remove(final K key, final Serializer serializer) throws IOException { exists = false; return true; } - @Override - public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { - - } + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + } } private static class StringSerializer implements Serializer { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index ea8607196c..c03dd5afc6 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -46,39 +46,39 @@ import org.slf4j.LoggerFactory; @Tags({"distributed", "cache", "state", "map", "cluster"}) @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " - + "between nodes in a NiFi cluster") + + "between nodes in a NiFi cluster") public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Server Hostname") - .description("The name of the server that is running the DistributedMapCacheServer service") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Server Hostname") + .description("The name of the server that is running the DistributedMapCacheServer service") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Server Port") - .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .defaultValue("4557") - .build(); + .name("Server Port") + .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("If specified, indicates the SSL Context Service that is used to communicate with the " - + "remote server. If not specified, communications will not be encrypted") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .name("SSL Context Service") + .description("If specified, indicates the SSL Context Service that is used to communicate with the " + + "remote server. If not specified, communications will not be encrypted") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("Specifies how long to wait when communicating with the remote server before determining that " - + "there is a communications failure if data cannot be sent or received") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 secs") - .build(); + .name("Communications Timeout") + .description("Specifies how long to wait when communicating with the remote server before determining that " + + "there is a communications failure if data cannot be sent or received") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private volatile ConfigurationContext configContext; @@ -118,28 +118,29 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - withCommsSession(new CommsAction() { - @Override - public Object execute(final CommsSession session) throws IOException { - final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); - dos.writeUTF("put"); - - serialize(key, keySerializer, dos); - serialize(value, valueSerializer, dos); - - dos.flush(); - final DataInputStream dis = new DataInputStream(session.getInputStream()); - final boolean success = dis.readBoolean(); - if ( !success ) { - throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response"); - } - - return null; - } - }); + withCommsSession(new CommsAction() { + @Override + public Object execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("put"); + + serialize(key, keySerializer, dos); + serialize(value, valueSerializer, dos); + + dos.flush(); + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final boolean success = dis.readBoolean(); + if ( !success ) { + throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response"); + } + + return null; + } + }); } - + @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { return withCommsSession(new CommsAction() { diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 663f4416f6..c2fc0d7600 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -35,7 +35,6 @@ import org.wali.UpdateType; import org.wali.WriteAheadRepository; public class PersistentMapCache implements MapCache { - private final MapCache wrapped; private final WriteAheadRepository wali; @@ -78,10 +77,10 @@ public class PersistentMapCache implements MapCache { return putResult; } - + @Override public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException { - final MapPutResult putResult = wrapped.put(key, value); + final MapPutResult putResult = wrapped.put(key, value); if ( putResult.isSuccessful() ) { // The put was successful. final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); @@ -91,15 +90,15 @@ public class PersistentMapCache implements MapCache { if ( putResult.getEvictedKey() != null ) { records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); } - + wali.update(Collections.singletonList(record), false); - + final long modCount = modifications.getAndIncrement(); if ( modCount > 0 && modCount % 100000 == 0 ) { wali.checkpoint(); } } - + return putResult; } @@ -114,7 +113,7 @@ public class PersistentMapCache implements MapCache { } @Override - public ByteBuffer remove(ByteBuffer key) throws IOException { + public ByteBuffer remove(final ByteBuffer key) throws IOException { final ByteBuffer removeResult = wrapped.remove(key); if (removeResult != null) { final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult); @@ -163,7 +162,7 @@ public class PersistentMapCache implements MapCache { private static class Serde implements SerDe { @Override - public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException { + public void serializeEdit(final MapWaliRecord previousRecordState, final MapWaliRecord newRecordState, final java.io.DataOutputStream out) throws IOException { final UpdateType updateType = newRecordState.getUpdateType(); if (updateType == UpdateType.DELETE) { out.write(0); @@ -181,7 +180,7 @@ public class PersistentMapCache implements MapCache { } @Override - public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException { + public void serializeRecord(final MapWaliRecord record, final java.io.DataOutputStream out) throws IOException { serializeEdit(null, record, out); } @@ -206,7 +205,7 @@ public class PersistentMapCache implements MapCache { } @Override - public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException { + public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { return deserializeEdit(in, new HashMap(), version); }