HBASE-20204 Add locking to RefreshFileConnections in BucketCache
This is a follow-up to HBASE-20141 where Anoop suggested adding locking for refreshing channels.
This commit is contained in:
parent
9d9d5aec0e
commit
5406f63332
|
@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -43,6 +44,7 @@ public class FileIOEngine implements IOEngine {
|
|||
private final String[] filePaths;
|
||||
private final FileChannel[] fileChannels;
|
||||
private final RandomAccessFile[] rafs;
|
||||
private final ReentrantLock[] channelLocks;
|
||||
|
||||
private final long sizePerFile;
|
||||
private final long capacity;
|
||||
|
@ -56,6 +58,7 @@ public class FileIOEngine implements IOEngine {
|
|||
this.filePaths = filePaths;
|
||||
this.fileChannels = new FileChannel[filePaths.length];
|
||||
this.rafs = new RandomAccessFile[filePaths.length];
|
||||
this.channelLocks = new ReentrantLock[filePaths.length];
|
||||
for (int i = 0; i < filePaths.length; i++) {
|
||||
String filePath = filePaths[i];
|
||||
try {
|
||||
|
@ -71,6 +74,7 @@ public class FileIOEngine implements IOEngine {
|
|||
}
|
||||
rafs[i].setLength(sizePerFile);
|
||||
fileChannels[i] = rafs[i].getChannel();
|
||||
channelLocks[i] = new ReentrantLock();
|
||||
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
|
||||
+ ", on the path:" + filePath);
|
||||
} catch (IOException fex) {
|
||||
|
@ -193,8 +197,7 @@ public class FileIOEngine implements IOEngine {
|
|||
} catch (ClosedByInterruptException e) {
|
||||
throw e;
|
||||
} catch (ClosedChannelException e) {
|
||||
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
|
||||
refreshFileConnection(accessFileNum);
|
||||
refreshFileConnection(accessFileNum, e);
|
||||
continue;
|
||||
}
|
||||
// recover the limit
|
||||
|
@ -242,13 +245,26 @@ public class FileIOEngine implements IOEngine {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void refreshFileConnection(int accessFileNum) throws IOException {
|
||||
FileChannel fileChannel = fileChannels[accessFileNum];
|
||||
if (fileChannel != null) {
|
||||
fileChannel.close();
|
||||
void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
|
||||
ReentrantLock channelLock = channelLocks[accessFileNum];
|
||||
channelLock.lock();
|
||||
try {
|
||||
FileChannel fileChannel = fileChannels[accessFileNum];
|
||||
if (fileChannel != null) {
|
||||
// Don't re-open a channel if we were waiting on another
|
||||
// thread to re-open the channel and it is now open.
|
||||
if (fileChannel.isOpen()) {
|
||||
return;
|
||||
}
|
||||
fileChannel.close();
|
||||
}
|
||||
LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
|
||||
+ filePaths[accessFileNum], ioe);
|
||||
rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
|
||||
fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
|
||||
} finally{
|
||||
channelLock.unlock();
|
||||
}
|
||||
rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
|
||||
fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
|
||||
}
|
||||
|
||||
private static interface FileAccessor {
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
package org.apache.hadoop.hbase.io.hfile.bucket;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -134,10 +135,18 @@ public class TestFileIOEngine {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshFileConnectionClosesConnections() throws IOException {
|
||||
FileChannel fileChannel = fileIOEngine.getFileChannels()[0];
|
||||
public void testRefreshFileConnection() throws IOException {
|
||||
FileChannel[] fileChannels = fileIOEngine.getFileChannels();
|
||||
FileChannel fileChannel = fileChannels[0];
|
||||
assertNotNull(fileChannel);
|
||||
fileIOEngine.refreshFileConnection(0);
|
||||
assertFalse(fileChannel.isOpen());
|
||||
fileChannel.close();
|
||||
fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
|
||||
FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
|
||||
FileChannel reopenedFileChannel = reopenedFileChannels[0];
|
||||
assertNotEquals(fileChannel, reopenedFileChannel);
|
||||
assertEquals(fileChannels.length, reopenedFileChannels.length);
|
||||
for (int i = 1; i < fileChannels.length; i++) {
|
||||
assertEquals(fileChannels[i], reopenedFileChannels[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue