diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 29b810f17a5..0710d26a22e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.Arrays; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; @@ -49,6 +50,7 @@ public class FileIOEngine implements IOEngine { private final String[] filePaths; private final FileChannel[] fileChannels; private final RandomAccessFile[] rafs; + private final ReentrantLock[] channelLocks; private final long sizePerFile; private final long capacity; @@ -75,6 +77,7 @@ public class FileIOEngine implements IOEngine { } } this.rafs = new RandomAccessFile[filePaths.length]; + this.channelLocks = new ReentrantLock[filePaths.length]; for (int i = 0; i < filePaths.length; i++) { String filePath = filePaths[i]; try { @@ -90,6 +93,7 @@ public class FileIOEngine implements IOEngine { } rafs[i].setLength(sizePerFile); fileChannels[i] = rafs[i].getChannel(); + channelLocks[i] = new ReentrantLock(); LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) + ", on the path:" + filePath); } catch (IOException fex) { @@ -233,8 +237,7 @@ public class FileIOEngine implements IOEngine { } catch (ClosedByInterruptException e) { throw e; } catch (ClosedChannelException e) { - LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e); - refreshFileConnection(accessFileNum); + refreshFileConnection(accessFileNum, e); continue; } // recover the limit @@ -282,13 +285,26 @@ public class FileIOEngine implements IOEngine { } @VisibleForTesting - void refreshFileConnection(int accessFileNum) throws IOException { - FileChannel fileChannel = fileChannels[accessFileNum]; - if (fileChannel != null) { - fileChannel.close(); + void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException { + ReentrantLock channelLock = channelLocks[accessFileNum]; + channelLock.lock(); + try { + FileChannel fileChannel = fileChannels[accessFileNum]; + if (fileChannel != null) { + // Don't re-open a channel if we were waiting on another + // thread to re-open the channel and it is now open. + if (fileChannel.isOpen()) { + return; + } + fileChannel.close(); + } + LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: " + + filePaths[accessFileNum], ioe); + rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); + fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); + } finally{ + channelLock.unlock(); } - rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); - fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); } private static interface FileAccessor { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index 648098667e5..efb8145d060 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import java.io.File; @@ -143,10 +144,18 @@ public class TestFileIOEngine { } @Test - public void testRefreshFileConnectionClosesConnections() throws IOException { - FileChannel fileChannel = fileIOEngine.getFileChannels()[0]; + public void testRefreshFileConnection() throws IOException { + FileChannel[] fileChannels = fileIOEngine.getFileChannels(); + FileChannel fileChannel = fileChannels[0]; assertNotNull(fileChannel); - fileIOEngine.refreshFileConnection(0); - assertFalse(fileChannel.isOpen()); + fileChannel.close(); + fileIOEngine.refreshFileConnection(0, new IOException("Test Exception")); + FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels(); + FileChannel reopenedFileChannel = reopenedFileChannels[0]; + assertNotEquals(fileChannel, reopenedFileChannel); + assertEquals(fileChannels.length, reopenedFileChannels.length); + for (int i = 1; i < fileChannels.length; i++) { + assertEquals(fileChannels[i], reopenedFileChannels[i]); + } } }