From 0ce1829c73fd8ae0e37717d0ece2edb92870b7ec Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Wed, 5 Dec 2007 16:06:25 +0000
Subject: [PATCH] HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@601383 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                   |   1 +
 src/java/org/apache/hadoop/hbase/HLog.java    |  18 +-
 src/java/org/apache/hadoop/hbase/HMaster.java |  10 +-
 src/java/org/apache/hadoop/hbase/HRegion.java |  18 +-
 src/java/org/apache/hadoop/hbase/HStore.java  | 261 +++++++++---------
 5 files changed, 155 insertions(+), 153 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 65e83209ee1..aca49caf60b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,7 @@ Trunk (unreleased changes)
                (Edward Yoon via Stack)
    HADOOP-2299 Support inclusive scans (Bryan Duxbury via Stack)
    HADOOP-2333 Client side retries happen at the wrong level
+   HADOOP-2357 Compaction cleanup; less deleting + prevent possible file leaks
 
 Release 0.15.1
 
diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java
index f59a694c0b4..ec072a8b2db 100644
--- a/src/java/org/apache/hadoop/hbase/HLog.java
+++ b/src/java/org/apache/hadoop/hbase/HLog.java
@@ -151,11 +151,12 @@ public class HLog implements HConstants {
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Splitting " + logfiles[i]);
+          LOG.debug("Splitting " + i + " of " + logfiles.length + ": " +
+            logfiles[i]);
         }
         // Check for empty file.
         if (fs.getFileStatus(logfiles[i]).getLen() <= 0) {
-          LOG.warn("Skipping " + logfiles[i].toString() +
+          LOG.info("Skipping " + logfiles[i].toString() +
             " because zero length");
           continue;
         }
@@ -164,26 +165,29 @@ public class HLog implements HConstants {
         try {
           HLogKey key = new HLogKey();
           HLogEdit val = new HLogEdit();
-          while (in.next(key, val)) {
+          int count = 0;
+          for (; in.next(key, val); count++) {
             Text regionName = key.getRegionName();
             SequenceFile.Writer w = logWriters.get(regionName);
             if (w == null) {
               Path logfile = new Path(HRegion.getRegionDir(rootDir,
                 HRegionInfo.encodeRegionName(regionName)),
                 HREGION_OLDLOGFILE_NAME);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("getting new log file writer for path " + logfile);
+                LOG.debug("Creating new log file writer for path " + logfile);
               }
               w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
                 HLogEdit.class);
               logWriters.put(regionName, w);
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Edit " + key.toString() + "=" + val.toString());
+            if (count % 100 == 0 && count > 0 && LOG.isDebugEnabled()) {
+              LOG.debug("Applied " + count + " edits");
             }
             w.append(key, val);
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Applied " + count + " total edits");
+          }
         } finally {
          in.close();
         }
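For context on the hunk above: splitLog() demultiplexes a single server log into one log per region, lazily creating each region's writer on first use and reporting progress every hundred edits. A JDK-only sketch of that demux pattern (the LogDemux class and its String[] edit encoding are hypothetical; a List stands in for the patch's SequenceFile.Writer):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical model of the splitLog() demux above; not part of the patch.
    public class LogDemux {
      // Each edit is {regionName, payload}; returns payloads grouped by region.
      static Map<String, List<String>> split(final List<String[]> edits) {
        Map<String, List<String>> logWriters =
          new HashMap<String, List<String>>();
        int count = 0;
        for (String[] edit : edits) {
          List<String> w = logWriters.get(edit[0]);
          if (w == null) {
            // Lazily create the per-region "writer", as the patch does.
            w = new ArrayList<String>();
            logWriters.put(edit[0], w);
          }
          w.add(edit[1]);
          if (++count % 100 == 0) {
            System.out.println("Applied " + count + " edits");
          }
        }
        return logWriters;
      }
    }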
diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java
index 08b8cd3989e..b13668945af 100644
--- a/src/java/org/apache/hadoop/hbase/HMaster.java
+++ b/src/java/org/apache/hadoop/hbase/HMaster.java
@@ -245,9 +245,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
           String serverName = Writables.bytesToString(results.get(COL_SERVER));
           long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
           if (LOG.isDebugEnabled()) {
-            LOG.debug(Thread.currentThread().getName() + " scanner: " +
-              Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
-              "}, server: " + serverName + ", startCode: " + startCode);
+            LOG.debug(Thread.currentThread().getName() + " regioninfo: {" +
+              info.toString() + "}, server: " + serverName + ", startCode: " +
+              startCode);
           }
 
           // Note Region has been assigned.
@@ -447,9 +447,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
         storedInfo = serversToServerInfo.get(serverName);
         deadServer = deadServers.contains(serverName);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Checking " + info.getRegionName() + " is assigned");
-      }
+
       /*
        * If the server is not dead and either:
        * the stored info is not null and the start code does not match
diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java
index 5ef0a1e1ce8..be3a2da12a8 100644
--- a/src/java/org/apache/hadoop/hbase/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/HRegion.java
@@ -476,7 +476,9 @@ public class HRegion implements HConstants {
     return this.conf;
   }
 
-  /** @return region directory Path */
+  /** @return region directory Path
+   * @see HRegion#getRegionDir(Path, String)
+   */
   public Path getRegionDir() {
     return this.regiondir;
   }
@@ -878,11 +880,6 @@ public class HRegion implements HConstants {
    */
   private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache for region " +
-          regionInfo.getRegionName() +
-          ": snapshotMemcaches() determined that there was nothing to do");
-      }
       return false;
     }
 
@@ -1633,13 +1630,17 @@ public class HRegion implements HConstants {
    *
    * @param fs the file system object
    * @param baseDirectory base directory for HBase
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @throws IOException
    * @return True if deleted.
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   static boolean deleteRegion(FileSystem fs, Path baseDirectory,
       String name) throws IOException {
     Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), name);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("DELETING region " + p.toString());
+    }
     return fs.delete(p);
   }
 
@@ -1647,8 +1648,9 @@ public class HRegion implements HConstants {
    * Computes the Path of the HRegion
    *
    * @param dir hbase home directory
-   * @param name region file name
+   * @param name region file name ENCODED!
    * @return Path of HRegion directory
+   * @see HRegionInfo#encodeRegionName(Text)
    */
   public static Path getRegionDir(final Path dir, final String name) {
     return new Path(dir, new Path(HREGIONDIR_PREFIX + name));
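The ENCODED! javadoc fixes above matter because getRegionDir and deleteRegion quietly build a wrong path if handed the raw region name. A minimal usage sketch of the documented contract (the helper name regionDirFor is hypothetical; HRegion, HRegionInfo, Path, and Text are the types the patch itself uses, so this assumes the same era's HBase classpath):

    // Hypothetical helper; not part of the patch.
    static Path regionDirFor(final Path rootDir, final Text regionName) {
      // getRegionDir expects the ENCODED region name; the raw name would
      // resolve to a directory that does not exist on the filesystem.
      return HRegion.getRegionDir(rootDir,
        HRegionInfo.encodeRegionName(regionName));
    }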
diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java
index d2e151d988b..af3981d3345 100644
--- a/src/java/org/apache/hadoop/hbase/HStore.java
+++ b/src/java/org/apache/hadoop/hbase/HStore.java
@@ -542,7 +542,8 @@ class HStore implements HConstants {
                 HBaseConfiguration conf) throws IOException {
     this.dir = dir;
-    this.compactionDir = new Path(dir, "compaction.dir");
+    this.compactionDir = new Path(HRegion.getRegionDir(dir, encodedName),
+      "compaction.dir");
     this.regionName = regionName;
     this.encodedRegionName = encodedName;
     this.family = family;
@@ -603,16 +604,7 @@ class HStore implements HConstants {
     // means it was built prior to the previous run of HStore, and so it cannot
     // contain any updates also contained in the log.
-    long maxSeqID = -1;
-    for (HStoreFile hsf: hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if(seqid > 0) {
-        if(seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    this.maxSeqId = maxSeqID;
+    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
         this.maxSeqId);
@@ -641,6 +633,25 @@ class HStore implements HConstants {
     }
   }
 
+  /*
+   * @param hstoreFiles
+   * @return Maximum sequence number found or -1.
+   * @throws IOException
+   */
+  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
+  throws IOException {
+    long maxSeqID = -1;
+    for (HStoreFile hsf : hstoreFiles) {
+      long seqid = hsf.loadInfo(fs);
+      if (seqid > 0) {
+        if (seqid > maxSeqID) {
+          maxSeqID = seqid;
+        }
+      }
+    }
+    return maxSeqID;
+  }
+
   long getMaxSequenceId() {
     return this.maxSeqId;
   }
@@ -670,16 +681,17 @@ class HStore implements HConstants {
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
       while (login.next(key, val)) {
         maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
         if (key.getLogSeqNum() <= maxSeqID) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping edit <" + key.toString() + "=" +
-              val.toString() + "> key sequence: " + key.getLogSeqNum() +
-              " max sequence: " + maxSeqID);
-          }
+          skippedEdits++;
           continue;
         }
+        if (skippedEdits > 0 && LOG.isDebugEnabled()) {
+          LOG.debug("Skipped " + skippedEdits +
+            " edits because sequence id <= " + maxSeqID);
+        }
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         Text column = val.getColumn();
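The recovery rule in the hunk above is: an edit is replayed only when its log sequence number is strictly greater than the store's persisted maximum; everything at or below it is already in an on-disk file. A self-contained model of that filter (JDK types only; the SequenceIdFilter class and the long[] {seqId, ...} edit encoding are assumptions for brevity, not the HStore code):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical model of the sequence-id filter; not part of the patch.
    public class SequenceIdFilter {
      // Edits are {logSeqNum, ...}; keep only those newer than maxSeqId.
      static List<long[]> editsToApply(final List<long[]> edits,
          final long maxSeqId) {
        List<long[]> toApply = new ArrayList<long[]>();
        long skippedEdits = 0;
        for (long[] edit : edits) {
          if (edit[0] <= maxSeqId) {
            skippedEdits++; // already persisted in a store file; skip quietly
            continue;
          }
          toApply.add(edit);
        }
        System.out.println("Skipped " + skippedEdits +
          " edits because sequence id <= " + maxSeqId);
        return toApply;
      }
    }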
@@ -977,119 +989,88 @@ class HStore implements HConstants {
    * @return true if compaction completed successfully
    */
   boolean compact() throws IOException {
-    long maxId = -1;
     synchronized (compactLock) {
-      Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-        encodedRegionName, familyName);
+      Path curCompactStore = getCompactionDir();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() + " files in " +
-          curCompactStore.toString());
+        LOG.debug("started compaction of " + storefiles.size() +
+          " files using " + curCompactStore.toString());
       }
       if (this.fs.exists(curCompactStore)) {
-        LOG.warn("Cleaning up a previous incomplete compaction at " +
-          curCompactStore.toString());
-        if (!this.fs.delete(curCompactStore)) {
-          LOG.warn("Deleted returned false on " + curCompactStore.toString());
+        // Clean out its content in prep. for this new compaction.  Has either
+        // aborted previous compaction or it has content of a previous
+        // compaction.
+        Path [] toRemove = this.fs.listPaths(new Path [] {curCompactStore});
+        for (int i = 0; i < toRemove.length; i++) {
+          this.fs.delete(toRemove[i]);
         }
       }
+      // Storefiles are keyed by sequence id. The oldest file comes first.
+      // We need to return out of here a List that has the newest file first.
+      List<HStoreFile> filesToCompact =
+        new ArrayList<HStoreFile>(this.storefiles.values());
+      Collections.reverse(filesToCompact);
+      if (filesToCompact.size() < 1 ||
+        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("nothing to compact for " + this.storeName);
+        }
+        return false;
+      }
+
+      if (!fs.exists(curCompactStore) && !fs.mkdirs(curCompactStore)) {
+        LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
+        return false;
+      }
+
+      // Step through them, writing to the brand-new TreeMap
+      HStoreFile compactedOutputFile = new HStoreFile(conf, this.compactionDir,
+        encodedRegionName, familyName, -1);
+      MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+        this.compression, this.bloomFilter);
       try {
-        // Storefiles are keyed by sequence id. The oldest file comes first.
-        // We need to return out of here a List that has the newest file as
-        // first.
-        List<HStoreFile> filesToCompact =
-          new ArrayList<HStoreFile>(this.storefiles.values());
-        Collections.reverse(filesToCompact);
-
-        HStoreFile compactedOutputFile = new HStoreFile(conf,
-          this.compactionDir, encodedRegionName, familyName, -1);
-        if (filesToCompact.size() < 1 ||
-          (filesToCompact.size() == 1 &&
-            !filesToCompact.get(0).isReference())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.storeName);
-          }
-          return false;
-        }
-
-        if (!fs.mkdirs(curCompactStore)) {
-          LOG.warn("Mkdir on " + curCompactStore.toString() + " failed");
-        }
-
-        // Compute the max-sequenceID seen in any of the to-be-compacted
-        // TreeMaps if it hasn't been passed in to us.
-        if (maxId == -1) {
-          for (HStoreFile hsf: filesToCompact) {
-            long seqid = hsf.loadInfo(fs);
-            if (seqid > 0) {
-              if (seqid > maxId) {
-                maxId = seqid;
-              }
-            }
-          }
-        }
-
-        // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-          this.compression, this.bloomFilter);
-        try {
-          compactHStoreFiles(compactedOut, filesToCompact);
-        } finally {
-          compactedOut.close();
-        }
-
-        // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if (maxId >= 0) {
-          compactedOutputFile.writeInfo(fs, maxId);
-        } else {
-          compactedOutputFile.writeInfo(fs, -1);
-        }
-
-        // Write out a list of data files that we're replacing
-        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-        DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
-        try {
-          out.writeInt(filesToCompact.size());
-          for (HStoreFile hsf: filesToCompact) {
-            hsf.write(out);
-          }
-        } finally {
-          out.close();
-        }
-
-        // Indicate that we're done.
-        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-        (new DataOutputStream(fs.create(doneFile))).close();
-
-        // Move the compaction into place.
-        completeCompaction();
-        return true;
+        compactHStoreFiles(compactedOut, filesToCompact);
       } finally {
-        // Clean up the parent -- the region dir in the compactions directory.
-        if (this.fs.exists(curCompactStore.getParent())) {
-          if (!this.fs.delete(curCompactStore.getParent())) {
-            LOG.warn("Delete returned false deleting " +
-              curCompactStore.getParent().toString());
-          }
-        }
+        compactedOut.close();
       }
+
+      // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
+      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
+      long maxId = getMaxSequenceId(filesToCompact);
+      compactedOutputFile.writeInfo(fs, maxId);
+
+      // Write out a list of data files that we're replacing
+      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+      DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
+      try {
+        out.writeInt(filesToCompact.size());
+        for (HStoreFile hsf : filesToCompact) {
+          hsf.write(out);
+        }
+      } finally {
+        out.close();
+      }
+
+      // Indicate that we're done.
+      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+      (new DataOutputStream(fs.create(doneFile))).close();
+
+      // Move the compaction into place.
+      completeCompaction(curCompactStore);
+      return true;
     }
   }
 
   /*
-   * Compact passed toCompactFiles into compactedOut.
-   * We create a new set of MapFile.Reader objects so we don't screw up
-   * the caching associated with the currently-loaded ones. Our
-   * iteration-based access pattern is practically designed to ruin
-   * the cache.
-   *
-   * We work by opening a single MapFile.Reader for each file, and
-   * iterating through them in parallel. We always increment the
-   * lowest-ranked one. Updates to a single row/column will appear
-   * ranked by timestamp. This allows us to throw out deleted values or
-   * obsolete versions.
-   * @param compactedOut
-   * @param toCompactFiles
-   * @throws IOException
+   * Compact passed toCompactFiles into compactedOut.
+   * We create a new set of MapFile.Reader objects so we don't screw up the
+   * caching associated with the currently-loaded ones. Our iteration-based
+   * access pattern is practically designed to ruin the cache.
+   *
+   * We work by opening a single MapFile.Reader for each file, and iterating
+   * through them in parallel. We always increment the lowest-ranked one.
+   * Updates to a single row/column will appear ranked by timestamp. This
+   * allows us to throw out deleted values or obsolete versions.
+   * @param compactedOut
+   * @param toCompactFiles
+   * @throws IOException
    */
   private void compactHStoreFiles(final MapFile.Writer compactedOut,
     final List<HStoreFile> toCompactFiles) throws IOException {
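The rewritten compact() relies on storefiles being a map sorted by ascending sequence id, so reversing a copy of its values yields the newest-first list the merge wants. A runnable JDK-only illustration of that ordering (class name and file labels are made up):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.TreeMap;

    // Toy illustration of the newest-first ordering; not HStore code.
    public class NewestFirst {
      public static void main(String[] args) {
        // TreeMap iterates keys in ascending order: oldest sequence id first.
        TreeMap<Long, String> storefiles = new TreeMap<Long, String>();
        storefiles.put(10L, "hstore-10 (oldest)");
        storefiles.put(20L, "hstore-20");
        storefiles.put(30L, "hstore-30 (newest)");
        List<String> filesToCompact =
          new ArrayList<String>(storefiles.values());
        Collections.reverse(filesToCompact);
        // Prints: [hstore-30 (newest), hstore-20, hstore-10 (oldest)]
        System.out.println(filesToCompact);
      }
    }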
@@ -1107,6 +1088,7 @@ class HStore implements HConstants {
         // culprit.
         LOG.warn("Failed with " + e.toString() + ": " + hsf.toString() +
           (hsf.isReference()? " " + hsf.getReference().toString(): ""));
+        closeCompactionReaders(rdrs);
         throw e;
       }
     }
@@ -1195,13 +1177,17 @@ class HStore implements HConstants {
         }
       }
     } finally {
-      for (int i = 0; i < rdrs.length; i++) {
-        if (rdrs[i] != null) {
-          try {
-            rdrs[i].close();
-          } catch (IOException e) {
-            LOG.warn("Exception closing reader", e);
-          }
+      closeCompactionReaders(rdrs);
+    }
+  }
+
+  private void closeCompactionReaders(final CompactionReader [] rdrs) {
+    for (int i = 0; i < rdrs.length; i++) {
+      if (rdrs[i] != null) {
+        try {
+          rdrs[i].close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing reader", e);
         }
       }
     }
@@ -1326,11 +1312,11 @@ class HStore implements HConstants {
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    *
+   *
+   * @param curCompactStore Compaction to complete.
    */
-  private void completeCompaction() throws IOException {
-    Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
-      encodedRegionName, familyName);
-
+  private void completeCompaction(final Path curCompactStore)
+  throws IOException {
     // 1. Wait for active scanners to exit
     newScannerLock.writeLock().lock(); // prevent new scanners
     try {
@@ -1346,6 +1332,7 @@ class HStore implements HConstants {
         // 2. Acquiring the HStore write-lock
         this.lock.writeLock().lock();
       }
+
       try {
         Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
         if (!fs.exists(doneFile)) {
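The compactHStoreFiles() comment above describes a k-way merge: one reader per file, always advancing the reader whose current key ranks lowest, so all versions of a row/column surface together and stale ones can be dropped. A generic JDK-only sketch of that merge discipline (integers stand in for HStoreKeys; the patch applies the same idea to CompactionReaders over MapFiles):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Generic k-way merge sketch; not part of the patch.
    public class KWayMerge {
      static List<Integer> merge(final List<Iterator<Integer>> inputs) {
        // Heap entry: {currentValue, inputIndex}; the lowest value sits on
        // top, so we always advance the lowest-ranked input.
        PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
          Math.max(1, inputs.size()), new Comparator<int[]>() {
            public int compare(int[] a, int[] b) {
              return a[0] < b[0] ? -1 : (a[0] == b[0] ? 0 : 1);
            }
          });
        for (int i = 0; i < inputs.size(); i++) {
          if (inputs.get(i).hasNext()) {
            heap.add(new int[] {inputs.get(i).next(), i});
          }
        }
        List<Integer> merged = new ArrayList<Integer>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();
          merged.add(top[0]); // a real compactor would drop deletes here
          Iterator<Integer> in = inputs.get(top[1]);
          if (in.hasNext()) {
            heap.add(new int[] {in.next(), top[1]});
          }
        }
        return merged;
      }
    }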
@@ -1366,7 +1353,6 @@ class HStore implements HConstants {
           hsf.readFields(in);
           toCompactFiles.add(hsf);
         }
-
       } finally {
         in.close();
       }
@@ -1412,13 +1398,13 @@ class HStore implements HConstants {
       // 7. Loading the new TreeMap.
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
       this.readers.put(orderVal,
-        finalCompactedFile.getReader(this.fs, this.bloomFilter));
+          finalCompactedFile.getReader(this.fs, this.bloomFilter));
       this.storefiles.put(orderVal, finalCompactedFile);
     } catch (IOException e) {
       LOG.error("Failed replacing compacted files. Compacted file is " +
-        finalCompactedFile.toString() + ". Files replaced are " +
-        toCompactFiles.toString() +
-        " some of which may have been already removed", e);
+          finalCompactedFile.toString() + ". Files replaced are " +
+          toCompactFiles.toString() +
+          " some of which may have been already removed", e);
     } finally {
       // 8. Releasing the write-lock
@@ -1479,6 +1465,17 @@ class HStore implements HConstants {
     }
   }
 
+  /*
+   * @return Path to the compaction directory for this column family.
+   * Compaction dir is a subdirectory of the region.  Needs to have the
+   * same regiondir/storefamily path prefix; HStoreFile constructor presumes
+   * it (TODO: Fix).
+   */
+  private Path getCompactionDir() {
+    return HStoreFile.getHStoreDir(this.compactionDir,
+      this.encodedRegionName, this.familyName);
+  }
+
   private MapFile.Reader [] getReaders() {
     return this.readers.values().
       toArray(new MapFile.Reader[this.readers.size()]);
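Taken together, compact() and completeCompaction() implement a marker-file commit: the compacted output and the list of replaced files are written first, the COMPACTION_DONE marker is created last, and recovery treats any compaction directory without the marker as leftovers from a failed run, safe to clean out. A JDK-only model of that protocol (the "done" file name is an assumption; the patch uses the COMPACTION_DONE constant):

    import java.io.File;
    import java.io.IOException;

    // Toy model of the done-marker commit protocol; not HStore code.
    public class DoneMarker {
      // Call only after all compaction output has been fully written.
      static void commit(final File compactionDir) throws IOException {
        File done = new File(compactionDir, "done");
        if (!done.createNewFile()) {
          throw new IOException("Marker already exists: " + done);
        }
      }

      // On startup: anything without the marker is a failed compaction and
      // may be cleaned out, which is what the rewritten compact() does.
      static boolean isCommitted(final File compactionDir) {
        return new File(compactionDir, "done").exists();
      }
    }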