HBASE-13259 mmap() based BucketCache IOEngine (Zee Chen & Ram)

Author: ramkrishna
Date: 2016-02-23 17:03:38 +05:30
Parent: a8073c4a98
Commit: 3ba1a7fd23
7 changed files with 320 additions and 14 deletions

File: org/apache/hadoop/hbase/util/ByteBufferAllocator.java (new file)

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Defines the way the ByteBuffers are created
*/
@InterfaceAudience.Private
public interface ByteBufferAllocator {
/**
* Allocates a ByteBuffer
* @param size the size of the ByteBuffer to allocate
* @param directByteBuffer true if a direct ByteBuffer should be created
* @return the newly created ByteBuffer
* @throws IOException if there is an error while creating the ByteBuffer
*/
ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException;
}
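To make the contract concrete, here is a hedged sketch of a trivial standalone implementation and how it would plug into ByteBufferArray (the class name SimpleByteBufferAllocator is invented for illustration; the commit itself wires in anonymous implementations, as the files below show):

import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical example implementation, not part of the commit.
public class SimpleByteBufferAllocator implements ByteBufferAllocator {
  @Override
  public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
    // Honor the direct flag; the sizes handed in by ByteBufferArray fit in an int.
    return directByteBuffer ? ByteBuffer.allocateDirect((int) size)
        : ByteBuffer.allocate((int) size);
  }
}

// Usage: back a 64 MB ByteBufferArray with this allocator.
// ByteBufferArray array = new ByteBufferArray(64L * 1024 * 1024, false, new SimpleByteBufferAllocator());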

File: org/apache/hadoop/hbase/util/ByteBufferArray.java

@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -39,20 +40,23 @@ import org.apache.hadoop.util.StringUtils;
public final class ByteBufferArray {
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
-static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private ByteBuffer buffers[];
private Lock locks[];
private int bufferSize;
private int bufferCount;
+private ByteBufferAllocator allocator;
/**
* We allocate a number of byte buffers to make up the total capacity. In order
* not to go out of the array bounds for the last byte (see {@link ByteBufferArray#multiple}),
* we will allocate one additional buffer with capacity 0.
* @param capacity total size of the byte buffer array
* @param directByteBuffer true if we allocate direct buffers
+* @param allocator the ByteBufferAllocator that will create the buffers
+* @throws IOException if the allocator fails to create a buffer
*/
-public ByteBufferArray(long capacity, boolean directByteBuffer) {
+public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
+    throws IOException {
this.bufferSize = DEFAULT_BUFFER_SIZE;
if (this.bufferSize > (capacity / 16))
this.bufferSize = (int) roundUp(capacity / 16, 32768);
@@ -62,15 +66,15 @@ public final class ByteBufferArray {
+ bufferCount + ", direct=" + directByteBuffer);
buffers = new ByteBuffer[bufferCount + 1];
locks = new Lock[bufferCount + 1];
+this.allocator = allocator;
for (int i = 0; i <= bufferCount; i++) {
locks[i] = new ReentrantLock();
if (i < bufferCount) {
-buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
-    : ByteBuffer.allocate(bufferSize);
+buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
} else {
// always create on heap
buffers[i] = ByteBuffer.allocate(0);
}
}
}
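As a quick worked example of the sizing logic above (a sketch; the numbers follow directly from the constructor code and are not part of the commit): with a 1 GB capacity, bufferSize stays at the 4 MB default because 4 MB does not exceed 1 GB / 16, so the array ends up with 256 buffers plus the zero-capacity guard buffer.

long capacity = 1024L * 1024 * 1024;                   // 1 GB
int bufferSize = ByteBufferArray.DEFAULT_BUFFER_SIZE;  // 4 MB; capacity / 16 = 64 MB, so no shrink
int bufferCount = (int) (capacity / bufferSize);       // 256 buffers, plus one empty guard buffer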

File: org/apache/hadoop/hbase/util/TestByteBufferArray.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -32,7 +35,18 @@ public class TestByteBufferArray {
@Test
public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
int capacity = 4 * 1024 * 1024;
-ByteBufferArray array = new ByteBufferArray(capacity, false);
+ByteBufferAllocator allocator = new ByteBufferAllocator() {
+  @Override
+  public ByteBuffer allocate(long size, boolean directByteBuffer)
+      throws IOException {
+    if (directByteBuffer) {
+      return ByteBuffer.allocateDirect((int) size);
+    } else {
+      return ByteBuffer.allocate((int) size);
+    }
+  }
+};
+ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
subBuf.position(capacity - 1);// Position to the last byte
assertTrue(subBuf.hasRemaining());

File: org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

@@ -303,15 +303,18 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
throws IOException {
if (ioEngineName.startsWith("file:"))
if (ioEngineName.startsWith("file:")) {
return new FileIOEngine(ioEngineName.substring(5), capacity);
else if (ioEngineName.startsWith("offheap"))
} else if (ioEngineName.startsWith("offheap")) {
return new ByteBufferIOEngine(capacity, true);
else if (ioEngineName.startsWith("heap"))
} else if (ioEngineName.startsWith("heap")) {
return new ByteBufferIOEngine(capacity, false);
else
} else if (ioEngineName.startsWith("mmap:")) {
return new FileMmapEngine(ioEngineName.substring(5), capacity);
} else {
throw new IllegalArgumentException(
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
}
}
/**

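With this change in place, the mmap engine is selected the same way as the existing engines, through the bucket cache IO engine setting. A hedged sketch using the standard Configuration API (the cache file path is a placeholder; in a real deployment these values would normally be set in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.bucketcache.ioengine", "mmap:/mnt/hbase-bucketcache.data");
conf.set("hbase.bucketcache.size", "4096"); // bucket cache size, in MB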
File: org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
@@ -42,13 +43,24 @@ public class ByteBufferIOEngine implements IOEngine {
* Construct the ByteBufferIOEngine with the given capacity
* @param capacity total capacity of the engine, in bytes
* @param direct true if direct buffers should be allocated
-* @throws IOException
+* @throws IOException if the allocator fails; the heap and direct allocators used here should not throw
*/
public ByteBufferIOEngine(long capacity, boolean direct)
throws IOException {
this.capacity = capacity;
this.direct = direct;
-bufferArray = new ByteBufferArray(capacity, direct);
+ByteBufferAllocator allocator = new ByteBufferAllocator() {
+  @Override
+  public ByteBuffer allocate(long size, boolean directByteBuffer)
+      throws IOException {
+    if (directByteBuffer) {
+      return ByteBuffer.allocateDirect((int) size);
+    } else {
+      return ByteBuffer.allocate((int) size);
+    }
+  }
+};
+bufferArray = new ByteBufferArray(capacity, direct, allocator);
}
@Override
@@ -85,7 +97,7 @@ public class ByteBufferIOEngine implements IOEngine {
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the ByteBufferArray of the first byte to be
* written
-* @throws IOException
+* @throws IOException if writing to the underlying array fails
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {

File: org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java (new file)

@@ -0,0 +1,166 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
import org.apache.hadoop.util.StringUtils;
/**
* IO engine that stores data in a file on the local file system using the
* memory mapping (mmap) mechanism
*/
@InterfaceAudience.Private
public class FileMmapEngine implements IOEngine {
static final Log LOG = LogFactory.getLog(FileMmapEngine.class);
private final String path;
private long size;
private ByteBufferArray bufferArray;
private final FileChannel fileChannel;
private RandomAccessFile raf = null;
public FileMmapEngine(String filePath, long capacity) throws IOException {
this.path = filePath;
this.size = capacity;
long fileSize = 0;
try {
raf = new RandomAccessFile(filePath, "rw");
fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
raf.setLength(fileSize);
fileChannel = raf.getChannel();
LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
} catch (java.io.FileNotFoundException fex) {
LOG.error("Can't create bucket cache file " + filePath, fex);
throw fex;
} catch (IOException ioex) {
LOG.error("Can't extend bucket cache file; insufficient space for "
+ StringUtils.byteDesc(fileSize), ioex);
shutdown();
throw ioex;
}
ByteBufferAllocator allocator = new ByteBufferAllocator() {
int pos = 0;
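// Called once per buffer by ByteBufferArray with size == its bufferSize;
// pos counts up so that successive calls map consecutive,
// non-overlapping slices of the backing file.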
@Override
public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
ByteBuffer buffer = null;
if (directByteBuffer) {
buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, pos * size, size);
} else {
throw new IllegalArgumentException(
    "Only direct ByteBuffers are allowed with the FileMmap engine");
}
pos++;
return buffer;
}
};
bufferArray = new ByteBufferArray(fileSize, true, allocator);
}
private long roundUp(long n, long to) {
return ((n + to - 1) / to) * to;
}
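// A hedged arithmetic sketch of how roundUp() interacts with the buffer
// sizing in ByteBufferArray (illustrative only, not part of the commit):
//   capacity = 2 MB  ->  fileSize = roundUp(2 MB, 4 MB) = 4 MB
//   ByteBufferArray(4 MB) picks roundUp(4 MB / 16, 32768) = 256 KB buffers,
//   so the allocator above runs 16 times, mapping 16 consecutive 256 KB
//   slices that exactly cover the 4 MB file.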
@Override
public String toString() {
return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
", size=" + String.format("%,d", this.size);
}
/**
* File IO engine is always able to support persistent storage for the cache
* @return true
*/
@Override
public boolean isPersistent() {
return true;
}
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
byte[] dst = new byte[length];
bufferArray.getMultiple(offset, length, dst);
return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
MemoryType.EXCLUSIVE);
}
/**
* Transfers data from the given byte buffer to file
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset the offset in the file where the first byte is to be written
* @throws IOException if the write fails
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
}
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// The given ByteBuff is assumed to be array backed; the assert below enforces it
assert srcBuffer.hasArray();
bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
srcBuffer.arrayOffset());
}
/**
* Sync the data to file after writing
* @throws IOException
*/
@Override
public void sync() throws IOException {
if (fileChannel != null) {
fileChannel.force(true);
}
}
/**
* Close the file
*/
@Override
public void shutdown() {
try {
fileChannel.close();
} catch (IOException ex) {
LOG.error("Can't shutdown cleanly", ex);
}
try {
raf.close();
} catch (IOException ex) {
LOG.error("Can't shutdown cleanly", ex);
}
}
}

File: org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java (new file)

@@ -0,0 +1,68 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Basic test for {@link FileMmapEngine}
*/
@Category({IOTests.class, SmallTests.class})
public class TestFileMmapEngine {
@Test
public void testFileMmapEngine() throws IOException {
int size = 2 * 1024 * 1024; // 2 MB
String filePath = "testFileMmapEngine";
try {
FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size);
for (int i = 0; i < 50; i++) {
int len = (int) Math.floor(Math.random() * 100);
long offset = (long) Math.floor(Math.random() * size % (size - len));
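// Note: offset is drawn from [0, size - len), so the window
// [offset, offset + len) always stays inside the mapped file.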
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
}
fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileMmapEngine.read(offset, len, deserializer);
ByteBuff data2 = deserializer.getDeserializedByteBuff();
for (int j = 0; j < data1.length; ++j) {
assertTrue(data1[j] == data2.get(j));
}
}
} finally {
File file = new File(filePath);
if (file.exists()) {
file.delete();
}
}
}
}
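To try the new engine's test in isolation, the usual Surefire invocation applies (a sketch; it assumes the test lives in the hbase-server module, as its package suggests):

mvn test -Dtest=TestFileMmapEngine -pl hbase-server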