mirror of https://github.com/apache/nifi.git
NIFI-533: Fixed checkstyle issues
This commit is contained in:
parent
00b686b0b2
commit
ebee094ff3
|
@ -25,10 +25,10 @@ import java.lang.annotation.Target;
|
|||
|
||||
/**
|
||||
* <p>
|
||||
* Marker annotation that a component can use to indicate that a method should be
|
||||
* Marker annotation that a component can use to indicate that a method should be
|
||||
* called whenever the state of the Primary Node in a cluster has changed.
|
||||
* </p>
|
||||
*
|
||||
*
|
||||
* <p>
|
||||
* Methods with this annotation should take either no arguments or one argument of type
|
||||
* {@link PrimaryNodeState}. The {@link PrimaryNodeState} provides context about what changed
|
||||
|
|
|
@ -20,14 +20,14 @@ package org.apache.nifi.annotation.notification;
|
|||
* Represents a state change that occurred for the Primary Node of a NiFi cluster.
|
||||
*/
|
||||
public enum PrimaryNodeState {
|
||||
/**
|
||||
* The node receiving this state has been elected the Primary Node of the NiFi cluster.
|
||||
*/
|
||||
ELECTED_PRIMARY_NODE,
|
||||
|
||||
/**
|
||||
* The node receiving this state was the Primary Node but has now had its Primary Node
|
||||
* role revoked.
|
||||
*/
|
||||
PRIMARY_NODE_REVOKED;
|
||||
/**
|
||||
* The node receiving this state has been elected the Primary Node of the NiFi cluster.
|
||||
*/
|
||||
ELECTED_PRIMARY_NODE,
|
||||
|
||||
/**
|
||||
* The node receiving this state was the Primary Node but has now had its Primary Node
|
||||
* role revoked.
|
||||
*/
|
||||
PRIMARY_NODE_REVOKED;
|
||||
}
|
||||
|
|
|
@ -296,7 +296,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
*/
|
||||
private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
|
||||
|
||||
private AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
|
||||
private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
|
||||
|
||||
// guarded by rwLock
|
||||
/**
|
||||
|
@ -449,7 +449,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
this.protocolSender = protocolSender;
|
||||
try {
|
||||
this.templateManager = new TemplateManager(properties.getTemplateDirectory());
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
|
@ -794,7 +794,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
* @throws NullPointerException if either arg is null
|
||||
* @throws ProcessorInstantiationException if the processor cannot be instantiated for any reason
|
||||
*/
|
||||
public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException {
|
||||
public ProcessorNode createProcessor(final String type, final String id) throws ProcessorInstantiationException {
|
||||
return createProcessor(type, id, true);
|
||||
}
|
||||
|
||||
|
@ -1508,7 +1508,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
}
|
||||
|
||||
if (config.getProperties() != null) {
|
||||
for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
procNode.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
@ -1661,7 +1661,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
|
||||
if (ports != null) {
|
||||
remotePorts = new LinkedHashSet<>(ports.size());
|
||||
for (RemoteProcessGroupPortDTO port : ports) {
|
||||
for (final RemoteProcessGroupPortDTO port : ports) {
|
||||
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
|
||||
descriptor.setId(port.getId());
|
||||
descriptor.setName(port.getName());
|
||||
|
@ -3024,15 +3024,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
|
||||
final ProcessGroup rootGroup = getGroup(getRootGroupId());
|
||||
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
|
||||
}
|
||||
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
|
||||
}
|
||||
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
|
||||
}
|
||||
|
||||
|
||||
// update primary
|
||||
this.primary = primary;
|
||||
eventDrivenWorkerQueue.setPrimary(primary);
|
||||
|
@ -3092,7 +3092,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
public boolean isInputAvailable() {
|
||||
try {
|
||||
return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()));
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -3101,7 +3101,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
public boolean isOutputAvailable() {
|
||||
try {
|
||||
return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier()));
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -3401,7 +3401,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
private final NodeProtocolSender protocolSender;
|
||||
private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
|
||||
|
||||
public BulletinsTask(NodeProtocolSender protocolSender) {
|
||||
public BulletinsTask(final NodeProtocolSender protocolSender) {
|
||||
if (protocolSender == null) {
|
||||
throw new IllegalArgumentException("NodeProtocolSender may not be null.");
|
||||
}
|
||||
|
@ -3557,7 +3557,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
|
||||
private class HeartbeatMessageGeneratorTask implements Runnable {
|
||||
|
||||
private AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
|
||||
private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -3624,7 +3624,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, int maxRecords) throws IOException {
|
||||
public List<ProvenanceEventRecord> getProvenanceEvents(final long firstEventId, final int maxRecords) throws IOException {
|
||||
return new ArrayList<ProvenanceEventRecord>(provenanceEventRepository.getEvents(firstEventId, maxRecords));
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
static final String LAST_MODIFIED = "LastModified";
|
||||
|
||||
static {
|
||||
SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
|
||||
final SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
|
||||
sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||
UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L));
|
||||
}
|
||||
|
@ -221,13 +221,13 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
this.properties = Collections.unmodifiableList(properties);
|
||||
|
||||
// load etag and lastModified from file
|
||||
File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
try (FileInputStream fis = new FileInputStream(httpCache)) {
|
||||
Properties props = new Properties();
|
||||
final Properties props = new Properties();
|
||||
props.load(fis);
|
||||
entityTagRef.set(props.getProperty(ETAG));
|
||||
lastModifiedRef.set(props.getProperty(LAST_MODIFIED));
|
||||
} catch (IOException swallow) {
|
||||
} catch (final IOException swallow) {
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -242,20 +242,20 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
entityTagRef.set("");
|
||||
lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
|
||||
}
|
||||
|
||||
@OnShutdown
|
||||
public void onShutdown() {
|
||||
File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
|
||||
Properties props = new Properties();
|
||||
final Properties props = new Properties();
|
||||
props.setProperty(ETAG, entityTagRef.get());
|
||||
props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
|
||||
props.store(fos, "GetHTTP file modification values");
|
||||
} catch (IOException swallow) {
|
||||
} catch (final IOException swallow) {
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -287,7 +287,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
keystore.load(in, service.getKeyStorePassword().toCharArray());
|
||||
}
|
||||
|
||||
SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build();
|
||||
final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(truststore, new TrustSelfSignedStrategy()).loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()).build();
|
||||
|
||||
return sslContext;
|
||||
}
|
||||
|
@ -310,7 +310,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
try {
|
||||
uri = new URI(url);
|
||||
source = uri.getHost();
|
||||
} catch (URISyntaxException swallow) {
|
||||
} catch (final URISyntaxException swallow) {
|
||||
// this won't happen as the url has already been validated
|
||||
}
|
||||
|
||||
|
@ -435,20 +435,19 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
|||
readLock.unlock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (timeToPersist < System.currentTimeMillis()) {
|
||||
if (timeToPersist < System.currentTimeMillis()) {
|
||||
timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC;
|
||||
File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
|
||||
try (FileOutputStream fos = new FileOutputStream(httpCache)) {
|
||||
Properties props = new Properties();
|
||||
final Properties props = new Properties();
|
||||
props.setProperty(ETAG, entityTagRef.get());
|
||||
props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
|
||||
props.store(fos, "GetHTTP file modification values");
|
||||
} catch (IOException e) {
|
||||
} catch (final IOException e) {
|
||||
getLogger().error("Failed to persist ETag and LastMod due to " + e, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
} finally {
|
||||
readLock.lock();
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public class TestDetectDuplicate {
|
|||
|
||||
@Test
|
||||
public void testDuplicate() throws InitializationException {
|
||||
TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||
final DistributedMapCacheClientImpl client = createClient();
|
||||
final Map<String, String> clientProperties = new HashMap<>();
|
||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||
|
@ -65,7 +65,7 @@ public class TestDetectDuplicate {
|
|||
runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client");
|
||||
runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file");
|
||||
runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours");
|
||||
Map<String, String> props = new HashMap<>();
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("hash.value", "1000");
|
||||
runner.enqueue(new byte[]{}, props);
|
||||
runner.enableControllerService(client);
|
||||
|
@ -84,7 +84,7 @@ public class TestDetectDuplicate {
|
|||
@Test
|
||||
public void testDuplicateWithAgeOff() throws InitializationException, InterruptedException {
|
||||
|
||||
TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||
final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class);
|
||||
final DistributedMapCacheClientImpl client = createClient();
|
||||
final Map<String, String> clientProperties = new HashMap<>();
|
||||
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost");
|
||||
|
@ -94,7 +94,7 @@ public class TestDetectDuplicate {
|
|||
runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs");
|
||||
runner.enableControllerService(client);
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put("hash.value", "1000");
|
||||
runner.enqueue(new byte[]{}, props);
|
||||
|
||||
|
@ -114,7 +114,7 @@ public class TestDetectDuplicate {
|
|||
|
||||
final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl();
|
||||
final ComponentLog logger = new MockProcessorLog("client", client);
|
||||
MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger);
|
||||
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger);
|
||||
client.initialize(clientInitContext);
|
||||
|
||||
return client;
|
||||
|
@ -130,12 +130,12 @@ public class TestDetectDuplicate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(DistributedMapCacheClientService.HOSTNAME);
|
||||
props.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT);
|
||||
props.add(DistributedMapCacheClientService.PORT);
|
||||
|
@ -144,7 +144,7 @@ public class TestDetectDuplicate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
|
||||
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
|
||||
if (exists) {
|
||||
return false;
|
||||
}
|
||||
|
@ -154,7 +154,8 @@ public class TestDetectDuplicate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
|
||||
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
|
||||
final Deserializer<V> valueDeserializer) throws IOException {
|
||||
if (exists) {
|
||||
return (V) cacheValue;
|
||||
}
|
||||
|
@ -163,25 +164,24 @@ public class TestDetectDuplicate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
|
||||
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
|
||||
return exists;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
|
||||
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
|
||||
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
|
||||
exists = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
|
||||
|
||||
}
|
||||
@Override
|
||||
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private static class StringSerializer implements Serializer<String> {
|
||||
|
|
|
@ -46,39 +46,39 @@ import org.slf4j.LoggerFactory;
|
|||
@Tags({"distributed", "cache", "state", "map", "cluster"})
|
||||
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
|
||||
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
|
||||
+ "between nodes in a NiFi cluster")
|
||||
+ "between nodes in a NiFi cluster")
|
||||
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
|
||||
|
||||
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("Server Hostname")
|
||||
.description("The name of the server that is running the DistributedMapCacheServer service")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
.name("Server Hostname")
|
||||
.description("The name of the server that is running the DistributedMapCacheServer service")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
|
||||
.name("Server Port")
|
||||
.description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.defaultValue("4557")
|
||||
.build();
|
||||
.name("Server Port")
|
||||
.description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.defaultValue("4557")
|
||||
.build();
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("If specified, indicates the SSL Context Service that is used to communicate with the "
|
||||
+ "remote server. If not specified, communications will not be encrypted")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
.name("SSL Context Service")
|
||||
.description("If specified, indicates the SSL Context Service that is used to communicate with the "
|
||||
+ "remote server. If not specified, communications will not be encrypted")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("Communications Timeout")
|
||||
.description("Specifies how long to wait when communicating with the remote server before determining that "
|
||||
+ "there is a communications failure if data cannot be sent or received")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.defaultValue("30 secs")
|
||||
.build();
|
||||
.name("Communications Timeout")
|
||||
.description("Specifies how long to wait when communicating with the remote server before determining that "
|
||||
+ "there is a communications failure if data cannot be sent or received")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.defaultValue("30 secs")
|
||||
.build();
|
||||
|
||||
private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
|
||||
private volatile ConfigurationContext configContext;
|
||||
|
@ -118,28 +118,29 @@ public class DistributedMapCacheClientService extends AbstractControllerService
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
|
||||
withCommsSession(new CommsAction<Object>() {
|
||||
@Override
|
||||
public Object execute(final CommsSession session) throws IOException {
|
||||
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
|
||||
dos.writeUTF("put");
|
||||
|
||||
serialize(key, keySerializer, dos);
|
||||
serialize(value, valueSerializer, dos);
|
||||
|
||||
dos.flush();
|
||||
final DataInputStream dis = new DataInputStream(session.getInputStream());
|
||||
final boolean success = dis.readBoolean();
|
||||
if ( !success ) {
|
||||
throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
});
|
||||
withCommsSession(new CommsAction<Object>() {
|
||||
@Override
|
||||
public Object execute(final CommsSession session) throws IOException {
|
||||
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
|
||||
dos.writeUTF("put");
|
||||
|
||||
serialize(key, keySerializer, dos);
|
||||
serialize(value, valueSerializer, dos);
|
||||
|
||||
dos.flush();
|
||||
final DataInputStream dis = new DataInputStream(session.getInputStream());
|
||||
final boolean success = dis.readBoolean();
|
||||
if ( !success ) {
|
||||
throw new IOException("Expected to receive confirmation of 'put' request but received unexpected response");
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
|
||||
return withCommsSession(new CommsAction<Boolean>() {
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.wali.UpdateType;
|
|||
import org.wali.WriteAheadRepository;
|
||||
|
||||
public class PersistentMapCache implements MapCache {
|
||||
|
||||
private final MapCache wrapped;
|
||||
private final WriteAheadRepository<MapWaliRecord> wali;
|
||||
|
||||
|
@ -78,10 +77,10 @@ public class PersistentMapCache implements MapCache {
|
|||
|
||||
return putResult;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
|
||||
final MapPutResult putResult = wrapped.put(key, value);
|
||||
final MapPutResult putResult = wrapped.put(key, value);
|
||||
if ( putResult.isSuccessful() ) {
|
||||
// The put was successful.
|
||||
final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
|
||||
|
@ -91,15 +90,15 @@ public class PersistentMapCache implements MapCache {
|
|||
if ( putResult.getEvictedKey() != null ) {
|
||||
records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
|
||||
}
|
||||
|
||||
|
||||
wali.update(Collections.singletonList(record), false);
|
||||
|
||||
|
||||
final long modCount = modifications.getAndIncrement();
|
||||
if ( modCount > 0 && modCount % 100000 == 0 ) {
|
||||
wali.checkpoint();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return putResult;
|
||||
}
|
||||
|
||||
|
@ -114,7 +113,7 @@ public class PersistentMapCache implements MapCache {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer remove(ByteBuffer key) throws IOException {
|
||||
public ByteBuffer remove(final ByteBuffer key) throws IOException {
|
||||
final ByteBuffer removeResult = wrapped.remove(key);
|
||||
if (removeResult != null) {
|
||||
final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, key, removeResult);
|
||||
|
@ -163,7 +162,7 @@ public class PersistentMapCache implements MapCache {
|
|||
private static class Serde implements SerDe<MapWaliRecord> {
|
||||
|
||||
@Override
|
||||
public void serializeEdit(MapWaliRecord previousRecordState, MapWaliRecord newRecordState, java.io.DataOutputStream out) throws IOException {
|
||||
public void serializeEdit(final MapWaliRecord previousRecordState, final MapWaliRecord newRecordState, final java.io.DataOutputStream out) throws IOException {
|
||||
final UpdateType updateType = newRecordState.getUpdateType();
|
||||
if (updateType == UpdateType.DELETE) {
|
||||
out.write(0);
|
||||
|
@ -181,7 +180,7 @@ public class PersistentMapCache implements MapCache {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void serializeRecord(MapWaliRecord record, java.io.DataOutputStream out) throws IOException {
|
||||
public void serializeRecord(final MapWaliRecord record, final java.io.DataOutputStream out) throws IOException {
|
||||
serializeEdit(null, record, out);
|
||||
}
|
||||
|
||||
|
@ -206,7 +205,7 @@ public class PersistentMapCache implements MapCache {
|
|||
}
|
||||
|
||||
@Override
|
||||
public MapWaliRecord deserializeRecord(DataInputStream in, int version) throws IOException {
|
||||
public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
|
||||
return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue