diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 5a47c7371ea..ad1c394d7cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -19,12 +19,15 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.Arrays; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.yetus.audience.InterfaceAudience; @@ -139,6 +142,17 @@ public class FileIOEngine implements IOEngine { return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE); } + @VisibleForTesting + void closeFileChannels() { + for (FileChannel fileChannel: fileChannels) { + try { + fileChannel.close(); + } catch (IOException e) { + LOG.warn("Failed to close FileChannel", e); + } + } + } + /** * Transfers data from the given byte buffer to file * @param srcBuffer the given byte buffer from which bytes are to be read @@ -208,12 +222,19 @@ public class FileIOEngine implements IOEngine { int bufLimit = buffer.limit(); while (true) { FileChannel fileChannel = fileChannels[accessFileNum]; + int accessLen = 0; if (endFileNum > accessFileNum) { // short the limit; buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset)); } - int accessLen = accessor.access(fileChannel, buffer, accessOffset); + try { + accessLen = accessor.access(fileChannel, buffer, accessOffset); + } catch (ClosedChannelException e) { + LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e); + refreshFileConnection(accessFileNum); + continue; + } // recover the limit buffer.limit(bufLimit); if (accessLen < remainingAccessDataLen) { @@ -224,10 +245,9 @@ public class FileIOEngine implements IOEngine { break; } if (accessFileNum >= fileChannels.length) { - throw new IOException("Required data len " - + StringUtils.byteDesc(buffer.remaining()) - + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) - + " where offset=" + globalOffset); + throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining()) + + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset=" + + globalOffset); } } } @@ -254,6 +274,11 @@ public class FileIOEngine implements IOEngine { return fileNum; } + private void refreshFileConnection(int accessFileNum) throws FileNotFoundException { + rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); + fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); + } + private static interface FileAccessor { int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset) throws IOException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index 4451c0c3f77..1bcc02630f6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; @@ -119,4 +118,20 @@ public class TestFileIOEngine { ByteBuff data2 = deserializer.getDeserializedByteBuff(); assertArrayEquals(data1, data2.array()); } + + @Test + public void testClosedChannelException() throws IOException { + fileIOEngine.closeFileChannels(); + int len = 5; + long offset = 0L; + byte[] data1 = new byte[len]; + for (int j = 0; j < data1.length; ++j) { + data1[j] = (byte) (Math.random() * 255); + } + fileIOEngine.write(ByteBuffer.wrap(data1), offset); + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); + fileIOEngine.read(offset, len, deserializer); + ByteBuff data2 = deserializer.getDeserializedByteBuff(); + assertArrayEquals(data1, data2.array()); + } }