HBASE-19435 Reopen Files for ClosedChannelException in BucketCache

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Zach York 2017-12-04 12:11:21 -08:00 committed by tedyu
parent 5398d83bc7
commit 9e34d99c07
2 changed files with 42 additions and 1 deletions

View File

@ -19,12 +19,15 @@
package org.apache.hadoop.hbase.io.hfile.bucket; package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Arrays; import java.util.Arrays;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -108,6 +111,17 @@ public class FileIOEngine implements IOEngine {
return 0; return 0;
} }
@VisibleForTesting
void closeFileChannels() {
for (FileChannel fileChannel: fileChannels) {
try {
fileChannel.close();
} catch (IOException e) {
LOG.warn("Failed to close FileChannel", e);
}
}
}
/** /**
* Transfers data from the given byte buffer to file * Transfers data from the given byte buffer to file
* @param srcBuffer the given byte buffer from which bytes are to be read * @param srcBuffer the given byte buffer from which bytes are to be read
@ -169,11 +183,18 @@ public class FileIOEngine implements IOEngine {
int bufLimit = buffer.limit(); int bufLimit = buffer.limit();
while (true) { while (true) {
FileChannel fileChannel = fileChannels[accessFileNum]; FileChannel fileChannel = fileChannels[accessFileNum];
int accessLen = 0;
if (endFileNum > accessFileNum) { if (endFileNum > accessFileNum) {
// short the limit; // short the limit;
buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
} }
int accessLen = accessor.access(fileChannel, buffer, accessOffset); try {
accessLen = accessor.access(fileChannel, buffer, accessOffset);
} catch (ClosedChannelException e) {
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
refreshFileConnection(accessFileNum);
continue;
}
// recover the limit // recover the limit
buffer.limit(bufLimit); buffer.limit(bufLimit);
if (accessLen < remainingAccessDataLen) { if (accessLen < remainingAccessDataLen) {
@ -213,6 +234,11 @@ public class FileIOEngine implements IOEngine {
return fileNum; return fileNum;
} }
private void refreshFileConnection(int accessFileNum) throws FileNotFoundException {
rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
}
private static interface FileAccessor { private static interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException; throws IOException;

View File

@ -114,4 +114,19 @@ public class TestFileIOEngine {
fileIOEngine.read(ByteBuffer.wrap(data2), 0); fileIOEngine.read(ByteBuffer.wrap(data2), 0);
assertArrayEquals(data1, data2); assertArrayEquals(data1, data2);
} }
@Test
public void testClosedChannelException() throws IOException {
fileIOEngine.closeFileChannels();
int len = 5;
long offset = 0L;
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
}
byte[] data2 = new byte[len];
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
fileIOEngine.read(ByteBuffer.wrap(data2), offset);
assertArrayEquals(data1, data2);
}
} }