HBASE-20204 Add locking to RefreshFileConnections in BucketCache

This is a follow-up to HBASE-20141 where Anoop suggested adding locking
for refreshing channels.
This commit is contained in:
Zach York 2018-03-14 15:38:22 -07:00
parent de25f8b209
commit cba8d2fb8d
2 changed files with 38 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.io.hfile.Cacheable; import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@ -49,6 +50,7 @@ public class FileIOEngine implements IOEngine {
private final String[] filePaths; private final String[] filePaths;
private final FileChannel[] fileChannels; private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs; private final RandomAccessFile[] rafs;
private final ReentrantLock[] channelLocks;
private final long sizePerFile; private final long sizePerFile;
private final long capacity; private final long capacity;
@ -75,6 +77,7 @@ public class FileIOEngine implements IOEngine {
} }
} }
this.rafs = new RandomAccessFile[filePaths.length]; this.rafs = new RandomAccessFile[filePaths.length];
this.channelLocks = new ReentrantLock[filePaths.length];
for (int i = 0; i < filePaths.length; i++) { for (int i = 0; i < filePaths.length; i++) {
String filePath = filePaths[i]; String filePath = filePaths[i];
try { try {
@ -90,6 +93,7 @@ public class FileIOEngine implements IOEngine {
} }
rafs[i].setLength(sizePerFile); rafs[i].setLength(sizePerFile);
fileChannels[i] = rafs[i].getChannel(); fileChannels[i] = rafs[i].getChannel();
channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile) LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ ", on the path:" + filePath); + ", on the path:" + filePath);
} catch (IOException fex) { } catch (IOException fex) {
@ -233,8 +237,7 @@ public class FileIOEngine implements IOEngine {
} catch (ClosedByInterruptException e) { } catch (ClosedByInterruptException e) {
throw e; throw e;
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e); refreshFileConnection(accessFileNum, e);
refreshFileConnection(accessFileNum);
continue; continue;
} }
// recover the limit // recover the limit
@ -282,13 +285,26 @@ public class FileIOEngine implements IOEngine {
} }
@VisibleForTesting @VisibleForTesting
void refreshFileConnection(int accessFileNum) throws IOException { void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
ReentrantLock channelLock = channelLocks[accessFileNum];
channelLock.lock();
try {
FileChannel fileChannel = fileChannels[accessFileNum]; FileChannel fileChannel = fileChannels[accessFileNum];
if (fileChannel != null) { if (fileChannel != null) {
// Don't re-open a channel if we were waiting on another
// thread to re-open the channel and it is now open.
if (fileChannel.isOpen()) {
return;
}
fileChannel.close(); fileChannel.close();
} }
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
+ filePaths[accessFileNum], ioe);
rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw"); rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
fileChannels[accessFileNum] = rafs[accessFileNum].getChannel(); fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
} finally{
channelLock.unlock();
}
} }
private static interface FileAccessor { private static interface FileAccessor {

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.hbase.io.hfile.bucket; package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.io.File; import java.io.File;
@ -143,10 +144,18 @@ public class TestFileIOEngine {
} }
@Test @Test
public void testRefreshFileConnectionClosesConnections() throws IOException { public void testRefreshFileConnection() throws IOException {
FileChannel fileChannel = fileIOEngine.getFileChannels()[0]; FileChannel[] fileChannels = fileIOEngine.getFileChannels();
FileChannel fileChannel = fileChannels[0];
assertNotNull(fileChannel); assertNotNull(fileChannel);
fileIOEngine.refreshFileConnection(0); fileChannel.close();
assertFalse(fileChannel.isOpen()); fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
FileChannel reopenedFileChannel = reopenedFileChannels[0];
assertNotEquals(fileChannel, reopenedFileChannel);
assertEquals(fileChannels.length, reopenedFileChannels.length);
for (int i = 1; i < fileChannels.length; i++) {
assertEquals(fileChannels[i], reopenedFileChannels[i]);
}
} }
} }