CompressionStrategyTest: Fix thread-unsafe Closer usage. (#12605)

Closer is not thread-safe, so we need one per thread in the
concurrency tests.
This commit is contained in:
Gian Merlino 2022-06-04 10:57:13 -07:00 committed by GitHub
parent a503683a4a
commit abf0e0a159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 39 deletions

View File

@ -23,9 +23,7 @@ import com.google.common.collect.Iterables;
import org.apache.druid.collections.ResourceHolder; import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.ByteBufferUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -46,7 +44,7 @@ import java.util.concurrent.TimeUnit;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class CompressionStrategyTest public class CompressionStrategyTest
{ {
@Parameterized.Parameters @Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> compressionStrategies() public static Iterable<Object[]> compressionStrategies()
{ {
return Iterables.transform( return Iterables.transform(
@ -64,44 +62,32 @@ public class CompressionStrategyTest
// MUST be smaller than CompressedPools.BUFFER_SIZE // MUST be smaller than CompressedPools.BUFFER_SIZE
private static final int DATA_SIZER = 0xFFFF; private static final int DATA_SIZER = 0xFFFF;
private static byte[] originalData; private static byte[] ORIGINAL_DATA;
@BeforeClass @BeforeClass
public static void setupClass() public static void setupClass()
{ {
originalData = new byte[DATA_SIZER]; ORIGINAL_DATA = new byte[DATA_SIZER];
Random random = new Random(54671457); Random random = new Random(54671457);
random.nextBytes(originalData); random.nextBytes(ORIGINAL_DATA);
}
private Closer closer;
@Before
public void createCloser()
{
closer = Closer.create();
}
@After
public void closeCloser() throws IOException
{
closer.close();
} }
@Test @Test
public void testBasicOperations() public void testBasicOperations() throws IOException
{ {
ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); try (final Closer closer = Closer.create();
ByteBuffer compressionIn = compressionStrategy.getCompressor().allocateInBuffer(originalData.length, closer); final ResourceHolder<ByteBuffer> holder = ByteBufferUtils.allocateDirect(ORIGINAL_DATA.length)) {
try (final ResourceHolder<ByteBuffer> holder = ByteBufferUtils.allocateDirect(originalData.length)) { ByteBuffer compressionOut = compressionStrategy.getCompressor().allocateOutBuffer(ORIGINAL_DATA.length, closer);
ByteBuffer compressionIn = compressionStrategy.getCompressor().allocateInBuffer(ORIGINAL_DATA.length, closer);
final ByteBuffer output = holder.get(); final ByteBuffer output = holder.get();
compressionIn.put(originalData); compressionIn.put(ORIGINAL_DATA);
compressionIn.rewind(); compressionIn.rewind();
ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut); ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
byte[] checkArray = new byte[DATA_SIZER]; byte[] checkArray = new byte[DATA_SIZER];
output.get(checkArray); output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); Assert.assertArrayEquals("Uncompressed data does not match", ORIGINAL_DATA, checkArray);
} }
} }
@ -122,19 +108,22 @@ public class CompressionStrategyTest
results.add( results.add(
threadPoolExecutor.submit( threadPoolExecutor.submit(
() -> { () -> {
ByteBuffer compressionOut = compressionStrategy.getCompressor() try (final Closer closer = Closer.create()) {
.allocateOutBuffer(originalData.length, closer); ByteBuffer compressionOut = compressionStrategy.getCompressor()
ByteBuffer compressionIn = compressionStrategy.getCompressor() .allocateOutBuffer(ORIGINAL_DATA.length, closer);
.allocateInBuffer(originalData.length, closer); ByteBuffer compressionIn = compressionStrategy.getCompressor()
compressionIn.put(originalData); .allocateInBuffer(ORIGINAL_DATA.length, closer);
compressionIn.position(0); compressionIn.put(ORIGINAL_DATA);
ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut); compressionIn.position(0);
ByteBuffer output = compressionStrategy.getCompressor().allocateOutBuffer(originalData.length, closer); ByteBuffer compressed = compressionStrategy.getCompressor().compress(compressionIn, compressionOut);
compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output); ByteBuffer output = compressionStrategy.getCompressor()
byte[] checkArray = new byte[DATA_SIZER]; .allocateOutBuffer(ORIGINAL_DATA.length, closer);
output.get(checkArray); compressionStrategy.getDecompressor().decompress(compressed, compressed.remaining(), output);
Assert.assertArrayEquals("Uncompressed data does not match", originalData, checkArray); byte[] checkArray = new byte[DATA_SIZER];
return true; output.get(checkArray);
Assert.assertArrayEquals("Uncompressed data does not match", ORIGINAL_DATA, checkArray);
return true;
}
} }
) )
); );