HBASE-17872 The MSLABImpl generates the invaild cells when unsafe is not availble

This commit is contained in:
CHIA-PING TSAI 2017-04-08 17:37:37 +08:00 committed by Chia-Ping Tsai
parent 59e8b8e2ba
commit df96d328fb
3 changed files with 215 additions and 27 deletions

View File

@ -43,15 +43,14 @@ import sun.nio.ch.DirectBuffer;
@SuppressWarnings("restriction")
@InterfaceAudience.Public
public final class ByteBufferUtils {
// "Compressed integer" serialization helper constants.
public final static int VALUE_MASK = 0x7f;
public final static int NEXT_BIT_SHIFT = 7;
public final static int NEXT_BIT_MASK = 1 << 7;
@VisibleForTesting
static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
@VisibleForTesting
static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
private ByteBufferUtils() {
}
@ -404,12 +403,11 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
} else {
int outOldPos = out.position();
out.position(destinationOffset);
ByteBuffer outDup = out.duplicate();
outDup.position(destinationOffset);
ByteBuffer inDup = in.duplicate();
inDup.position(sourceOffset).limit(sourceOffset + length);
out.put(inDup);
out.position(outOldPos);
outDup.put(inDup);
}
return destinationOffset + length;
}
@ -990,7 +988,7 @@ public final class ByteBufferUtils {
/**
* Copies bytes from given array's offset to length part into the given buffer. Puts the bytes
* to buffer's given position.
* to buffer's given position. This doesn't affact the position of buffer.
* @param out
* @param in
* @param inOffset
@ -1003,16 +1001,15 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, inOffset, out, outOffset, length);
} else {
int oldPos = out.position();
out.position(outOffset);
out.put(in, inOffset, length);
out.position(oldPos);
ByteBuffer outDup = out.duplicate();
outDup.position(outOffset);
outDup.put(in, inOffset, length);
}
}
/**
* Copies specified number of bytes from given offset of 'in' ByteBuffer to
* the array.
* the array. This doesn't affact the position of buffer.
* @param out
* @param in
* @param sourceOffset
@ -1026,10 +1023,9 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
} else {
int oldPos = in.position();
in.position(sourceOffset);
in.get(out, destinationOffset, length);
in.position(oldPos);
ByteBuffer inDup = in.duplicate();
inDup.position(sourceOffset);
inDup.get(out, destinationOffset, length);
}
}

View File

@ -27,14 +27,22 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
@ -50,13 +58,13 @@ import org.junit.runners.Parameterized;
@Category({MiscTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestByteBufferUtils {
private static final String UNSAFE_AVAIL_NAME = "UNSAFE_AVAIL";
private static final String UNSAFE_UNALIGNED_NAME = "UNSAFE_UNALIGNED";
private byte[] array;
@AfterClass
public static void afterClass() throws Exception {
ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
detectAvailabilityOfUnsafe();
}
@Parameterized.Parameters
@ -69,15 +77,50 @@ public class TestByteBufferUtils {
return paramList;
}
public TestByteBufferUtils(boolean useUnsafeIfPossible) {
if (useUnsafeIfPossible) {
ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
} else {
ByteBufferUtils.UNSAFE_AVAIL = false;
ByteBufferUtils.UNSAFE_UNALIGNED = false;
private static void setUnsafe(String fieldName, boolean value) throws Exception {
Field field = ByteBufferUtils.class.getDeclaredField(fieldName);
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
int oldModifiers = field.getModifiers();
modifiersField.setInt(field, oldModifiers & ~Modifier.FINAL);
try {
field.set(null, value);
} finally {
modifiersField.setInt(field, oldModifiers);
}
}
static void disableUnsafe() throws Exception {
if (ByteBufferUtils.UNSAFE_AVAIL) {
setUnsafe(UNSAFE_AVAIL_NAME, false);
}
if (ByteBufferUtils.UNSAFE_UNALIGNED) {
setUnsafe(UNSAFE_UNALIGNED_NAME, false);
}
assertFalse(ByteBufferUtils.UNSAFE_AVAIL);
assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED);
}
static void detectAvailabilityOfUnsafe() throws Exception {
if (ByteBufferUtils.UNSAFE_AVAIL != UnsafeAvailChecker.isAvailable()) {
setUnsafe(UNSAFE_AVAIL_NAME, UnsafeAvailChecker.isAvailable());
}
if (ByteBufferUtils.UNSAFE_UNALIGNED != UnsafeAvailChecker.unaligned()) {
setUnsafe(UNSAFE_UNALIGNED_NAME, UnsafeAvailChecker.unaligned());
}
assertEquals(ByteBufferUtils.UNSAFE_AVAIL, UnsafeAvailChecker.isAvailable());
assertEquals(ByteBufferUtils.UNSAFE_UNALIGNED, UnsafeAvailChecker.unaligned());
}
public TestByteBufferUtils(boolean useUnsafeIfPossible) throws Exception {
if (useUnsafeIfPossible) {
detectAvailabilityOfUnsafe();
} else {
disableUnsafe();
}
}
/**
* Create an array with sample data.
*/
@ -388,6 +431,111 @@ public class TestByteBufferUtils {
assertEquals(i, buffer.getInt());
}
private void testCopyFromSrcToDestWithThreads(Object input, Object output,
List<Integer> lengthes, List<Integer> offsets) throws InterruptedException {
assertTrue((input instanceof ByteBuffer) || (input instanceof byte[]));
assertTrue((output instanceof ByteBuffer) || (output instanceof byte[]));
assertEquals(lengthes.size(), offsets.size());
final int threads = lengthes.size();
CountDownLatch latch = new CountDownLatch(1);
List<Runnable> exes = new ArrayList<>(threads);
int oldInputPos = (input instanceof ByteBuffer) ? ((ByteBuffer) input).position() : 0;
int oldOutputPos = (output instanceof ByteBuffer) ? ((ByteBuffer) output).position() : 0;
for (int i = 0; i != threads; ++i) {
int offset = offsets.get(i);
int length = lengthes.get(i);
exes.add(() -> {
try {
latch.await();
if (input instanceof ByteBuffer && output instanceof byte[]) {
ByteBufferUtils.copyFromBufferToArray((byte[]) output,
(ByteBuffer) input, offset, offset, length);
}
if (input instanceof byte[] && output instanceof ByteBuffer) {
ByteBufferUtils.copyFromArrayToBuffer((ByteBuffer) output,
offset, (byte[]) input, offset, length);
}
if (input instanceof ByteBuffer && output instanceof ByteBuffer) {
ByteBufferUtils.copyFromBufferToBuffer((ByteBuffer) input,
(ByteBuffer) output, offset, offset, length);
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
});
}
ExecutorService service = Executors.newFixedThreadPool(threads);
exes.forEach(service::execute);
latch.countDown();
service.shutdown();
assertTrue(service.awaitTermination(5, TimeUnit.SECONDS));
if (input instanceof ByteBuffer) {
assertEquals(oldInputPos, ((ByteBuffer) input).position());
}
if (output instanceof ByteBuffer) {
assertEquals(oldOutputPos, ((ByteBuffer) output).position());
}
String inputString = (input instanceof ByteBuffer) ?
Bytes.toString(Bytes.toBytes((ByteBuffer) input)) : Bytes.toString((byte[]) input);
String outputString = (output instanceof ByteBuffer) ?
Bytes.toString(Bytes.toBytes((ByteBuffer) output)) : Bytes.toString((byte[]) output);
assertEquals(inputString, outputString);
}
@Test
public void testCopyFromSrcToDestWithThreads() throws InterruptedException {
List<byte[]> words = Arrays.asList(
Bytes.toBytes("with"),
Bytes.toBytes("great"),
Bytes.toBytes("power"),
Bytes.toBytes("comes"),
Bytes.toBytes("great"),
Bytes.toBytes("responsibility")
);
List<Integer> lengthes = words.stream().map(v -> v.length).collect(Collectors.toList());
List<Integer> offsets = new ArrayList<>(words.size());
for (int i = 0; i != words.size(); ++i) {
offsets.add(words.subList(0, i).stream().mapToInt(v -> v.length).sum());
}
int totalSize = words.stream().mapToInt(v -> v.length).sum();
byte[] fullContent = new byte[totalSize];
int offset = 0;
for (byte[] w : words) {
offset = Bytes.putBytes(fullContent, offset, w, 0, w.length);
}
// test copyFromBufferToArray
for (ByteBuffer input : Arrays.asList(
ByteBuffer.allocateDirect(totalSize),
ByteBuffer.allocate(totalSize))) {
words.forEach(input::put);
byte[] output = new byte[totalSize];
testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
}
// test copyFromArrayToBuffer
for (ByteBuffer output : Arrays.asList(
ByteBuffer.allocateDirect(totalSize),
ByteBuffer.allocate(totalSize))) {
byte[] input = fullContent;
testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
}
// test copyFromBufferToBuffer
for (ByteBuffer input : Arrays.asList(
ByteBuffer.allocateDirect(totalSize),
ByteBuffer.allocate(totalSize))) {
words.forEach(input::put);
for (ByteBuffer output : Arrays.asList(
ByteBuffer.allocateDirect(totalSize),
ByteBuffer.allocate(totalSize))) {
testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
}
}
}
@Test
public void testCopyFromBufferToArray() {
ByteBuffer buffer = ByteBuffer.allocate(15);
@ -492,4 +640,5 @@ public class TestByteBufferUtils {
bb[i] = b;
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.client.TestFromClientSide3;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@Category({LargeTests.class, ClientTests.class})
public class TestFromClientSide3WoUnsafe extends TestFromClientSide3 {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestByteBufferUtils.disableUnsafe();
TestFromClientSide3.setUpBeforeClass();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TestFromClientSide3.tearDownAfterClass();
TestByteBufferUtils.detectAvailabilityOfUnsafe();
}
}