HBASE-15785 Unnecessary lock in ByteBufferArray.

This commit is contained in:
anoopsjohn 2016-05-11 15:56:41 +05:30
parent a11091c49c
commit c9ebcd4e29
2 changed files with 57 additions and 58 deletions

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -42,10 +40,9 @@ public final class ByteBufferArray {
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private ByteBuffer buffers[]; private ByteBuffer buffers[];
private Lock locks[];
private int bufferSize; private int bufferSize;
private int bufferCount; private int bufferCount;
private ByteBufferAllocator allocator;
/** /**
* We allocate a number of byte buffers as the capacity. In order not to out * We allocate a number of byte buffers as the capacity. In order not to out
* of the array bounds for the last byte(see {@link ByteBufferArray#multiple}), * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
@ -65,10 +62,7 @@ public final class ByteBufferArray {
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+ bufferCount + ", direct=" + directByteBuffer); + bufferCount + ", direct=" + directByteBuffer);
buffers = new ByteBuffer[bufferCount + 1]; buffers = new ByteBuffer[bufferCount + 1];
locks = new Lock[bufferCount + 1];
this.allocator = allocator;
for (int i = 0; i <= bufferCount; i++) { for (int i = 0; i <= bufferCount; i++) {
locks[i] = new ReentrantLock();
if (i < bufferCount) { if (i < bufferCount) {
buffers[i] = allocator.allocate(bufferSize, directByteBuffer); buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
} else { } else {
@ -109,8 +103,8 @@ public final class ByteBufferArray {
private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() { private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
@Override @Override
public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
bb.get(array, arrayIdx, len); ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
} }
}; };
@ -138,8 +132,8 @@ public final class ByteBufferArray {
private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() { private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
@Override @Override
public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) { public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
bb.put(array, arrayIdx, len); ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
} }
}; };
@ -149,11 +143,12 @@ public final class ByteBufferArray {
* bytes from the buffer to the destination array, else if it is a write * bytes from the buffer to the destination array, else if it is a write
* action, we will transfer the bytes from the source array to the buffer * action, we will transfer the bytes from the source array to the buffer
* @param bb byte buffer * @param bb byte buffer
* @param pos Start position in ByteBuffer
* @param array a source or destination byte array * @param array a source or destination byte array
* @param arrayOffset offset of the byte array * @param arrayOffset offset of the byte array
* @param len read/write length * @param len read/write length
*/ */
void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len); void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
} }
/** /**
@ -176,7 +171,7 @@ public final class ByteBufferArray {
assert startBuffer >= 0 && startBuffer < bufferCount; assert startBuffer >= 0 && startBuffer < bufferCount;
assert endBuffer >= 0 && endBuffer < bufferCount assert endBuffer >= 0 && endBuffer < bufferCount
|| (endBuffer == bufferCount && endOffset == 0); || (endBuffer == bufferCount && endOffset == 0);
if (startBuffer >= locks.length || startBuffer < 0) { if (startBuffer >= buffers.length || startBuffer < 0) {
String msg = "Failed multiple, start=" + start + ",startBuffer=" String msg = "Failed multiple, start=" + start + ",startBuffer="
+ startBuffer + ",bufferSize=" + bufferSize; + startBuffer + ",bufferSize=" + bufferSize;
LOG.error(msg); LOG.error(msg);
@ -184,26 +179,19 @@ public final class ByteBufferArray {
} }
int srcIndex = 0, cnt = -1; int srcIndex = 0, cnt = -1;
for (int i = startBuffer; i <= endBuffer; ++i) { for (int i = startBuffer; i <= endBuffer; ++i) {
Lock lock = locks[i]; ByteBuffer bb = buffers[i].duplicate();
lock.lock(); int pos = 0;
try {
ByteBuffer bb = buffers[i];
if (i == startBuffer) { if (i == startBuffer) {
cnt = bufferSize - startOffset; cnt = bufferSize - startOffset;
if (cnt > len) cnt = len; if (cnt > len) cnt = len;
bb.limit(startOffset + cnt).position(startOffset); pos = startOffset;
} else if (i == endBuffer) { } else if (i == endBuffer) {
cnt = endOffset; cnt = endOffset;
bb.limit(cnt).position(0);
} else { } else {
cnt = bufferSize; cnt = bufferSize;
bb.limit(cnt).position(0);
} }
visitor.visit(bb, array, srcIndex + arrayOffset, cnt); visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
srcIndex += cnt; srcIndex += cnt;
} finally {
lock.unlock();
}
} }
assert srcIndex == len; assert srcIndex == len;
} }
@ -231,7 +219,7 @@ public final class ByteBufferArray {
assert startBuffer >= 0 && startBuffer < bufferCount; assert startBuffer >= 0 && startBuffer < bufferCount;
assert endBuffer >= 0 && endBuffer < bufferCount assert endBuffer >= 0 && endBuffer < bufferCount
|| (endBuffer == bufferCount && endBufferOffset == 0); || (endBuffer == bufferCount && endBufferOffset == 0);
if (startBuffer >= locks.length || startBuffer < 0) { if (startBuffer >= buffers.length || startBuffer < 0) {
String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
+ ",bufferSize=" + bufferSize; + ",bufferSize=" + bufferSize;
LOG.error(msg); LOG.error(msg);
@ -240,31 +228,20 @@ public final class ByteBufferArray {
int srcIndex = 0, cnt = -1; int srcIndex = 0, cnt = -1;
ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) { for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
Lock lock = locks[i]; ByteBuffer bb = buffers[i].duplicate();
lock.lock();
try {
ByteBuffer bb = buffers[i];
if (i == startBuffer) { if (i == startBuffer) {
cnt = bufferSize - startBufferOffset; cnt = bufferSize - startBufferOffset;
if (cnt > len) cnt = len; if (cnt > len) cnt = len;
ByteBuffer dup = bb.duplicate(); bb.limit(startBufferOffset + cnt).position(startBufferOffset);
dup.limit(startBufferOffset + cnt).position(startBufferOffset);
mbb[j] = dup.slice();
} else if (i == endBuffer) { } else if (i == endBuffer) {
cnt = endBufferOffset; cnt = endBufferOffset;
ByteBuffer dup = bb.duplicate(); bb.position(0).limit(cnt);
dup.position(0).limit(cnt);
mbb[j] = dup.slice();
} else { } else {
cnt = bufferSize; cnt = bufferSize;
ByteBuffer dup = bb.duplicate(); bb.position(0).limit(cnt);
dup.position(0).limit(cnt);
mbb[j] = dup.slice();
} }
mbb[j] = bb.slice();
srcIndex += cnt; srcIndex += cnt;
} finally {
lock.unlock();
}
} }
assert srcIndex == len; assert srcIndex == len;
if (mbb.length > 1) { if (mbb.length > 1) {

View File

@ -939,6 +939,28 @@ public final class ByteBufferUtils {
} }
} }
/**
* Copies bytes from given array's offset to length part into the given buffer. Puts the bytes
* to buffer's given position.
* @param out
* @param in
* @param inOffset
* @param length
*/
public static void copyFromArrayToBuffer(ByteBuffer out, int outOffset, byte[] in, int inOffset,
int length) {
if (out.hasArray()) {
System.arraycopy(in, inOffset, out.array(), out.arrayOffset() + outOffset, length);
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, inOffset, out, outOffset, length);
} else {
int oldPos = out.position();
out.position(outOffset);
out.put(in, inOffset, length);
out.position(oldPos);
}
}
/** /**
* Copies specified number of bytes from given offset of 'in' ByteBuffer to * Copies specified number of bytes from given offset of 'in' ByteBuffer to
* the array. * the array.