HBASE-24485 Backport to branch-1 HBASE-17738 BucketCache startup is slow (#1823)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
4118383fb1
commit
d3d527637f
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines the way the ByteBuffers are created
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface ByteBufferAllocator {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocates a bytebuffer
|
||||||
|
* @param size the size of the bytebuffer
|
||||||
|
* @param directByteBuffer indicator to create a direct bytebuffer
|
||||||
|
* @return the bytebuffer that is created
|
||||||
|
* @throws IOException exception thrown if there is an error while creating the ByteBuffer
|
||||||
|
*/
|
||||||
|
ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException;
|
||||||
|
}
|
@ -18,7 +18,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.util;
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -31,13 +40,15 @@ import org.apache.hadoop.util.StringUtils;
|
|||||||
* reading/writing data from this large buffer with a position and offset
|
* reading/writing data from this large buffer with a position and offset
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class ByteBufferArray {
|
public class ByteBufferArray {
|
||||||
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
|
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
|
||||||
|
|
||||||
static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
||||||
private ByteBuffer buffers[];
|
@VisibleForTesting
|
||||||
|
ByteBuffer[] buffers;
|
||||||
private int bufferSize;
|
private int bufferSize;
|
||||||
private int bufferCount;
|
@VisibleForTesting
|
||||||
|
int bufferCount;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We allocate a number of byte buffers as the capacity. In order not to out
|
* We allocate a number of byte buffers as the capacity. In order not to out
|
||||||
@ -46,7 +57,8 @@ public final class ByteBufferArray {
|
|||||||
* @param capacity total size of the byte buffer array
|
* @param capacity total size of the byte buffer array
|
||||||
* @param directByteBuffer true if we allocate direct buffer
|
* @param directByteBuffer true if we allocate direct buffer
|
||||||
*/
|
*/
|
||||||
public ByteBufferArray(long capacity, boolean directByteBuffer) {
|
public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
|
||||||
|
throws IOException {
|
||||||
this.bufferSize = DEFAULT_BUFFER_SIZE;
|
this.bufferSize = DEFAULT_BUFFER_SIZE;
|
||||||
if (this.bufferSize > (capacity / 16))
|
if (this.bufferSize > (capacity / 16))
|
||||||
this.bufferSize = (int) roundUp(capacity / 16, 32768);
|
this.bufferSize = (int) roundUp(capacity / 16, 32768);
|
||||||
@ -55,13 +67,74 @@ public final class ByteBufferArray {
|
|||||||
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
|
+ ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
|
||||||
+ bufferCount + ", direct=" + directByteBuffer);
|
+ bufferCount + ", direct=" + directByteBuffer);
|
||||||
buffers = new ByteBuffer[bufferCount + 1];
|
buffers = new ByteBuffer[bufferCount + 1];
|
||||||
for (int i = 0; i <= bufferCount; i++) {
|
createBuffers(directByteBuffer, allocator);
|
||||||
if (i < bufferCount) {
|
}
|
||||||
buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
|
|
||||||
: ByteBuffer.allocate(bufferSize);
|
@VisibleForTesting
|
||||||
} else {
|
void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator)
|
||||||
buffers[i] = ByteBuffer.allocate(0);
|
throws IOException {
|
||||||
|
int threadCount = getThreadCount();
|
||||||
|
ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
|
||||||
|
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
||||||
|
int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
|
||||||
|
int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
|
||||||
|
Future<ByteBuffer[]>[] futures = new Future[threadCount];
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
// Last thread will have to deal with a different number of buffers
|
||||||
|
int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
|
||||||
|
futures[i] = service.submit(
|
||||||
|
new BufferCreatorCallable(bufferSize, directByteBuffer, buffersToCreate, allocator));
|
||||||
}
|
}
|
||||||
|
int bufferIndex = 0;
|
||||||
|
for (Future<ByteBuffer[]> future : futures) {
|
||||||
|
try {
|
||||||
|
ByteBuffer[] buffers = future.get();
|
||||||
|
for (ByteBuffer buffer : buffers) {
|
||||||
|
this.buffers[bufferIndex++] = buffer;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
LOG.error("Buffer creation interrupted", e);
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
service.shutdownNow();
|
||||||
|
}
|
||||||
|
// always create on heap empty dummy buffer at last
|
||||||
|
this.buffers[bufferCount] = ByteBuffer.allocate(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getThreadCount() {
|
||||||
|
return Runtime.getRuntime().availableProcessors();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A callable that creates buffers of the specified length either onheap/offheap using the
|
||||||
|
* {@link ByteBufferAllocator}
|
||||||
|
*/
|
||||||
|
private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
|
||||||
|
private final int bufferCapacity;
|
||||||
|
private final boolean directByteBuffer;
|
||||||
|
private final int bufferCount;
|
||||||
|
private final ByteBufferAllocator allocator;
|
||||||
|
|
||||||
|
BufferCreatorCallable(int bufferCapacity, boolean directByteBuffer, int bufferCount,
|
||||||
|
ByteBufferAllocator allocator) {
|
||||||
|
this.bufferCapacity = bufferCapacity;
|
||||||
|
this.directByteBuffer = directByteBuffer;
|
||||||
|
this.bufferCount = bufferCount;
|
||||||
|
this.allocator = allocator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer[] call() throws Exception {
|
||||||
|
ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
|
||||||
|
for (int i = 0; i < this.bufferCount; i++) {
|
||||||
|
buffers[i] = allocator.allocate(this.bufferCapacity, this.directByteBuffer);
|
||||||
|
}
|
||||||
|
return buffers;
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,95 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({MiscTests.class, SmallTests.class})
|
||||||
|
public class TestByteBufferArray {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testByteBufferCreation() throws Exception {
|
||||||
|
int capacity = 470 * 1021 * 1023;
|
||||||
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
|
@Override
|
||||||
|
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
||||||
|
if (directByteBuffer) {
|
||||||
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
|
} else {
|
||||||
|
return ByteBuffer.allocate((int) size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
|
||||||
|
assertEquals(119, array.buffers.length);
|
||||||
|
for (int i = 0; i < array.buffers.length; i++) {
|
||||||
|
if (i == array.buffers.length - 1) {
|
||||||
|
assertEquals(array.buffers[i].capacity(), 0);
|
||||||
|
} else {
|
||||||
|
assertEquals(array.buffers[i].capacity(), ByteBufferArray.DEFAULT_BUFFER_SIZE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testByteBufferCreation1() throws Exception {
|
||||||
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
|
@Override
|
||||||
|
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
||||||
|
if (directByteBuffer) {
|
||||||
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
|
} else {
|
||||||
|
return ByteBuffer.allocate((int) size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, false, allocator);
|
||||||
|
// overwrite
|
||||||
|
array.bufferCount = 25;
|
||||||
|
array.buffers = new ByteBuffer[array.bufferCount + 1];
|
||||||
|
array.createBuffers(true, allocator);
|
||||||
|
for (int i = 0; i < array.buffers.length; i++) {
|
||||||
|
if (i == array.buffers.length - 1) {
|
||||||
|
assertEquals(array.buffers[i].capacity(), 0);
|
||||||
|
} else {
|
||||||
|
assertEquals(array.buffers[i].capacity(), 458752);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DummyByteBufferArray extends ByteBufferArray {
|
||||||
|
|
||||||
|
public DummyByteBufferArray(long capacity, boolean directByteBuffer,
|
||||||
|
ByteBufferAllocator allocator) throws IOException {
|
||||||
|
super(capacity, directByteBuffer, allocator);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
int getThreadCount() {
|
||||||
|
return 16;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
|
||||||
import org.apache.hadoop.hbase.util.ByteBufferArray;
|
import org.apache.hadoop.hbase.util.ByteBufferArray;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -44,7 +45,17 @@ public class ByteBufferIOEngine implements IOEngine {
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.direct = direct;
|
this.direct = direct;
|
||||||
bufferArray = new ByteBufferArray(capacity, direct);
|
ByteBufferAllocator allocator = new ByteBufferAllocator() {
|
||||||
|
@Override
|
||||||
|
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
|
||||||
|
if (directByteBuffer) {
|
||||||
|
return ByteBuffer.allocateDirect((int) size);
|
||||||
|
} else {
|
||||||
|
return ByteBuffer.allocate((int) size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
bufferArray = new ByteBufferArray(capacity, direct, allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user