HBASE-11815 - Flush and compaction could just close the tmp writer if there is an exception
Ramkrishna 2014-10-07 09:42:21 +05:30
parent 226873f1ae
commit de15b1fd98
2 changed files with 25 additions and 6 deletions

DefaultStoreFlusher.java

@@ -66,12 +66,21 @@ public class DefaultStoreFlusher extends StoreFlusher {
         writer = store.createWriterInTmp(
             cellsCount, store.getFamily().getCompression(), false, true, true);
         writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
+        IOException e = null;
         try {
           performFlush(scanner, writer, smallestReadPoint);
+        } catch (IOException ioe) {
+          e = ioe;
+          // throw the exception out
+          throw ioe;
         } finally {
-          finalizeWriter(writer, cacheFlushId, status);
+          if (e != null) {
+            writer.close();
+          } else {
+            finalizeWriter(writer, cacheFlushId, status);
+          }
         }
       }
     } finally {
       scanner.close();
     }
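Distilled to its core, the change keeps a local record of whether the write loop failed, and uses it in the finally block to decide between finalizing the temporary writer (success) and merely closing it (failure). The sketch below is a minimal, self-contained illustration of that pattern; the TmpWriter interface and every name in it are hypothetical stand-ins, not HBase API.

import java.io.IOException;

// Minimal sketch of the close-on-error pattern, assuming a hypothetical
// TmpWriter in place of HBase's StoreFile.Writer.
public class CloseOnErrorSketch {

  interface TmpWriter {
    void append(String cell) throws IOException;
    void appendMetadata() throws IOException; // finalization, success path only
    void close() throws IOException;
  }

  static void flush(Iterable<String> cells, TmpWriter writer) throws IOException {
    IOException e = null;
    try {
      for (String cell : cells) {
        writer.append(cell);
      }
    } catch (IOException ioe) {
      e = ioe;   // remember the failure so the finally block can see it
      throw ioe; // still propagate it to the caller
    } finally {
      if (e != null) {
        // The flush failed: don't finalize a half-written tmp file,
        // just release the handle so the tmp file can be cleaned up.
        writer.close();
      } else {
        writer.appendMetadata(); // finalize only when every append succeeded
        writer.close();
      }
    }
  }
}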

DefaultCompactor.java

@@ -26,9 +26,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
@@ -55,6 +55,7 @@ public class DefaultCompactor extends Compactor {
     StoreFile.Writer writer = null;
     List<Path> newFiles = new ArrayList<Path>();
     boolean cleanSeqId = false;
+    IOException e = null;
     try {
       InternalScanner scanner = null;
       try {
@@ -92,13 +93,22 @@ public class DefaultCompactor extends Compactor {
           scanner.close();
         }
       }
-    } finally {
+    } catch (IOException ioe) {
+      e = ioe;
+      // Throw the exception
+      throw ioe;
+    }
+    finally {
       if (writer != null) {
-        writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
-        writer.close();
-        newFiles.add(writer.getPath());
+        if (e != null) {
+          writer.close();
+        } else {
+          writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+          writer.close();
+          newFiles.add(writer.getPath());
+        }
       }
     }
     return newFiles;
   }
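The compactor applies the same rule: writer.appendMetadata(...) and newFiles.add(writer.getPath()) now run only when the compaction loop completed, so a writer that failed mid-compaction is closed without being finalized and its tmp file never reaches the list of committed files. A hypothetical driver for the sketch above makes the behavior concrete; the failing TmpWriter here is illustrative only:

import java.io.IOException;
import java.util.Arrays;

public class CloseOnErrorDemo {
  public static void main(String[] args) {
    // A writer whose append always fails, to exercise the error path.
    CloseOnErrorSketch.TmpWriter failing = new CloseOnErrorSketch.TmpWriter() {
      public void append(String cell) throws IOException {
        throw new IOException("disk full while appending " + cell);
      }
      public void appendMetadata() {
        System.out.println("finalized (must not happen on failure)");
      }
      public void close() {
        System.out.println("closed tmp writer without finalizing");
      }
    };
    try {
      CloseOnErrorSketch.flush(Arrays.asList("row1"), failing);
    } catch (IOException expected) {
      // The original append failure reaches the caller intact.
      System.out.println("caller saw: " + expected.getMessage());
    }
  }
}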