HBASE-17837 Backport HBASE-15314 to branch-1.3 (Chunhui shen & Ram))
This commit is contained in:
parent
d0139a8777
commit
589a0e2efb
|
@ -309,9 +309,14 @@ public class BucketCache implements BlockCache, HeapSize {
|
||||||
*/
|
*/
|
||||||
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
|
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (ioEngineName.startsWith("file:"))
|
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
|
||||||
return new FileIOEngine(ioEngineName.substring(5), capacity);
|
// In order to make the usage simple, we only need the prefix 'files:' in
|
||||||
else if (ioEngineName.startsWith("offheap"))
|
// document whether one or multiple file(s), but also support 'file:' for
|
||||||
|
// the compatibility
|
||||||
|
String[] filePaths =
|
||||||
|
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
|
||||||
|
return new FileIOEngine(capacity, filePaths);
|
||||||
|
} else if (ioEngineName.startsWith("offheap"))
|
||||||
return new ByteBufferIOEngine(capacity, true);
|
return new ByteBufferIOEngine(capacity, true);
|
||||||
else if (ioEngineName.startsWith("heap"))
|
else if (ioEngineName.startsWith("heap"))
|
||||||
return new ByteBufferIOEngine(capacity, false);
|
return new ByteBufferIOEngine(capacity, false);
|
||||||
|
|
|
@ -18,10 +18,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -34,38 +36,52 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FileIOEngine implements IOEngine {
|
public class FileIOEngine implements IOEngine {
|
||||||
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
|
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
|
||||||
private final RandomAccessFile raf;
|
public static final String FILE_DELIMITER = ",";
|
||||||
private final FileChannel fileChannel;
|
private final String[] filePaths;
|
||||||
private final String path;
|
private final FileChannel[] fileChannels;
|
||||||
private long size;
|
private final RandomAccessFile[] rafs;
|
||||||
|
|
||||||
public FileIOEngine(String filePath, long fileSize) throws IOException {
|
private final long sizePerFile;
|
||||||
this.path = filePath;
|
private final long capacity;
|
||||||
this.size = fileSize;
|
|
||||||
|
private FileReadAccessor readAccessor = new FileReadAccessor();
|
||||||
|
private FileWriteAccessor writeAccessor = new FileWriteAccessor();
|
||||||
|
|
||||||
|
public FileIOEngine(long capacity, String... filePaths) throws IOException {
|
||||||
|
this.sizePerFile = capacity / filePaths.length;
|
||||||
|
this.capacity = this.sizePerFile * filePaths.length;
|
||||||
|
this.filePaths = filePaths;
|
||||||
|
this.fileChannels = new FileChannel[filePaths.length];
|
||||||
|
this.rafs = new RandomAccessFile[filePaths.length];
|
||||||
|
for (int i = 0; i < filePaths.length; i++) {
|
||||||
|
String filePath = filePaths[i];
|
||||||
try {
|
try {
|
||||||
raf = new RandomAccessFile(filePath, "rw");
|
rafs[i] = new RandomAccessFile(filePath, "rw");
|
||||||
} catch (java.io.FileNotFoundException fex) {
|
long totalSpace = new File(filePath).getTotalSpace();
|
||||||
LOG.error("Can't create bucket cache file " + filePath, fex);
|
if (totalSpace < sizePerFile) {
|
||||||
|
// The next setting length will throw exception,logging this message
|
||||||
|
// is just used for the detail reason of exception,
|
||||||
|
String msg = "Only " + StringUtils.byteDesc(totalSpace)
|
||||||
|
+ " total space under " + filePath + ", not enough for requested "
|
||||||
|
+ StringUtils.byteDesc(sizePerFile);
|
||||||
|
LOG.warn(msg);
|
||||||
|
}
|
||||||
|
rafs[i].setLength(sizePerFile);
|
||||||
|
fileChannels[i] = rafs[i].getChannel();
|
||||||
|
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
|
||||||
|
+ ", on the path:" + filePath);
|
||||||
|
} catch (IOException fex) {
|
||||||
|
LOG.error("Failed allocating cache on " + filePath, fex);
|
||||||
|
shutdown();
|
||||||
throw fex;
|
throw fex;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
raf.setLength(fileSize);
|
|
||||||
} catch (IOException ioex) {
|
|
||||||
LOG.error("Can't extend bucket cache file; insufficient space for "
|
|
||||||
+ StringUtils.byteDesc(fileSize), ioex);
|
|
||||||
raf.close();
|
|
||||||
throw ioex;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fileChannel = raf.getChannel();
|
|
||||||
LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
|
return "ioengine=" + this.getClass().getSimpleName() + ", paths="
|
||||||
", size=" + String.format("%,d", this.size);
|
+ Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -86,7 +102,7 @@ public class FileIOEngine implements IOEngine {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int read(ByteBuffer dstBuffer, long offset) throws IOException {
|
public int read(ByteBuffer dstBuffer, long offset) throws IOException {
|
||||||
return fileChannel.read(dstBuffer, offset);
|
return accessFile(readAccessor, dstBuffer, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,7 +113,7 @@ public class FileIOEngine implements IOEngine {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
|
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
|
||||||
fileChannel.write(srcBuffer, offset);
|
accessFile(writeAccessor, srcBuffer, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,7 +122,16 @@ public class FileIOEngine implements IOEngine {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
fileChannel.force(true);
|
for (int i = 0; i < fileChannels.length; i++) {
|
||||||
|
try {
|
||||||
|
if (fileChannels[i] != null) {
|
||||||
|
fileChannels[i].force(true);
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.warn("Failed syncing data to " + this.filePaths[i]);
|
||||||
|
throw ie;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -114,15 +139,93 @@ public class FileIOEngine implements IOEngine {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
|
for (int i = 0; i < filePaths.length; i++) {
|
||||||
try {
|
try {
|
||||||
fileChannel.close();
|
if (fileChannels[i] != null) {
|
||||||
|
fileChannels[i].close();
|
||||||
|
}
|
||||||
|
if (rafs[i] != null) {
|
||||||
|
rafs[i].close();
|
||||||
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Can't shutdown cleanly", ex);
|
LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
|
||||||
}
|
|
||||||
try {
|
|
||||||
raf.close();
|
|
||||||
} catch (IOException ex) {
|
|
||||||
LOG.error("Can't shutdown cleanly", ex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int accessFile(FileAccessor accessor, ByteBuffer buffer, long globalOffset)
|
||||||
|
throws IOException {
|
||||||
|
int startFileNum = getFileNum(globalOffset);
|
||||||
|
int remainingAccessDataLen = buffer.remaining();
|
||||||
|
int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
|
||||||
|
int accessFileNum = startFileNum;
|
||||||
|
long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
|
||||||
|
int bufLimit = buffer.limit();
|
||||||
|
while (true) {
|
||||||
|
FileChannel fileChannel = fileChannels[accessFileNum];
|
||||||
|
if (endFileNum > accessFileNum) {
|
||||||
|
// short the limit;
|
||||||
|
buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
|
||||||
|
}
|
||||||
|
int accessLen = accessor.access(fileChannel, buffer, accessOffset);
|
||||||
|
// recover the limit
|
||||||
|
buffer.limit(bufLimit);
|
||||||
|
if (accessLen < remainingAccessDataLen) {
|
||||||
|
remainingAccessDataLen -= accessLen;
|
||||||
|
accessFileNum++;
|
||||||
|
accessOffset = 0;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (accessFileNum >= fileChannels.length) {
|
||||||
|
throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
|
||||||
|
+ " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
|
||||||
|
+ globalOffset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bufLimit;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the absolute offset in given file with the relative global offset.
|
||||||
|
* @param fileNum
|
||||||
|
* @param globalOffset
|
||||||
|
* @return the absolute offset
|
||||||
|
*/
|
||||||
|
private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
|
||||||
|
return globalOffset - fileNum * sizePerFile;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getFileNum(long offset) {
|
||||||
|
if (offset < 0) {
|
||||||
|
throw new IllegalArgumentException("Unexpected offset " + offset);
|
||||||
|
}
|
||||||
|
int fileNum = (int) (offset / sizePerFile);
|
||||||
|
if (fileNum >= fileChannels.length) {
|
||||||
|
throw new RuntimeException("Not expected offset " + offset + " where capacity=" + capacity);
|
||||||
|
}
|
||||||
|
return fileNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static interface FileAccessor {
|
||||||
|
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
||||||
|
throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FileReadAccessor implements FileAccessor {
|
||||||
|
@Override
|
||||||
|
public int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
||||||
|
throws IOException {
|
||||||
|
return fileChannel.read(byteBuffer, accessOffset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class FileWriteAccessor implements FileAccessor {
|
||||||
|
@Override
|
||||||
|
public int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
||||||
|
throws IOException {
|
||||||
|
return fileChannel.write(byteBuffer, accessOffset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -35,13 +37,39 @@ import org.junit.experimental.categories.Category;
|
||||||
public class TestFileIOEngine {
|
public class TestFileIOEngine {
|
||||||
@Test
|
@Test
|
||||||
public void testFileIOEngine() throws IOException {
|
public void testFileIOEngine() throws IOException {
|
||||||
int size = 2 * 1024 * 1024; // 2 MB
|
long totalCapacity = 6 * 1024 * 1024; // 6 MB
|
||||||
String filePath = "testFileIOEngine";
|
String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
|
||||||
|
"testFileIOEngine3" };
|
||||||
|
long sizePerFile = totalCapacity / filePaths.length; // 2 MB per File
|
||||||
|
List<Long> boundaryStartPositions = new ArrayList<Long>();
|
||||||
|
boundaryStartPositions.add(0L);
|
||||||
|
for (int i = 1; i < filePaths.length; i++) {
|
||||||
|
boundaryStartPositions.add(sizePerFile * i - 1);
|
||||||
|
boundaryStartPositions.add(sizePerFile * i);
|
||||||
|
boundaryStartPositions.add(sizePerFile * i + 1);
|
||||||
|
}
|
||||||
|
List<Long> boundaryStopPositions = new ArrayList<Long>();
|
||||||
|
for (int i = 1; i < filePaths.length; i++) {
|
||||||
|
boundaryStopPositions.add(sizePerFile * i - 1);
|
||||||
|
boundaryStopPositions.add(sizePerFile * i);
|
||||||
|
boundaryStopPositions.add(sizePerFile * i + 1);
|
||||||
|
}
|
||||||
|
boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
|
||||||
|
FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
|
||||||
try {
|
try {
|
||||||
FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
|
for (int i = 0; i < 500; i++) {
|
||||||
for (int i = 0; i < 50; i++) {
|
|
||||||
int len = (int) Math.floor(Math.random() * 100);
|
int len = (int) Math.floor(Math.random() * 100);
|
||||||
long offset = (long) Math.floor(Math.random() * size % (size - len));
|
long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
|
||||||
|
if (i < boundaryStartPositions.size()) {
|
||||||
|
// make the boundary start positon
|
||||||
|
offset = boundaryStartPositions.get(i);
|
||||||
|
} else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
|
||||||
|
// make the boundary stop positon
|
||||||
|
offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
|
||||||
|
} else if (i % 2 == 0) {
|
||||||
|
// make the cross-files block writing/reading
|
||||||
|
offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
|
||||||
|
}
|
||||||
byte[] data1 = new byte[len];
|
byte[] data1 = new byte[len];
|
||||||
for (int j = 0; j < data1.length; ++j) {
|
for (int j = 0; j < data1.length; ++j) {
|
||||||
data1[j] = (byte) (Math.random() * 255);
|
data1[j] = (byte) (Math.random() * 255);
|
||||||
|
@ -54,11 +82,14 @@ public class TestFileIOEngine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
fileIOEngine.shutdown();
|
||||||
|
for (String filePath : filePaths) {
|
||||||
File file = new File(filePath);
|
File file = new File(filePath);
|
||||||
if (file.exists()) {
|
if (file.exists()) {
|
||||||
file.delete();
|
file.delete();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue