From e67eb6c424d76ee259f5076c277454a73e3a2bf4 Mon Sep 17 00:00:00 2001
From: Ramkrishna
Date: Thu, 16 Mar 2017 16:11:35 +0530
Subject: [PATCH] HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)

---
 .../hbase/io/hfile/bucket/BucketCache.java    |   9 +-
 .../hbase/io/hfile/bucket/FileIOEngine.java   | 184 ++++++++++++++----
 .../io/hfile/bucket/TestFileIOEngine.java     |  47 ++++-
 3 files changed, 190 insertions(+), 50 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 3e9c3767e2e..3c27f14eff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -314,8 +314,13 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
       throws IOException {
-    if (ioEngineName.startsWith("file:")) {
-      return new FileIOEngine(ioEngineName.substring(5), capacity);
+    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
+      // To keep usage simple, the documentation only needs to mention the
+      // 'files:' prefix whether there is one file or several; 'file:' is
+      // still accepted for backward compatibility.
+      String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
+          .split(FileIOEngine.FILE_DELIMITER);
+      return new FileIOEngine(capacity, filePaths);
     } else if (ioEngineName.startsWith("offheap")) {
       return new ByteBufferIOEngine(capacity, true);
     } else if (ioEngineName.startsWith("heap")) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index aaf5cf98304..7586d5748b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -18,10 +18,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,38 +41,52 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Private
 public class FileIOEngine implements IOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
-  private final RandomAccessFile raf;
-  private final FileChannel fileChannel;
-  private final String path;
-  private long size;
+  public static final String FILE_DELIMITER = ",";
+  private final String[] filePaths;
+  private final FileChannel[] fileChannels;
+  private final RandomAccessFile[] rafs;
 
+  private final long sizePerFile;
+  private final long capacity;
+
+  private FileReadAccessor readAccessor = new FileReadAccessor();
+  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
+
+  public FileIOEngine(long capacity, String... filePaths) throws IOException {
+    this.sizePerFile = capacity / filePaths.length;
+    this.capacity = this.sizePerFile * filePaths.length;
+    this.filePaths = filePaths;
+    this.fileChannels = new FileChannel[filePaths.length];
+    this.rafs = new RandomAccessFile[filePaths.length];
+    for (int i = 0; i < filePaths.length; i++) {
+      String filePath = filePaths[i];
+      try {
+        rafs[i] = new RandomAccessFile(filePath, "rw");
+        long totalSpace = new File(filePath).getTotalSpace();
+        if (totalSpace < sizePerFile) {
+          // The next setLength() call will throw an exception; this message is
+          // logged to record the detailed reason for that exception.
+          String msg = "Only " + StringUtils.byteDesc(totalSpace)
+              + " total space under " + filePath + ", not enough for requested "
+              + StringUtils.byteDesc(sizePerFile);
+          LOG.warn(msg);
+        }
+        rafs[i].setLength(sizePerFile);
+        fileChannels[i] = rafs[i].getChannel();
+        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+            + ", on the path: " + filePath);
+      } catch (IOException fex) {
+        LOG.error("Failed allocating cache on " + filePath, fex);
+        shutdown();
+        throw fex;
+      }
+    }
-
-    try {
-      raf.setLength(fileSize);
-    } catch (IOException ioex) {
-      LOG.error("Can't extend bucket cache file; insufficient space for "
-          + StringUtils.byteDesc(fileSize), ioex);
-      raf.close();
-      throw ioex;
-    }
-
-    fileChannel = raf.getChannel();
-    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
   }
 
   @Override
   public String toString() {
-    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
-        ", size=" + String.format("%,d", this.size);
+    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
   }
 
   /**
@@ -94,7 +110,7 @@ public class FileIOEngine implements IOEngine {
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
     ByteBuffer dstBuffer = ByteBuffer.allocate(length);
-    fileChannel.read(dstBuffer, offset);
+    accessFile(readAccessor, dstBuffer, offset);
     // The buffer created out of the fileChannel is formed by copying the data from the file
     // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
     // this buffer from the file the data is already copied and there is no need to ensure that
@@ -114,7 +130,7 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    fileChannel.write(srcBuffer, offset);
+    accessFile(writeAccessor, srcBuffer, offset);
   }
 
   /**
@@ -123,7 +139,16 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void sync() throws IOException {
-    fileChannel.force(true);
+    for (int i = 0; i < fileChannels.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].force(true);
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failed syncing data to " + this.filePaths[i]);
+        throw ie;
+      }
+    }
   }
 
   /**
@@ -131,15 +156,17 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void shutdown() {
-    try {
-      fileChannel.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
-    }
-    try {
-      raf.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
+    for (int i = 0; i < filePaths.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].close();
+        }
+        if (rafs[i] != null) {
+          rafs[i].close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Failed closing " + filePaths[i] + " when shutting down the IOEngine", ex);
+      }
     }
   }
 
@@ -147,7 +174,84 @@ public class FileIOEngine implements IOEngine {
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
     // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
     assert srcBuffer.hasArray();
-    fileChannel.write(
-        ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
+    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
+        srcBuffer.remaining()), offset);
+  }
+
+  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+      long globalOffset) throws IOException {
+    int startFileNum = getFileNum(globalOffset);
+    int remainingAccessDataLen = buffer.remaining();
+    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
+    int accessFileNum = startFileNum;
+    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
+    int bufLimit = buffer.limit();
+    while (true) {
+      FileChannel fileChannel = fileChannels[accessFileNum];
+      if (endFileNum > accessFileNum) {
+        // shorten the limit so this access stops at the end of the current file
+        buffer.limit((int) (buffer.limit() - remainingAccessDataLen
+            + sizePerFile - accessOffset));
+      }
+      int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+      // restore the original limit
+      buffer.limit(bufLimit);
+      if (accessLen < remainingAccessDataLen) {
+        remainingAccessDataLen -= accessLen;
+        accessFileNum++;
+        accessOffset = 0;
+      } else {
+        break;
+      }
+      if (accessFileNum >= fileChannels.length) {
+        throw new IOException("Required data len "
+            + StringUtils.byteDesc(buffer.remaining())
+            + " exceeds the engine's capacity " + StringUtils.byteDesc(capacity)
+            + " where offset=" + globalOffset);
+      }
+    }
+  }
+
+  /**
+   * Get the absolute offset in the given file corresponding to the global offset.
+   * @param fileNum the index of the backing file
+   * @param globalOffset the offset in the engine's whole address space
+   * @return the absolute offset in that file
+   */
+  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
+    return globalOffset - fileNum * sizePerFile;
+  }
+
+  private int getFileNum(long offset) {
+    if (offset < 0) {
+      throw new IllegalArgumentException("Unexpected offset " + offset);
+    }
+    int fileNum = (int) (offset / sizePerFile);
+    if (fileNum >= fileChannels.length) {
+      throw new RuntimeException("Unexpected offset " + offset
+          + " where capacity=" + capacity);
+    }
+    return fileNum;
+  }
+
+  private static interface FileAccessor {
+    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+        throws IOException;
+  }
+
+  private static class FileReadAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+        long accessOffset) throws IOException {
+      return fileChannel.read(byteBuffer, accessOffset);
+    }
+  }
+
+  private static class FileWriteAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+        long accessOffset) throws IOException {
+      return fileChannel.write(byteBuffer, accessOffset);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 93f4cf79e06..d1f3dfe3b67 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -38,13 +40,39 @@ import org.junit.experimental.categories.Category;
 public class TestFileIOEngine {
   @Test
   public void testFileIOEngine() throws IOException {
-    int size = 2 * 1024 * 1024; // 2 MB
-    String filePath = "testFileIOEngine";
+    long totalCapacity = 6 * 1024 * 1024; // 6 MB
+    String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
+        "testFileIOEngine3" };
+    long sizePerFile = totalCapacity / filePaths.length; // 2 MB per file
+    List<Long> boundaryStartPositions = new ArrayList<Long>();
+    boundaryStartPositions.add(0L);
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStartPositions.add(sizePerFile * i - 1);
+      boundaryStartPositions.add(sizePerFile * i);
+      boundaryStartPositions.add(sizePerFile * i + 1);
+    }
+    List<Long> boundaryStopPositions = new ArrayList<Long>();
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStopPositions.add(sizePerFile * i - 1);
+      boundaryStopPositions.add(sizePerFile * i);
+      boundaryStopPositions.add(sizePerFile * i + 1);
+    }
+    boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
+    FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
     try {
-      FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
-      for (int i = 0; i < 50; i++) {
+      for (int i = 0; i < 500; i++) {
         int len = (int) Math.floor(Math.random() * 100);
-        long offset = (long) Math.floor(Math.random() * size % (size - len));
+        long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
+        if (i < boundaryStartPositions.size()) {
+          // test the boundary start positions
+          offset = boundaryStartPositions.get(i);
+        } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
+          // test the boundary stop positions
+          offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
+        } else if (i % 2 == 0) {
+          // test blocks written and read across file boundaries
+          offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
+        }
         byte[] data1 = new byte[len];
         for (int j = 0; j < data1.length; ++j) {
           data1[j] = (byte) (Math.random() * 255);
@@ -58,9 +86,12 @@ public class TestFileIOEngine {
         }
       }
     } finally {
-      File file = new File(filePath);
-      if (file.exists()) {
-        file.delete();
+      fileIOEngine.shutdown();
+      for (String filePath : filePaths) {
+        File file = new File(filePath);
+        if (file.exists()) {
+          file.delete();
+        }
       }
     }
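
Usage note (editor's addition, not part of the patch): with this change a BucketCache can stripe its data across several backing files, e.g. one per physical SSD. Assuming the standard BucketCache configuration keys, the engine is selected via hbase.bucketcache.ioengine, for example files:/mnt/ssd1/bucketcache,/mnt/ssd2/bucketcache (the paths are illustrative), while hbase.bucketcache.size still gives the total capacity. The constructor divides that capacity evenly across the listed files, so the effective capacity is rounded down to a multiple of the file count.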
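
To make the striping arithmetic concrete, here is a minimal standalone sketch (also not part of the patch; the class name and sizes are invented for illustration). It mirrors what getFileNum(), getAbsoluteOffsetInFile(), and the limit-shortening in accessFile() compute:

// Standalone sketch: reproduces FileIOEngine's offset math with made-up sizes.
// Each backing file holds sizePerFile bytes, so global offset N lands in file
// N / sizePerFile at offset N % sizePerFile within that file.
public class FileStripingSketch {
  public static void main(String[] args) {
    final long sizePerFile = 2L * 1024 * 1024; // 2 MB per file, as in the test

    // Map a global offset to (file number, offset within that file),
    // the way getFileNum() and getAbsoluteOffsetInFile() do.
    long globalOffset = sizePerFile + 10;
    int fileNum = (int) (globalOffset / sizePerFile);         // -> 1
    long offsetInFile = globalOffset - fileNum * sizePerFile; // -> 10
    System.out.println("global " + globalOffset + " -> file " + fileNum
        + " @ offset " + offsetInFile);

    // An access that crosses a file boundary is served in two pieces;
    // accessFile() shortens the buffer limit so the first piece stops
    // exactly at the end of the current file.
    long start = sizePerFile - 40; // begins 40 bytes before the end of file 0
    int len = 100;
    long firstPiece = sizePerFile - start; // 40 bytes in file 0
    long secondPiece = len - firstPiece;   // remaining 60 bytes in file 1
    System.out.println(firstPiece + " bytes in file 0, then " + secondPiece
        + " bytes in file 1 starting at offset 0");
  }
}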