HBASE-19435 Reopen Files for ClosedChannelException in BucketCache
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
c3743be05c
commit
a39d5bed1b
|
@ -19,12 +19,15 @@
|
||||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -139,6 +142,17 @@ public class FileIOEngine implements IOEngine {
|
||||||
return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
|
return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void closeFileChannels() {
|
||||||
|
for (FileChannel fileChannel: fileChannels) {
|
||||||
|
try {
|
||||||
|
fileChannel.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to close FileChannel", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transfers data from the given byte buffer to file
|
* Transfers data from the given byte buffer to file
|
||||||
* @param srcBuffer the given byte buffer from which bytes are to be read
|
* @param srcBuffer the given byte buffer from which bytes are to be read
|
||||||
|
@ -208,12 +222,19 @@ public class FileIOEngine implements IOEngine {
|
||||||
int bufLimit = buffer.limit();
|
int bufLimit = buffer.limit();
|
||||||
while (true) {
|
while (true) {
|
||||||
FileChannel fileChannel = fileChannels[accessFileNum];
|
FileChannel fileChannel = fileChannels[accessFileNum];
|
||||||
|
int accessLen = 0;
|
||||||
if (endFileNum > accessFileNum) {
|
if (endFileNum > accessFileNum) {
|
||||||
// short the limit;
|
// short the limit;
|
||||||
buffer.limit((int) (buffer.limit() - remainingAccessDataLen
|
buffer.limit((int) (buffer.limit() - remainingAccessDataLen
|
||||||
+ sizePerFile - accessOffset));
|
+ sizePerFile - accessOffset));
|
||||||
}
|
}
|
||||||
int accessLen = accessor.access(fileChannel, buffer, accessOffset);
|
try {
|
||||||
|
accessLen = accessor.access(fileChannel, buffer, accessOffset);
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
|
||||||
|
refreshFileConnection(accessFileNum);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// recover the limit
|
// recover the limit
|
||||||
buffer.limit(bufLimit);
|
buffer.limit(bufLimit);
|
||||||
if (accessLen < remainingAccessDataLen) {
|
if (accessLen < remainingAccessDataLen) {
|
||||||
|
@ -224,10 +245,9 @@ public class FileIOEngine implements IOEngine {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (accessFileNum >= fileChannels.length) {
|
if (accessFileNum >= fileChannels.length) {
|
||||||
throw new IOException("Required data len "
|
throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
|
||||||
+ StringUtils.byteDesc(buffer.remaining())
|
+ " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
|
||||||
+ " exceed the engine's capacity " + StringUtils.byteDesc(capacity)
|
+ globalOffset);
|
||||||
+ " where offset=" + globalOffset);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -254,6 +274,11 @@ public class FileIOEngine implements IOEngine {
|
||||||
return fileNum;
|
return fileNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void refreshFileConnection(int accessFileNum) throws FileNotFoundException {
|
||||||
|
rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
|
||||||
|
fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
|
||||||
|
}
|
||||||
|
|
||||||
private static interface FileAccessor {
|
private static interface FileAccessor {
|
||||||
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -119,4 +118,20 @@ public class TestFileIOEngine {
|
||||||
ByteBuff data2 = deserializer.getDeserializedByteBuff();
|
ByteBuff data2 = deserializer.getDeserializedByteBuff();
|
||||||
assertArrayEquals(data1, data2.array());
|
assertArrayEquals(data1, data2.array());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClosedChannelException() throws IOException {
|
||||||
|
fileIOEngine.closeFileChannels();
|
||||||
|
int len = 5;
|
||||||
|
long offset = 0L;
|
||||||
|
byte[] data1 = new byte[len];
|
||||||
|
for (int j = 0; j < data1.length; ++j) {
|
||||||
|
data1[j] = (byte) (Math.random() * 255);
|
||||||
|
}
|
||||||
|
fileIOEngine.write(ByteBuffer.wrap(data1), offset);
|
||||||
|
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
|
||||||
|
fileIOEngine.read(offset, len, deserializer);
|
||||||
|
ByteBuff data2 = deserializer.getDeserializedByteBuff();
|
||||||
|
assertArrayEquals(data1, data2.array());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue