NIFI-8388 Upgrade Hazelcast from 4.0.1 to 4.2

- Removes dependency on Guava using ByteBuffer methods for serializing revision

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4967.
This commit is contained in:
exceptionfactory 2021-03-31 16:24:33 -05:00 committed by Pierre Villard
parent f21c1be60f
commit 64b12176b2
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 24 additions and 52 deletions

View File

@ -12,10 +12,5 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Hazelcast
The following NOTICE information applies:
Core Hazelcast Module 4.0.1
Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
(ASLv2) Guava
The following NOTICE information applies:
Guava
Copyright 2015 The Guava Authors
Core Hazelcast Module 4.2
Copyright (c) 2008-2021, Hazelcast, Inc. All Rights Reserved.

View File

@ -52,13 +52,7 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
<version>4.2</version>
</dependency>
<!-- Test dependencies -->

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.hazelcast.services.cacheclient;
import com.google.common.primitives.Longs;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -36,6 +35,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -199,8 +199,8 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
}
@Override
public void close() throws IOException {
getLogger().debug("Closing " + this.getClass().getSimpleName());
public void close() {
getLogger().debug("Closing {}", getClass().getSimpleName());
}
@Override
@ -209,7 +209,7 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
}
private static long parseRevision(final byte[] value) {
return Longs.fromByteArray(Arrays.copyOfRange(value, 0, Long.BYTES));
return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong();
}
private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
@ -249,8 +249,13 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
*/
private <S> byte[] serialize(final S value, final Serializer<S> serializer, final long version) throws IOException {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
stream.write(Longs.toByteArray(version));
stream.write(getVersionByteArray(version));
serializer.serialize(value, stream);
return stream.toByteArray();
}
private byte[] getVersionByteArray(final long version) {
return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
}
}

View File

@ -22,17 +22,14 @@ public class EmbeddedHazelcastCacheManagerTest extends AbstractHazelcastCacheMan
@Test
public void testExecution() throws Exception {
// given
testSubject = new EmbeddedHazelcastCacheManager();
testRunner.addControllerService("hazelcast-connection-service", testSubject);
givenHazelcastMapCacheClient();
givenServicesAreEnabled();
// when
whenExecuting();
// then
thenProcessingIsSuccessful();
}
}

View File

@ -19,63 +19,44 @@ package org.apache.nifi.hazelcast.services.cachemanager;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ExternalHazelcastCacheManagerTest extends AbstractHazelcastCacheManagerTest {
private Thread hazelcastServer;
private HazelcastInstance hazelcastInstance;
private int port;
@Before
public void setUp() {
hazelcastServer = new Thread(new Runnable() {
HazelcastInstance hazelcastInstance;
port = NetworkUtils.availablePort();
final Config config = new Config();
config.getNetworkConfig().setPort(port);
config.setClusterName("nifi");
@Override
public void run() {
final Config config = new Config();
config.getNetworkConfig().setPort(5704);
config.setClusterName("nifi");
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
hazelcastInstance.shutdown();
hazelcastInstance = null;
}
});
hazelcastServer.start();
hazelcastInstance = Hazelcast.newHazelcastInstance(config);
super.setUp();
}
@After
public void tearDown() {
super.tearDown();
hazelcastServer.interrupt();
hazelcastInstance.shutdown();
}
@Test
public void testExecution() throws Exception {
// given
testSubject = new ExternalHazelcastCacheManager();
testRunner.addControllerService("hazelcast-connection-service", testSubject);
testRunner.setProperty(testSubject, ExternalHazelcastCacheManager.HAZELCAST_SERVER_ADDRESS, "localhost:5704");
testRunner.setProperty(testSubject, ExternalHazelcastCacheManager.HAZELCAST_SERVER_ADDRESS, String.format("localhost:%d", port));
givenHazelcastMapCacheClient();
givenServicesAreEnabled();
// when
whenExecuting();
// then
thenProcessingIsSuccessful();
}
}