HADOOP-1776 Fix for sporadic compaction failures closing and moving compaction result

M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
    Minor fix of a log message.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    (COMPACTION_DIR, WORKING_COMPACTION): Removed.
    (compactdir): Renamed compactionDir.
    Removed from the constructor our check for a compaction left undone;
    instead, just ignore it. When the compaction reruns, whatever was left on
    the filesystem is cleaned up and the compaction is redone. (The likelihood
    of a crash mid-compaction landing exactly in the window where the
    compaction was recoverable is low -- it is more robust to simply redo the
    compaction from scratch.)
    (compactHelper): We were deleting the HBaseRoot/compaction.tmp directory
    after a compaction completed. Usually fine, but on a cluster of more than
    one machine, if two compactions were near-concurrent, one machine could
    remove the compaction working directory while another was midway through
    its compaction. The result was odd failures closing the compaction result
    file, moving the resulting compacted file into place, or subsequently
    trying to open a reader on the resulting compaction file (see HADOOP-1765).
    (getFilesToCompact): Added.
    (processReadyCompaction): Added. Reorganized compaction so that the window
    during which loss of data is possible is narrowed, and even then we log a
    message describing how a restore might be performed manually (TODO: add a
    repair tool, a region fsck tool).
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    (rename): More checking around rename that it was successful.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    An empty log gives HLog trouble. Added handling.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    Cleanup of debug-level logging.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    Minor javadoc and changed a log from info to debug.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@569446 13f79535-47bb-0310-9956-ffa450edef68
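To make the compactHelper() race concrete, here is a minimal illustrative sketch. It is not HBase code: it uses plain java.io.File, and the SHARED_TMP path and storeA/storeB names are hypothetical. It shows why having every store delete a shared compaction working directory when it finishes can clobber another store's in-progress compaction, and why cleaning up only that store's own subdirectory -- analogous to (not identical to) this commit's per-store compaction directory handling -- avoids the failure.

    import java.io.File;
    import java.io.IOException;

    public class SharedCompactionDirRace {
      // Hypothetical shared working directory, standing in for HBaseRoot/compaction.tmp.
      static final File SHARED_TMP =
        new File(System.getProperty("java.io.tmpdir"), "compaction.tmp");

      // Old behaviour: every store wipes the whole shared directory when it finishes.
      static void compactUnsafe(String store) throws IOException {
        File work = new File(SHARED_TMP, store);
        work.mkdirs();
        new File(work, "compacted.out").createNewFile(); // long-running merge writes here
        // If another store finishes first and removes SHARED_TMP recursively,
        // this store's output vanishes and the later close/move/open fails oddly.
        deleteRecursively(SHARED_TMP); // unsafe: clobbers other stores' in-progress work
      }

      // Fixed behaviour (analogous to this commit): remove only this store's subdirectory.
      static void compactSafe(String store) throws IOException {
        File work = new File(SHARED_TMP, store);
        work.mkdirs();
        new File(work, "compacted.out").createNewFile();
        deleteRecursively(work); // safe: neighbouring stores are untouched
      }

      static void deleteRecursively(File f) {
        File[] children = f.listFiles();
        if (children != null) {
          for (File child : children) {
            deleteRecursively(child);
          }
        }
        f.delete();
      }

      public static void main(String[] args) throws IOException {
        compactSafe("storeA");
        compactSafe("storeB");
      }
    }

Run sequentially either variant is harmless; the difference only matters when two stores (possibly on different machines against a shared filesystem) compact at the same time, which is exactly the scenario described in HADOOP-1765.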
This commit is contained in:
parent d5cc43d394
commit 5e3c5037b4
src/contrib/hbase/CHANGES.txt
@@ -16,6 +16,8 @@ Trunk (unreleased changes)
     HADOOP-1729 Recent renaming or META tables breaks hbase shell
     HADOOP-1730 unexpected null value causes META scanner to exit (silently)
     HADOOP-1747 On a cluster, on restart, regions multiply assigned
+    HADOOP-1776 Fix for sporadic compaction failures closing and moving
+                compaction result

   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
@@ -679,7 +679,7 @@ public class HConnectionManager implements HConstants {

           // We found at least one server for the table and now we're done.
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Found " + servers.size() + " server(s) for " +
+            LOG.debug("Found " + servers.size() + " region(s) for " +
               tableName + " at " + t);
           }
           break;
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
@@ -40,8 +40,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * <p>Each HRegion is identified by a unique long <code>int</code>. HRegions do
  * not need to declare themselves before using the HLog; they simply include
- * their HRegion-id in the {@link #append(Text, Text, Text, TreeMap, long)} or
- * {@link #completeCacheFlush(Text, Text, long)} calls.
+ * their HRegion-id in the <code>append</code> or
+ * <code>completeCacheFlush</code> calls.
  *
  * <p>An HLog consists of multiple on-disk files, which have a chronological
  * order. As data is flushed to other (better) on-disk structures, the log
@@ -107,6 +107,12 @@ public class HLog implements HConstants {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Splitting " + logfiles[i]);
         }
+        // Check for empty file.
+        if (fs.getFileStatus(logfiles[i]).getLen() <= 0) {
+          LOG.warn("Skipping " + logfiles[i].toString() +
+            " because zero length");
+          continue;
+        }
         SequenceFile.Reader in =
           new SequenceFile.Reader(fs, logfiles[i], conf);
         try {
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
@@ -305,10 +305,6 @@ HMasterRegionInterface, Runnable {
       SortedMap<Text, byte[]> rowContent)
   throws IOException {
     boolean result = false;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking " + parent.getRegionName() +
-        " to see if daughter splits still hold references");
-    }

     boolean hasReferencesA = hasReferences(metaRegionName, srvr,
       parent.getRegionName(), rowContent, COL_SPLITA);
@@ -318,7 +314,6 @@ HMasterRegionInterface, Runnable {
     if (!hasReferencesA && !hasReferencesB) {
       LOG.info("Deleting region " + parent.getRegionName() +
         " because daughter splits no longer hold references");
-
       if (!HRegion.deleteRegion(fs, dir, parent.getRegionName())) {
         LOG.warn("Deletion of " + parent.getRegionName() + " failed");
       }
@@ -330,11 +325,11 @@ HMasterRegionInterface, Runnable {
       b.delete(lockid, COL_STARTCODE);
       srvr.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
       result = true;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Done checking " + parent.getRegionName() + ": splitA: " +
-        hasReferencesA + ", splitB: "+
-        hasReferencesB);
+    } else if (LOG.isDebugEnabled()) {
+      // If debug, note we checked and current state of daughters.
+      LOG.debug("Checked " + parent.getRegionName() +
+        " for references: splitA: " + hasReferencesA + ", splitB: "+
+        hasReferencesB);
     }
     return result;
   }
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
@@ -649,7 +649,7 @@ public class HRegion implements HConstants {
     for (HStore store: stores.values()) {
       if (store.needsCompaction()) {
         needsCompaction = true;
-        LOG.info(store.toString() + " needs compaction");
+        LOG.debug(store.toString() + " needs compaction");
         break;
       }
     }
@@ -1628,7 +1628,7 @@ public class HRegion implements HConstants {
   /**
    * Computes the Path of the HRegion
    *
-   * @param dir parent directory
+   * @param dir hbase home directory
    * @param regionName name of the region
    * @return Path of HRegion directory
    */
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
@@ -62,8 +62,6 @@ import org.onelab.filter.RetouchedBloomFilter;
 class HStore implements HConstants {
   static final Log LOG = LogFactory.getLog(HStore.class);

-  static final String COMPACTION_DIR = "compaction.tmp";
-  static final String WORKING_COMPACTION = "compaction.inprogress";
   static final String COMPACTION_TO_REPLACE = "toreplace";
   static final String COMPACTION_DONE = "done";

@@ -77,11 +75,11 @@ class HStore implements HConstants {
   FileSystem fs;
   Configuration conf;
   Path mapdir;
-  Path compactdir;
   Path loginfodir;
   Path filterDir;
   Filter bloomFilter;
   private String storeName;
+  private final Path compactionDir;

   Integer compactLock = new Integer(0);
   Integer flushLock = new Integer(0);
@@ -133,6 +131,7 @@ class HStore implements HConstants {
       FileSystem fs, Path reconstructionLog, Configuration conf)
   throws IOException {
     this.dir = dir;
+    this.compactionDir = new Path(dir, "compaction.dir");
     this.regionName = regionName;
     this.family = family;
     this.familyName = HStoreKey.extractFamily(this.family.getName());
@@ -172,17 +171,6 @@ class HStore implements HConstants {
         " (no reconstruction log)": " with reconstruction log: " +
         reconstructionLog.toString()));
     }

-    // Either restart or get rid of any leftover compaction work. Either way,
-    // by the time processReadyCompaction() returns, we can get rid of the
-    // existing compaction-dir.
-    this.compactdir = new Path(dir, COMPACTION_DIR);
-    Path curCompactStore =
-      HStoreFile.getHStoreDir(compactdir, regionName, familyName);
-    if(fs.exists(curCompactStore)) {
-      processReadyCompaction();
-      fs.delete(curCompactStore);
-    }
-
     // Go through the 'mapdir' and 'loginfodir' together, make sure that all
     // MapFiles are in a reliable state. Every entry in 'mapdir' must have a
@@ -409,7 +397,7 @@ class HStore implements HConstants {
       this.readers.clear();
       result = new Vector<HStoreFile>(storefiles.values());
       this.storefiles.clear();
-      LOG.info("closed " + this.storeName);
+      LOG.debug("closed " + this.storeName);
       return result;
     } finally {
       this.lock.releaseWriteLock();
@@ -563,7 +551,7 @@ class HStore implements HConstants {
   throws IOException {
     compactHelper(deleteSequenceInfo, -1);
   }

   /*
    * @param deleteSequenceInfo True if we are to set the sequence number to -1
    * on compacted file.
@@ -577,23 +565,22 @@ class HStore implements HConstants {
     long maxId = maxSeenSeqID;
     synchronized(compactLock) {
       Path curCompactStore =
-        HStoreFile.getHStoreDir(compactdir, regionName, familyName);
+        HStoreFile.getHStoreDir(this.compactionDir, regionName, familyName);
       if(LOG.isDebugEnabled()) {
         LOG.debug("started compaction of " + storefiles.size() + " files in " +
           curCompactStore.toString());
       }
-      try {
-        // Grab a list of files to compact.
-        Vector<HStoreFile> toCompactFiles = null;
-        this.lock.obtainWriteLock();
-        try {
-          toCompactFiles = new Vector<HStoreFile>(storefiles.values());
-        } finally {
-          this.lock.releaseWriteLock();
+      if (this.fs.exists(curCompactStore)) {
+        LOG.warn("Cleaning up a previous incomplete compaction at " +
+          curCompactStore.toString());
+        if (!this.fs.delete(curCompactStore)) {
+          LOG.warn("Deleted returned false on " + curCompactStore.toString());
         }
+      }
+      try {
+        Vector<HStoreFile> toCompactFiles = getFilesToCompact();
         HStoreFile compactedOutputFile =
-          new HStoreFile(conf, compactdir, regionName, familyName, -1);
+          new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
         if (toCompactFiles.size() < 1 ||
           (toCompactFiles.size() == 1 &&
             !toCompactFiles.get(0).isReference())) {
@@ -606,7 +593,9 @@ class HStore implements HConstants {
           return;
         }

-      fs.mkdirs(curCompactStore);
+      if (!fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+      }

       // Compute the max-sequenceID seen in any of the to-be-compacted
       // TreeMaps if it hasn't been passed in to us.
@@ -657,13 +646,31 @@ class HStore implements HConstants {
       // Move the compaction into place.
       processReadyCompaction();
     } finally {
-      if (fs.exists(compactdir)) {
-        fs.delete(compactdir);
+      // Clean up the parent -- the region dir in the compactions directory.
+      if (this.fs.exists(curCompactStore.getParent())) {
+        if (!this.fs.delete(curCompactStore.getParent())) {
+          LOG.warn("Delete returned false deleting " +
+            curCompactStore.getParent().toString());
+        }
       }
     }
   }
 }

+  /*
+   * @return list of files to compact
+   */
+  private Vector<HStoreFile> getFilesToCompact() {
+    Vector<HStoreFile> toCompactFiles = null;
+    this.lock.obtainWriteLock();
+    try {
+      toCompactFiles = new Vector<HStoreFile>(storefiles.values());
+    } finally {
+      this.lock.releaseWriteLock();
+    }
+    return toCompactFiles;
+  }
+
   /*
    * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
    * We create a new set of MapFile.Reader objects so we don't screw up
@@ -886,33 +893,34 @@ class HStore implements HConstants {
     }
   }

-  /**
+  /*
   * It's assumed that the compactLock will be acquired prior to calling this
   * method! Otherwise, it is not thread-safe!
   *
   * It works by processing a compaction that's been written to disk.
   *
-  * It is usually invoked at the end of a compaction, but might also be
+  * <p>It is usually invoked at the end of a compaction, but might also be
   * invoked at HStore startup, if the prior execution died midway through.
+  *
+  * <p>Moving the compacted TreeMap into place means:
+  * <pre>
+  * 1) Acquiring the write-lock
+  * 2) Figuring out what MapFiles are going to be replaced
+  * 3) Moving the new compacted MapFile into place
+  * 4) Unloading all the replaced MapFiles.
+  * 5) Deleting all the old MapFile files.
+  * 6) Loading the new TreeMap.
+  * 7) Releasing the write-lock
+  * </pre>
   */
   void processReadyCompaction() throws IOException {
-    // Move the compacted TreeMap into place.
-    // That means:
-    // 1) Acquiring the write-lock
-    // 2) Figuring out what MapFiles are going to be replaced
-    // 3) Unloading all the replaced MapFiles.
-    // 4) Deleting all the old MapFile files.
-    // 5) Moving the new MapFile into place
-    // 6) Loading the new TreeMap.
-    // 7) Releasing the write-lock
-
     // 1. Acquiring the write-lock
     Path curCompactStore =
-      HStoreFile.getHStoreDir(compactdir, regionName, familyName);
+      HStoreFile.getHStoreDir(this.compactionDir, regionName, familyName);
     this.lock.obtainWriteLock();
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-      if(!fs.exists(doneFile)) {
+      if (!fs.exists(doneFile)) {
        // The last execution didn't finish the compaction, so there's nothing
        // we can do. We'll just have to redo it. Abandon it and return.
        LOG.warn("Redoing a failed compaction");
@@ -920,7 +928,6 @@ class HStore implements HConstants {
       }

       // 2. Load in the files to be deleted.
-      // (Figuring out what MapFiles are going to be replaced)
       Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
       Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
       DataInputStream in = new DataInputStream(fs.open(filesToReplace));
@@ -936,41 +943,16 @@ class HStore implements HConstants {
         in.close();
       }

-      // 3. Unload all the replaced MapFiles. Do it by getting keys of all
-      // to remove. Then cycling on keys, removing, closing and deleting.
-
-      // What if we crash at this point? No big deal; we will restart
-      // processReadyCompaction(), and nothing has been lost.
-      Vector<Long> keys = new Vector<Long>(toCompactFiles.size());
-      for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
-        if(toCompactFiles.contains(e.getValue())) {
-          keys.add(e.getKey());
-        }
-      }
-
-      Vector<HStoreFile> toDelete = new Vector<HStoreFile>(keys.size());
-      for (Long key: keys) {
-        MapFile.Reader reader = this.readers.remove(key);
-        if (reader != null) {
-          reader.close();
-        }
-        HStoreFile hsf = this.storefiles.remove(key);
-        // 4. Add to the toDelete files all old files, no longer needed
-        toDelete.add(hsf);
-      }
-
-      // What if we fail now? The above deletes will fail silently. We'd
-      // better make sure not to write out any new files with the same names as
-      // something we delete, though.
-
-      // 5. Moving the new MapFile into place
+      // 3. Moving the new MapFile into place.
       HStoreFile compactedFile
-        = new HStoreFile(conf, compactdir, regionName, familyName, -1);
+        = new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
+      // obtainNewHStoreFile does its best to generate a filename that does not
+      // currently exist.
       HStoreFile finalCompactedFile
         = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
       if(LOG.isDebugEnabled()) {
         LOG.debug("moving " + compactedFile.toString() + " in " +
-          compactdir.toString() +
+          this.compactionDir.toString() +
           " to " + finalCompactedFile.toString() + " in " + dir.toString());
       }
       if (!compactedFile.rename(this.fs, finalCompactedFile)) {
@@ -978,24 +960,37 @@ class HStore implements HConstants {
           finalCompactedFile.toString());
         return;
       }

-      // Safe to delete now compaction has been moved into place.
-      for (HStoreFile hsf: toDelete) {
-        if (hsf.getFileId() == finalCompactedFile.getFileId()) {
-          // Be careful we do not delte the just compacted file.
-          LOG.warn("Weird. File to delete has same name as one we are " +
-            "about to delete (skipping): " + hsf.getFileId());
+      // 4. and 5. Unload all the replaced MapFiles, close and delete.
+      Vector<Long> toDelete = new Vector<Long>(toCompactFiles.size());
+      for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+        if (!toCompactFiles.contains(e.getValue())) {
           continue;
         }
-        hsf.delete();
+        Long key = e.getKey();
+        MapFile.Reader reader = this.readers.remove(key);
+        if (reader != null) {
+          reader.close();
+        }
+        toDelete.add(key);
       }

-      Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
-      // 6. Loading the new TreeMap.
-      this.readers.put(orderVal,
-        finalCompactedFile.getReader(this.fs, this.bloomFilter));
-      this.storefiles.put(orderVal, finalCompactedFile);
+      try {
+        for (Long key: toDelete) {
+          HStoreFile hsf = this.storefiles.remove(key);
+          hsf.delete();
+        }
+
+        // 6. Loading the new TreeMap.
+        Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+        this.readers.put(orderVal,
+          finalCompactedFile.getReader(this.fs, this.bloomFilter));
+        this.storefiles.put(orderVal, finalCompactedFile);
+      } finally {
+        LOG.warn("Failed replacing compacted files. Compacted fle is " +
+          finalCompactedFile.toString() + ". Files replaced are " +
+          toCompactFiles.toString() + " some of which may have been removed");
+      }
     } finally {
       // 7. Releasing the write-lock
       this.lock.releaseWriteLock();
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Random;
@@ -608,7 +609,6 @@ public class HStoreFile implements HConstants, WritableComparable {
     try {
       out.writeByte(INFO_SEQ_NUM);
       out.writeLong(infonum);
-
     } finally {
       out.close();
     }
@@ -637,16 +637,22 @@ public class HStoreFile implements HConstants, WritableComparable {
    */
   public boolean rename(final FileSystem fs, final HStoreFile hsf)
   throws IOException {
-    boolean success = fs.rename(getMapFilePath(), hsf.getMapFilePath());
+    Path src = getMapFilePath();
+    if (!fs.exists(src)) {
+      throw new FileNotFoundException(src.toString());
+    }
+    boolean success = fs.rename(src, hsf.getMapFilePath());
     if (!success) {
-      LOG.warn("Failed rename of " + getMapFilePath() + " to " +
-        hsf.getMapFilePath());
+      LOG.warn("Failed rename of " + src + " to " + hsf.getMapFilePath());
       return success;
     }
-    success = fs.rename(getInfoFilePath(), hsf.getInfoFilePath());
+    src = getInfoFilePath();
+    if (!fs.exists(src)) {
+      throw new FileNotFoundException(src.toString());
+    }
+    success = fs.rename(src, hsf.getInfoFilePath());
     if (!success) {
-      LOG.warn("Failed rename of " + getInfoFilePath() + " to " +
-        hsf.getInfoFilePath());
+      LOG.warn("Failed rename of " + src + " to " + hsf.getInfoFilePath());
     }
     return success;
   }