HBASE-9380 StoreFile.Reader is not being closed on memstore flush

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-08-29 21:20:45 +00:00
parent 739f438176
commit e526e3f5e2
1 changed files with 15 additions and 2 deletions

View File

@ -48,12 +48,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -274,6 +274,7 @@ public class HStore implements Store {
return ttl;
}
@Override
public String getColumnFamilyName() {
return this.family.getNameAsString();
}
@ -309,6 +310,7 @@ public class HStore implements Store {
return this.compactionCheckMultiplier;
}
@Override
public long getBlockingFileCount() {
return blockingFileCount;
}
@ -345,6 +347,7 @@ public class HStore implements Store {
return closeCheckInterval;
}
@Override
public HColumnDescriptor getFamily() {
return this.family;
}
@ -420,6 +423,7 @@ public class HStore implements Store {
for (final StoreFileInfo storeFileInfo: files) {
// open each store file in parallel
completionService.submit(new Callable<StoreFile>() {
@Override
public StoreFile call() throws IOException {
StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
return storeFile;
@ -633,6 +637,7 @@ public class HStore implements Store {
new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
for (final StoreFile f : result) {
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
f.closeReader(true);
return null;
@ -771,6 +776,7 @@ public class HStore implements Store {
* @param isCompaction whether we are creating a new file in a compaction
* @return Writer for a new StoreFile in the tmp dir.
*/
@Override
public StoreFile.Writer createWriterInTmp(long maxKeyCount,
Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
throws IOException {
@ -936,6 +942,7 @@ public class HStore implements Store {
* @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early.
*/
@Override
public List<StoreFile> compact(CompactionContext compaction) throws IOException {
assert compaction != null && compaction.hasSelection();
CompactionRequest cr = compaction.getRequest();
@ -1078,6 +1085,7 @@ public class HStore implements Store {
* See HBASE-2331.
* @param compaction
*/
@Override
public void completeCompactionMarker(CompactionDescriptor compaction)
throws IOException {
LOG.debug("Completing compaction from the WAL marker");
@ -1299,6 +1307,7 @@ public class HStore implements Store {
return compaction;
}
@Override
public void cancelRequestedCompaction(CompactionContext compaction) {
finishCompactionRequest(compaction.getRequest());
}
@ -1324,7 +1333,7 @@ public class HStore implements Store {
throws IOException {
StoreFile storeFile = null;
try {
createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
storeFile = createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
} catch (IOException e) {
LOG.error("Failed to open store file : " + path
+ ", keeping it in tmp location", e);
@ -1554,6 +1563,7 @@ public class HStore implements Store {
return foundCandidate;
}
@Override
public boolean canSplit() {
this.lock.readLock().lock();
try {
@ -1597,6 +1607,7 @@ public class HStore implements Store {
return storeSize;
}
@Override
public void triggerMajorCompaction() {
this.forceMajor = true;
}
@ -1775,6 +1786,7 @@ public class HStore implements Store {
}
}
@Override
public StoreFlushContext createFlushContext(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
}
@ -1866,6 +1878,7 @@ public class HStore implements Store {
return DEEP_OVERHEAD + this.memstore.heapSize();
}
@Override
public KeyValue.KVComparator getComparator() {
return comparator;
}