From 79be2d87f7ae88f3ced2c48d826409a6b8c7b3ff Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 15 Nov 2008 00:36:27 +0000 Subject: [PATCH] HBASE-938 major compaction period is not checked periodically git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@714200 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../regionserver/CompactSplitThread.java | 9 +- .../hadoop/hbase/regionserver/HRegion.java | 22 ++- .../hbase/regionserver/HRegionServer.java | 132 ++++++++++++------ .../hadoop/hbase/regionserver/HStore.java | 66 ++++++--- .../hadoop/hbase/regionserver/HStoreFile.java | 73 +++++++++- .../hbase/regionserver/MemcacheFlusher.java | 2 +- .../org/apache/hadoop/hbase/util/Threads.java | 16 ++- 8 files changed, 241 insertions(+), 80 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e7dd1420970..80b7a8f8b62 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -119,6 +119,7 @@ Release 0.19.0 - Unreleased HBASE-998 Narrow getClosestRowBefore by passing column family HBASE-999 Up versions on historian and keep history of deleted regions for a while rather than delete immediately + HBASE-938 Major compaction period is not checked periodically NEW FEATURES HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters] diff --git a/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 1694ed0dc7e..93e0fbe4ec7 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -117,13 +117,16 @@ class CompactSplitThread extends Thread implements HConstants { /** * @param r HRegion store belongs to + * @param why Why compaction requested -- used in debug messages */ - public synchronized void compactionRequested(HRegion r) { + public synchronized void compactionRequested(final HRegion r, + final String why) { if (this.server.stopRequested.get()) { return; } - LOG.debug("Compaction requested for region: " + - Bytes.toString(r.getRegionName())); + LOG.debug("Compaction requested for region " + + Bytes.toString(r.getRegionName()) + + (why != null && !why.isEmpty()? " because: " + why: "")); synchronized (regionsInQueue) { if (!regionsInQueue.contains(r)) { compactionQueue.add(r); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 179dccdfb76..1106d2c0149 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1336,8 +1336,8 @@ public class HRegion implements HConstants { try { for (HStore store : stores.values()) { List keys = - store.getKeys(new HStoreKey(row, ts, this.regionInfo), ALL_VERSIONS, - now, null); + store.getKeys(new HStoreKey(row, ts, this.regionInfo), + ALL_VERSIONS, now, null); TreeMap edits = new TreeMap( new HStoreKey.HStoreKeyWritableComparator(regionInfo)); for (HStoreKey key: keys) { @@ -1369,7 +1369,8 @@ public class HRegion implements HConstants { long now = System.currentTimeMillis(); try { for (HStore store : stores.values()) { - List keys = store.getKeys(new HStoreKey(row, timestamp, this.regionInfo), + List keys = + store.getKeys(new HStoreKey(row, timestamp, this.regionInfo), ALL_VERSIONS, now, columnPattern); TreeMap edits = new TreeMap( new HStoreKey.HStoreKeyWritableComparator(regionInfo)); @@ -2400,6 +2401,19 @@ public class HRegion implements HConstants { static void listFiles(FileSystem fs, HRegion r) throws IOException { listPaths(fs, r.getRegionDir()); } + + /** + * @return True if needs a mojor compaction. + * @throws IOException + */ + boolean isMajorCompaction() throws IOException { + for (HStore store: this.stores.values()) { + if (store.isMajorCompaction()) { + return true; + } + } + return false; + } /* * List the files under the specified directory @@ -2425,4 +2439,4 @@ public class HRegion implements HConstants { } } } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 00ff89fb822..e9974f1f7ba 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -52,6 +52,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -174,32 +175,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { private final LinkedList reservedSpace = new LinkedList(); private RegionServerMetrics metrics; - - /** - * Thread to shutdown the region server in an orderly manner. This thread - * is registered as a shutdown hook in the HRegionServer constructor and is - * only called when the HRegionServer receives a kill signal. - */ - class ShutdownThread extends Thread { - private final HRegionServer instance; - - /** - * @param instance - */ - public ShutdownThread(HRegionServer instance) { - this.instance = instance; - } - - @Override - public void run() { - LOG.info("Starting shutdown thread."); - - // tell the region server to stop and wait for it to complete - instance.stop(); - instance.join(); - LOG.info("Shutdown thread complete"); - } - } // Compactions final CompactSplitThread compactSplitThread; @@ -207,6 +182,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Cache flushing final MemcacheFlusher cacheFlusher; + /* Check for major compactions. + */ + final Chore majorCompactionChecker; + // HLog and HLog roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected volatile HLog log; @@ -260,6 +239,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Log flushing thread this.logFlusher = new LogFlusher(this.threadWakeFrequency, this.stopRequested); + + // Background thread to check for major compactions; needed if region + // has not gotten updates in a while. Make it run at a lesser frequency. + int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY + + ".multiplier", 1000); + this.majorCompactionChecker = new MajorCompactionChecker(this, + this.threadWakeFrequency * multiplier, this.stopRequested); // Task thread to process requests from Master this.worker = new Worker(); @@ -474,6 +460,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { logFlusher.interrupt(); compactSplitThread.interruptIfNecessary(); logRoller.interruptIfNecessary(); + this.majorCompactionChecker.interrupt(); if (abortRequested) { if (this.fsOk) { @@ -571,6 +558,66 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { throw ex; } } + + /* + * Thread to shutdown the region server in an orderly manner. This thread + * is registered as a shutdown hook in the HRegionServer constructor and is + * only called when the HRegionServer receives a kill signal. + */ + private static class ShutdownThread extends Thread { + private final Log LOG = LogFactory.getLog(this.getClass()); + private final HRegionServer instance; + + /** + * @param instance + */ + public ShutdownThread(HRegionServer instance) { + this.instance = instance; + } + + @Override + public void run() { + LOG.info("Starting shutdown thread."); + + // tell the region server to stop and wait for it to complete + instance.stop(); + instance.join(); + LOG.info("Shutdown thread complete"); + } + } + + /* + * Inner class that runs on a long period checking if regions need major + * compaction. + */ + private static class MajorCompactionChecker extends Chore { + private final Log LOG = LogFactory.getLog(this.getClass()); + private final HRegionServer instance; + + MajorCompactionChecker(final HRegionServer h, + final int sleepTime, final AtomicBoolean stopper) { + super(sleepTime, stopper); + this.instance = h; + LOG.info("Runs every " + sleepTime + "ms"); + } + + @Override + protected void chore() { + Set keys = this.instance.onlineRegions.keySet(); + for (Integer i: keys) { + HRegion r = this.instance.onlineRegions.get(i); + try { + if (r != null && r.isMajorCompaction()) { + // Queue a compaction. Will recognize if major is needed. + this.instance.compactSplitThread. + compactionRequested(r, getName() + " requests major compaction"); + } + } catch (IOException e) { + LOG.warn("Failed major compaction check on " + r, e); + } + } + } + }; /** * Report the status of the server. A server is online once all the startup @@ -660,6 +707,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); + Threads.setDaemonThreadRunning(this.majorCompactionChecker, + n + ".majorCompactionChecker", handler); + // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. this.leases.setName(n + ".leaseChecker"); @@ -690,7 +740,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Verify that all threads are alive if (!(leases.isAlive() && compactSplitThread.isAlive() && cacheFlusher.isAlive() && logRoller.isAlive() && - workerThread.isAlive())) { + workerThread.isAlive() && this.majorCompactionChecker.isAlive())) { // One or more threads are no longer alive - shut down stop(); return false; @@ -750,20 +800,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * Presumption is that all closes and stops have already been called. */ void join() { - join(this.workerThread); - join(this.cacheFlusher); - join(this.compactSplitThread); - join(this.logRoller); - } - - private void join(final Thread t) { - while (t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - // continue - } - } + Threads.shutdown(this.majorCompactionChecker); + Threads.shutdown(this.workerThread); + Threads.shutdown(this.cacheFlusher); + Threads.shutdown(this.compactSplitThread); + Threads.shutdown(this.logRoller); } /* @@ -925,13 +966,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Force split a region HRegion region = getRegion(info.getRegionName()); region.regionInfo.shouldSplit(true); - compactSplitThread.compactionRequested(region); + compactSplitThread.compactionRequested(region, + "MSG_REGION_SPLIT"); } break; case MSG_REGION_COMPACT: { // Compact a region HRegion region = getRegion(info.getRegionName()); - compactSplitThread.compactionRequested(region); + compactSplitThread.compactionRequested(region, + "MSG_REGION_COMPACT"); } break; default: @@ -983,7 +1026,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { try { region = instantiateRegion(regionInfo); // Startup a compaction early if one is needed. - this.compactSplitThread.compactionRequested(region); + this.compactSplitThread. + compactionRequested(region, "Region open check"); } catch (IOException e) { LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index ee0addd73d0..6674e113505 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -435,6 +435,7 @@ public class HStore implements HConstants { curfile = new HStoreFile(conf, fs, basedir, this.info, family.getName(), fid, reference); long storeSeqId = -1; + boolean majorCompaction = false; try { storeSeqId = curfile.loadInfo(fs); if (storeSeqId > this.maxSeqId) { @@ -488,7 +489,9 @@ public class HStore implements HConstants { // Found map and sympathetic info file. Add this hstorefile to result. if (LOG.isDebugEnabled()) { LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" + - isReference + ", sequence id=" + storeSeqId + ", length=" + length); + isReference + ", sequence id=" + storeSeqId + + ", length=" + length + ", majorCompaction=" + + curfile.isMajorCompaction()); } results.put(Long.valueOf(storeSeqId), curfile); // Keep list of sympathetic data mapfiles for cleaning info dir in next @@ -691,7 +694,8 @@ public class HStore implements HConstants { " with " + entries + " entries, sequence id " + logCacheFlushId + ", data size " + StringUtils.humanReadableInt(flushed) + ", file size " + - StringUtils.humanReadableInt(newStoreSize)); + StringUtils.humanReadableInt(newStoreSize) + " to " + + this.info.getRegionNameAsString()); } } return storefiles.size() >= compactionThreshold; @@ -832,11 +836,11 @@ public class HStore implements HConstants { // Check to see if we need to do a major compaction on this region. // If so, change doMajorCompaction to true to skip the incremental // compacting below. Only check if doMajorCompaction is not true. - long lastMajorCompaction = 0L; if (!doMajorCompaction) { - doMajorCompaction = isMajorCompaction(); + doMajorCompaction = isMajorCompaction(filesToCompact); } - if (!doMajorCompaction && !hasReferences(filesToCompact) && + boolean references = hasReferences(filesToCompact); + if (!doMajorCompaction && !references && filesToCompact.size() < compactionThreshold) { return checkSplit(forceSplit); } @@ -862,7 +866,7 @@ public class HStore implements HConstants { fileSizes[i] = len; totalSize += len; } - if (!doMajorCompaction && !hasReferences(filesToCompact)) { + if (!doMajorCompaction && !references) { // Here we select files for incremental compaction. // The rule is: if the largest(oldest) one is more than twice the // size of the second, skip the largest, and continue to next..., @@ -888,7 +892,7 @@ public class HStore implements HConstants { if (LOG.isDebugEnabled()) { LOG.debug("Compaction size of " + this.storeNameStr + ": " + StringUtils.humanReadableInt(totalSize) + "; Skipped " + point + - " files , size: " + skipped); + " file(s), size: " + skipped); } } @@ -904,7 +908,8 @@ public class HStore implements HConstants { HStoreFile compactedOutputFile = new HStoreFile(conf, fs, this.compactionDir, this.info, family.getName(), -1L, null); if (LOG.isDebugEnabled()) { - LOG.debug("started compaction of " + rdrs.size() + " files into " + + LOG.debug("Started compaction of " + rdrs.size() + " file(s)" + + (references? "(hasReferences=true)": " ") + " into " + FSUtils.getPath(compactedOutputFile.getMapFilePath())); } MapFile.Writer writer = compactedOutputFile.getWriter(this.fs, @@ -917,15 +922,14 @@ public class HStore implements HConstants { } // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. - compactedOutputFile.writeInfo(fs, maxId); + compactedOutputFile.writeInfo(fs, maxId, doMajorCompaction); // Move the compaction into place. completeCompaction(filesToCompact, compactedOutputFile); if (LOG.isDebugEnabled()) { - LOG.debug("Completed compaction of " + this.storeNameStr + - " store size is " + StringUtils.humanReadableInt(storeSize) + - (doMajorCompaction? "": "; time since last major compaction: " + - (lastMajorCompaction/1000) + " seconds")); + LOG.debug("Completed " + (doMajorCompaction? "major": "") + + " compaction of " + this.storeNameStr + + " store size is " + StringUtils.humanReadableInt(storeSize)); } } return checkSplit(forceSplit); @@ -955,19 +959,40 @@ public class HStore implements HConstants { /* * @return True if we should run a major compaction. */ - private boolean isMajorCompaction() throws IOException { + boolean isMajorCompaction() throws IOException { + return isMajorCompaction(null); + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + private boolean isMajorCompaction(final List filesToCompact) + throws IOException { boolean result = false; Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(), this.family.getName()); long lowTimestamp = getLowestTimestamp(fs, mapdir); if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) && lowTimestamp > 0l) { - if (LOG.isDebugEnabled()) { - LOG.debug("Major compaction triggered on store: " + - this.storeNameStr + ". Time since last major compaction: " + - ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds"); + // Major compaction time has elapsed. + long elapsedTime = System.currentTimeMillis() - lowTimestamp; + if (filesToCompact != null && filesToCompact.size() == 1 && + filesToCompact.get(0).isMajorCompaction() && + (this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction because only one (major) " + + "compacted file only and elapsedTime " + elapsedTime + + " is < ttl=" + this.ttl); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store: " + + this.storeNameStr + ". Time since last major compaction: " + + ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds"); + } + result = true; } - result = true; } return result; } @@ -1160,7 +1185,8 @@ public class HStore implements HConstants { try { // 1. Moving the new MapFile into place. HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir, - this.info, family.getName(), -1, null); + this.info, family.getName(), -1, null, + compactedFile.isMajorCompaction()); if (LOG.isDebugEnabled()) { LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) + " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath())); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 30c495c7032..8dffe6ea0f3 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -83,6 +83,7 @@ import org.apache.hadoop.io.SequenceFile; public class HStoreFile implements HConstants { static final Log LOG = LogFactory.getLog(HStoreFile.class.getName()); static final byte INFO_SEQ_NUM = 0; + static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1; static final String HSTORE_DATFILE_DIR = "mapfiles"; static final String HSTORE_INFO_DIR = "info"; static final String HSTORE_FILTER_DIR = "filter"; @@ -97,6 +98,9 @@ public class HStoreFile implements HConstants { private final FileSystem fs; private final Reference reference; private final HRegionInfo hri; + /* If true, this file was product of a major compaction. + */ + private boolean majorCompaction = false; /** * Constructor that fully initializes the object @@ -111,6 +115,24 @@ public class HStoreFile implements HConstants { HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir, final HRegionInfo hri, byte [] colFamily, long fileId, final Reference ref) + throws IOException { + this(conf, fs, basedir, hri, colFamily, fileId, ref, false); + } + + /** + * Constructor that fully initializes the object + * @param conf Configuration object + * @param basedir qualified path that is parent of region directory + * @param colFamily name of the column family + * @param fileId file identifier + * @param ref Reference to another HStoreFile. + * @param hri The region info for this file (HACK HBASE-868). TODO: Fix. + * @param mc Try if this file was result of a major compression. + * @throws IOException + */ + HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir, + final HRegionInfo hri, byte [] colFamily, long fileId, + final Reference ref, final boolean mc) throws IOException { this.conf = conf; this.fs = fs; @@ -133,6 +155,7 @@ public class HStoreFile implements HConstants { // If a reference, construction does not write the pointer files. Thats // done by invocations of writeReferenceFiles(hsf, fs). Happens at split. this.reference = ref; + this.majorCompaction = mc; } /** @return the region name */ @@ -288,11 +311,11 @@ public class HStoreFile implements HConstants { /** * Reads in an info file * - * @param fs file system + * @param filesystem file system * @return The sequence id contained in the info file * @throws IOException */ - long loadInfo(FileSystem fs) throws IOException { + long loadInfo(final FileSystem filesystem) throws IOException { Path p = null; if (isReference()) { p = getInfoFilePath(reference.getEncodedRegionName(), @@ -300,10 +323,18 @@ public class HStoreFile implements HConstants { } else { p = getInfoFilePath(); } - DataInputStream in = new DataInputStream(fs.open(p)); + long length = filesystem.getFileStatus(p).getLen(); + boolean hasMoreThanSeqNum = length > (Byte.SIZE + Bytes.SIZEOF_LONG); + DataInputStream in = new DataInputStream(filesystem.open(p)); try { byte flag = in.readByte(); - if(flag == INFO_SEQ_NUM) { + if (flag == INFO_SEQ_NUM) { + if (hasMoreThanSeqNum) { + flag = in.readByte(); + if (flag == MAJOR_COMPACTION) { + this.majorCompaction = in.readBoolean(); + } + } return in.readLong(); } throw new IOException("Cannot process log file: " + p); @@ -315,16 +346,37 @@ public class HStoreFile implements HConstants { /** * Writes the file-identifier to disk * - * @param fs file system + * @param filesystem file system * @param infonum file id * @throws IOException */ - void writeInfo(FileSystem fs, long infonum) throws IOException { + void writeInfo(final FileSystem filesystem, final long infonum) + throws IOException { + writeInfo(filesystem, infonum, false); + } + + /** + * Writes the file-identifier to disk + * + * @param filesystem file system + * @param infonum file id + * @param mc True if this file is product of a major compaction + * @throws IOException + */ + void writeInfo(final FileSystem filesystem, final long infonum, + final boolean mc) + throws IOException { Path p = getInfoFilePath(); - FSDataOutputStream out = fs.create(p); + FSDataOutputStream out = filesystem.create(p); try { out.writeByte(INFO_SEQ_NUM); out.writeLong(infonum); + if (mc) { + // Set whether major compaction flag on this file. + this.majorCompaction = mc; + out.writeByte(MAJOR_COMPACTION); + out.writeBoolean(mc); + } } finally { out.close(); } @@ -430,6 +482,13 @@ public class HStoreFile implements HConstants { return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId + (isReference()? "-" + reference.toString(): ""); } + + /** + * @return True if this file was made by a major compaction. + */ + public boolean isMajorCompaction() { + return this.majorCompaction; + } private static String createHStoreFilename(final long fid, final int encodedRegionName) { diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java b/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java index c6f6b3a4108..75cce55aefd 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java @@ -177,7 +177,7 @@ class MemcacheFlusher extends Thread implements FlushRequester { // compact if removeFromQueue is true. Note that region.flushCache() // only returns true if a flush is done and if a compaction is needed. if (region.flushcache() && !removeFromQueue) { - server.compactSplitThread.compactionRequested(region); + server.compactSplitThread.compactionRequested(region, getName()); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical diff --git a/src/java/org/apache/hadoop/hbase/util/Threads.java b/src/java/org/apache/hadoop/hbase/util/Threads.java index 489f951bca0..41c40c3ae25 100644 --- a/src/java/org/apache/hadoop/hbase/util/Threads.java +++ b/src/java/org/apache/hadoop/hbase/util/Threads.java @@ -57,4 +57,18 @@ public class Threads { t.start(); return t; } -} \ No newline at end of file + + /** + * Shutdown passed thread using isAlive and join. + * @param t Thread to shutdown + */ + public static void shutdown(final Thread t) { + while (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + LOG.warn(t.getName(), e); + } + } + } +}