HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@601383 13f79535-47bb-0310-9956-ffa450edef68
Author: Michael Stack
Date:   2007-12-05 16:06:25 +00:00
parent afeed7bfc6
commit 0ce1829c73
5 changed files with 155 additions and 153 deletions

CHANGES.txt

@@ -91,6 +91,7 @@ Trunk (unreleased changes)
               (Edward Yoon via Stack)
    HADOOP-2299 Support inclusive scans (Bryan Duxbury via Stack)
    HADOOP-2333 Client side retries happen at the wrong level
+   HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks
 
 Release 0.15.1

HLog.java

@@ -151,11 +151,12 @@ public class HLog implements HConstants {
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + logfiles[i]);
+          LOG.debug("Splitting " + i + " of " + logfiles.length + ": " +
+            logfiles[i]);
         }
         // Check for empty file.
         if (fs.getFileStatus(logfiles[i]).getLen() <= 0) {
-          LOG.warn("Skipping " + logfiles[i].toString() +
+          LOG.info("Skipping " + logfiles[i].toString() +
             " because zero length");
           continue;
         }
@@ -164,26 +165,29 @@ public class HLog implements HConstants {
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while (in.next(key, val)) {
+          int count = 0;
+          for (; in.next(key, val); count++) {
             Text regionName = key.getRegionName();
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
               Path logfile = new Path(HRegion.getRegionDir(rootDir,
                 HRegionInfo.encodeRegionName(regionName)),
                 HREGION_OLDLOGFILE_NAME);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("getting new log file writer for path " + logfile);
+                LOG.debug("Creating new log file writer for path " + logfile);
               }
               w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                 HLogEdit.class);
               logWriters.put(regionName, w);
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Edit " + key.toString() + "=" + val.toString());
+            if (count % 100 == 0 && count > 0 && LOG.isDebugEnabled()) {
+              LOG.debug("Applied " + count + " edits");
             }
             w.append(key, val);
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Applied " + count + " total edits");
+          }
         } finally {
           in.close();
         }
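The splitLog() change above keeps one lazily-created writer per region and replaces per-edit debug output with a progress count every 100 edits. A minimal stand-alone sketch of the same pattern, using hypothetical plain-java.io types in place of SequenceFile and the HBase key/value classes (edits are {regionName, payload} pairs; the output directory is assumed to already exist):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitSketch {
  /**
   * Append each {regionName, payload} edit to a per-region file, creating
   * writers lazily and logging progress every 100 edits rather than per edit.
   */
  public static void split(List<String[]> edits, String outDir) throws IOException {
    Map<String, BufferedWriter> writers = new HashMap<String, BufferedWriter>();
    int count = 0;
    try {
      for (String[] edit : edits) {
        String region = edit[0];
        BufferedWriter w = writers.get(region);
        if (w == null) {
          // Lazily open one output file per region the first time we see it.
          w = new BufferedWriter(new FileWriter(outDir + "/" + region + ".log"));
          writers.put(region, w);
        }
        if (count % 100 == 0 && count > 0) {
          System.out.println("Applied " + count + " edits");
        }
        w.write(edit[1]);
        w.newLine();
        count++;
      }
      System.out.println("Applied " + count + " total edits");
    } finally {
      // Close every per-region writer so none leaks if a write fails midway.
      for (BufferedWriter w : writers.values()) {
        w.close();
      }
    }
  }
}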

HMaster.java

@@ -245,9 +245,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
       String serverName = Writables.bytesToString(results.get(COL_SERVER));
       long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
       if (LOG.isDebugEnabled()) {
-        LOG.debug(Thread.currentThread().getName() + " scanner: " +
-          Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
-          "}, server: " + serverName + ", startCode: " + startCode);
+        LOG.debug(Thread.currentThread().getName() + " regioninfo: {" +
+          info.toString() + "}, server: " + serverName + ", startCode: " +
+          startCode);
       }
 
       // Note Region has been assigned.
@@ -447,9 +447,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
         storedInfo = serversToServerInfo.get(serverName);
         deadServer = deadServers.contains(serverName);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Checking " + info.getRegionName() + " is assigned");
-      }
       /*
        * If the server is not dead and either:
        * the stored info is not null and the start code does not match

HRegion.java

@@ -476,7 +476,9 @@ public class HRegion implements HConstants {
     return this.conf;
   }
 
-  /** @return region directory Path */
+  /** @return region directory Path
+   * @see HRegion#getRegionDir(Path, String)
+   */
   public Path getRegionDir() {
     return this.regiondir;
   }
@@ -878,11 +880,6 @@ public class HRegion implements HConstants {
    */
   private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache for region " +
-          regionInfo.getRegionName() +
-          ": snapshotMemcaches() determined that there was nothing to do");
-      }
       return false;
     }
@@ -1633,13 +1630,17 @@ public class HRegion implements HConstants {
    *
    * @param fs the file system object
    * @param baseDirectory base directory for HBase
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @throws IOException
    * @return True if deleted.
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   static boolean deleteRegion(FileSystem fs, Path baseDirectory, String name)
   throws IOException {
     Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), name);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DELETING region " + p.toString());
+    }
     return fs.delete(p);
   }
@@ -1647,8 +1648,9 @@ public class HRegion implements HConstants {
    * Computes the Path of the HRegion
    *
    * @param dir hbase home directory
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @return Path of HRegion directory
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   public static Path getRegionDir(final Path dir, final String name) {
     return new Path(dir, new Path(HREGIONDIR_PREFIX + name));

HStore.java

@@ -542,7 +542,8 @@ class HStore implements HConstants {
       HBaseConfiguration conf) throws IOException {
     this.dir = dir;
-    this.compactionDir = new Path(dir, "compaction.dir");
+    this.compactionDir = new Path(HRegion.getRegionDir(dir, encodedName),
+      "compaction.dir");
     this.regionName = regionName;
     this.encodedRegionName = encodedName;
     this.family = family;
@@ -603,16 +604,7 @@ class HStore implements HConstants {
     // means it was built prior to the previous run of HStore, and so it cannot
     // contain any updates also contained in the log.
-    long maxSeqID = -1;
-    for (HStoreFile hsf: hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if(seqid > 0) {
-        if(seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    this.maxSeqId = maxSeqID;
+    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
         this.maxSeqId);
@@ -641,6 +633,25 @@ class HStore implements HConstants {
     }
   }
 
+  /*
+   * @param hstoreFiles
+   * @return Maximum sequence number found or -1.
+   * @throws IOException
+   */
+  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+  throws IOException {
+    long maxSeqID = -1;
+    for (HStoreFile hsf : hstoreFiles) {
+      long seqid = hsf.loadInfo(fs);
+      if (seqid > 0) {
+        if (seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+    return maxSeqID;
+  }
+
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
@@ -670,16 +681,17 @@ class HStore implements HConstants {
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
       while (login.next(key, val)) {
         maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
         if (key.getLogSeqNum() <= maxSeqID) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping edit <" + key.toString() + "=" +
-              val.toString() + "> key sequence: " + key.getLogSeqNum() +
-              " max sequence: " + maxSeqID);
-          }
+          skippedEdits++;
           continue;
         }
+        if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+          LOG.debug("Skipped " + skippedEdits +
+            " edits because sequence id <= " + maxSeqID);
+        }
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         Text column = val.getColumn();
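The reconstructLog changes above drop the per-edit "Skipping edit" debug line in favour of counting skipped entries and reporting a single summary. A rough sketch of that idea, with a plain list of sequence ids standing in for the (HLogKey, HLogEdit) stream; here the summary is emitted once after the loop, which is one way to keep the log quiet:

import java.util.List;

public class ReplaySketch {
  /**
   * Replay only edits whose sequence id is greater than maxSeqId, counting
   * the skipped ones and logging a single summary instead of a line per edit.
   */
  public static int replay(List<Long> seqIds, long maxSeqId) {
    long skippedEdits = 0;
    int applied = 0;
    for (long seqId : seqIds) {
      if (seqId <= maxSeqId) {
        skippedEdits++;   // already persisted by an earlier flush
        continue;
      }
      applied++;          // the real code path would apply the edit here
    }
    if (skippedEdits > 0) {
      System.out.println("Skipped " + skippedEdits +
        " edits because sequence id <= " + maxSeqId);
    }
    return applied;
  }
}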
@@ -977,119 +989,88 @@ class HStore implements HConstants {
    * @return true if compaction completed successfully
    */
   boolean compact() throws IOException {
-    long maxId = -1;
     synchronized (compactLock) {
-      Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-        encodedRegionName, familyName);
+      Path curCompactStore = getCompactionDir();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() + " files in " +
-          curCompactStore.toString());
+        LOG.debug("started compaction of " + storefiles.size() +
+          " files using " + curCompactStore.toString());
       }
       if (this.fs.exists(curCompactStore)) {
-        LOG.warn("Cleaning up a previous incomplete compaction at " +
-          curCompactStore.toString());
-        if (!this.fs.delete(curCompactStore)) {
-          LOG.warn("Deleted returned false on " + curCompactStore.toString());
+        // Clean out its content in prep. for this new compaction.  Has either
+        // aborted previous compaction or it has content of a previous
+        // compaction.
+        Path [] toRemove = this.fs.listPaths(new Path [] {curCompactStore});
+        for (int i = 0; i < toRemove.length; i++) {
+          this.fs.delete(toRemove[i]);
         }
       }
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      List<HStoreFile> filesToCompact =
+        new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
+      if (filesToCompact.size() < 1 ||
+        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("nothing to compact for " + this.storeName);
+        }
+        return false;
+      }
+      if (!fs.exists(curCompactStore) && !fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+        return false;
+      }
+      // Step through them, writing to the brand-new TreeMap
+      HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir,
+        encodedRegionName, familyName, -1);
+      MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.bloomFilter);
       try {
-        // Storefiles are keyed by sequence id. The oldest file comes first.
-        // We need to return out of here a List that has the newest file as
-        // first.
-        List<HStoreFile> filesToCompact =
-          new ArrayList<HStoreFile>(this.storefiles.values());
-        Collections.reverse(filesToCompact);
-        HStoreFile compactedOutputFile = new HStoreFile(conf,
-          this.compactionDir, encodedRegionName, familyName, -1);
-        if (filesToCompact.size() < 1 ||
-          (filesToCompact.size() == 1 &&
-            !filesToCompact.get(0).isReference())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.storeName);
-          }
-          return false;
-        }
-        if (!fs.mkdirs(curCompactStore)) {
-          LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
-        }
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps if it hasn't been passed in to us.
-        if (maxId == -1) {
-          for (HStoreFile hsf: filesToCompact) {
-            long seqid = hsf.loadInfo(fs);
-            if (seqid > 0) {
-              if (seqid > maxId) {
-                maxId = seqid;
-              }
-            }
-          }
-        }
-        // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-          this.compression, this.bloomFilter);
-        try {
-          compactHStoreFiles(compactedOut, filesToCompact);
-        } finally {
-          compactedOut.close();
-        }
-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if (maxId >= 0) {
-          compactedOutputFile.writeInfo(fs, maxId);
-        } else {
-          compactedOutputFile.writeInfo(fs, -1);
-        }
-        // Write out a list of data files that we're replacing
-        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
-        try {
-          out.writeInt(filesToCompact.size());
-          for (HStoreFile hsf: filesToCompact) {
-            hsf.write(out);
-          }
-        } finally {
-          out.close();
-        }
-        // Indicate that we're done.
-        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-        (new DataOutputStream(fs.create(doneFile))).close();
-        // Move the compaction into place.
-        completeCompaction();
-        return true;
+        compactHStoreFiles(compactedOut, filesToCompact);
       } finally {
-        // Clean up the parent -- the region dir in the compactions directory.
-        if (this.fs.exists(curCompactStore.getParent())) {
-          if (!this.fs.delete(curCompactStore.getParent())) {
-            LOG.warn("Delete returned false deleting " +
-              curCompactStore.getParent().toString());
-          }
-        }
+        compactedOut.close();
       }
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+      long maxId = getMaxSequenceId(filesToCompact);
+      compactedOutputFile.writeInfo(fs, maxId);
+      // Write out a list of data files that we're replacing
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+      try {
+        out.writeInt(filesToCompact.size());
+        for (HStoreFile hsf : filesToCompact) {
+          hsf.write(out);
+        }
+      } finally {
+        out.close();
+      }
+      // Indicate that we're done.
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      (new DataOutputStream(fs.create(doneFile))).close();
+      // Move the compaction into place.
+      completeCompaction(curCompactStore);
+      return true;
     }
   }
 
   /*
    * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
-   * We create a new set of MapFile.Reader objects so we don't screw up
-   * the caching associated with the currently-loaded ones. Our
-   * iteration-based access pattern is practically designed to ruin
-   * the cache.
+   * We create a new set of MapFile.Reader objects so we don't screw up the
+   * caching associated with the currently-loaded ones. Our iteration-based
+   * access pattern is practically designed to ruin the cache.
    *
-   * We work by opening a single MapFile.Reader for each file, and
-   * iterating through them in parallel. We always increment the
-   * lowest-ranked one. Updates to a single row/column will appear
-   * ranked by timestamp. This allows us to throw out deleted values or
-   * obsolete versions.
-   * @param compactedOut
-   * @param toCompactFiles
-   * @throws IOException
+   * We work by opening a single MapFile.Reader for each file, and iterating
+   * through them in parallel. We always increment the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp. This allows
+   * us to throw out deleted values or obsolete versions. @param compactedOut
+   * @param toCompactFiles @throws IOException
    */
   private void compactHStoreFiles(final MapFile.Writer compactedOut,
     final List<HStoreFile> toCompactFiles) throws IOException {
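The reworked compact() above no longer deletes and recreates the whole compaction directory (and no longer removes its parent in a finally block); it clears the directory's contents and reuses it, which is the "less deleting" in the commit title. A small stand-alone sketch of that prepare-and-reuse pattern, using java.io.File in place of the Hadoop FileSystem API (names are illustrative):

import java.io.File;
import java.io.IOException;

public class CompactionDirSketch {
  /**
   * Reuse an existing working directory: clear its contents if it exists,
   * create it if it is missing, rather than deleting and recreating the
   * directory on every run.
   */
  public static File prepareWorkingDir(File dir) throws IOException {
    if (dir.exists()) {
      File[] toRemove = dir.listFiles();
      if (toRemove != null) {
        for (File f : toRemove) {
          if (!f.delete()) {
            throw new IOException("Failed to delete " + f);
          }
        }
      }
    } else if (!dir.mkdirs()) {
      throw new IOException("Mkdir on " + dir + " failed");
    }
    return dir;
  }
}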
@@ -1107,6 +1088,7 @@ class HStore implements HConstants {
         // culprit.
         LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
           (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+        closeCompactionReaders(rdrs);
         throw e;
       }
     }
@@ -1195,13 +1177,17 @@ class HStore implements HConstants {
         }
       }
     } finally {
-      for (int i = 0; i < rdrs.length; i++) {
-        if (rdrs[i] != null) {
-          try {
-            rdrs[i].close();
-          } catch (IOException e) {
-            LOG.warn("Exception closing reader", e);
-          }
+      closeCompactionReaders(rdrs);
+    }
+  }
+
+  private void closeCompactionReaders(final CompactionReader [] rdrs) {
+    for (int i = 0; i < rdrs.length; i++) {
+      if (rdrs[i] != null) {
+        try {
+          rdrs[i].close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing reader", e);
         }
       }
     }
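The new closeCompactionReaders() helper is called both from the normal finally path and from the failure path where opening a reader throws, so partially-opened MapFile readers are not left behind. A generic sketch of that guard against file-handle leaks, assuming a hypothetical Closeable-based reader factory rather than the HBase classes:

import java.io.Closeable;
import java.io.IOException;

public class ReaderCleanupSketch {
  /** Close every non-null reader, logging rather than propagating failures. */
  static void closeAll(Closeable[] readers) {
    for (Closeable r : readers) {
      if (r == null) {
        continue;
      }
      try {
        r.close();
      } catch (IOException e) {
        System.err.println("Exception closing reader: " + e);
      }
    }
  }

  /** Open all readers up front; if any open fails, close the ones opened so far. */
  static Closeable[] openAll(ReaderFactory factory, int n) throws IOException {
    Closeable[] readers = new Closeable[n];
    try {
      for (int i = 0; i < n; i++) {
        readers[i] = factory.open(i);
      }
      return readers;
    } catch (IOException e) {
      closeAll(readers);  // prevent file-handle leaks on partial failure
      throw e;
    }
  }

  /** Hypothetical factory standing in for MapFile.Reader construction. */
  interface ReaderFactory {
    Closeable open(int index) throws IOException;
  }
}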
@@ -1326,11 +1312,11 @@ class HStore implements HConstants {
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * </pre>
+   *
+   * @param curCompactStore Compaction to complete.
    */
-  private void completeCompaction() throws IOException {
-    Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-      encodedRegionName, familyName);
+  private void completeCompaction(final Path curCompactStore)
+  throws IOException {
     // 1. Wait for active scanners to exit
     newScannerLock.writeLock().lock(); // prevent new scanners
     try {
@@ -1346,6 +1332,7 @@ class HStore implements HConstants {
       // 2. Acquiring the HStore write-lock
       this.lock.writeLock().lock();
     }
+
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
       if (!fs.exists(doneFile)) {
@@ -1366,7 +1353,6 @@ class HStore implements HConstants {
           hsf.readFields(in);
           toCompactFiles.add(hsf);
         }
-
       } finally {
         in.close();
       }
@@ -1412,13 +1398,13 @@ class HStore implements HConstants {
       // 7. Loading the new TreeMap.
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
       this.readers.put(orderVal,
         finalCompactedFile.getReader(this.fs, this.bloomFilter));
       this.storefiles.put(orderVal, finalCompactedFile);
     } catch (IOException e) {
       LOG.error("Failed replacing compacted files. Compacted file is " +
         finalCompactedFile.toString() + ". Files replaced are " +
         toCompactFiles.toString() +
         " some of which may have been already removed", e);
     }
   } finally {
     // 8. Releasing the write-lock
@@ -1479,6 +1465,17 @@ class HStore implements HConstants {
     }
   }
 
+  /*
+   * @return Path to the compaction directory for this column family.
+   * Compaction dir is a subdirectory of the region.  Needs to have the
+   * same regiondir/storefamily path prefix; HStoreFile constructor presumes
+   * it (TODO: Fix).
+   */
+  private Path getCompactionDir() {
+    return HStoreFile.getHStoreDir(this.compactionDir,
+      this.encodedRegionName, this.familyName);
+  }
+
   private MapFile.Reader [] getReaders() {
     return this.readers.values().
       toArray(new MapFile.Reader[this.readers.size()]);