From f48c1d7893cb45178abdfc9722c31bf77222bcbb Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Mon, 25 Feb 2008 19:50:02 +0000 Subject: [PATCH] HBASE-442 Move internal classes out of HRegionServer git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@630968 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 +- .../regionserver/CompactSplitThread.java | 201 ++++++++ .../hadoop/hbase/regionserver/Flusher.java | 148 ++++++ .../hbase/regionserver/HRegionServer.java | 442 ++---------------- .../hadoop/hbase/regionserver/LogRoller.java | 101 ++++ .../hadoop/hbase/regionserver/QueueEntry.java | 78 ++++ 6 files changed, 565 insertions(+), 408 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/Flusher.java create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java diff --git a/CHANGES.txt b/CHANGES.txt index 7b5d7147fca..add09857c13 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -65,7 +65,8 @@ Hbase Change Log HBASE-457 Factor Master into Master, RegionManager, and ServerManager HBASE-464 HBASE-419 introduced javadoc errors HBASE-468 Move HStoreKey back to o.a.h.h - + HBASE-442 Move internal classes out of HRegionServer + Branch 0.1 INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java new file mode 100644 index 00000000000..a2ba9ba0140 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -0,0 +1,201 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.util.Writables; + +/** + * Compact region on request and then run split if appropriate + */ +class CompactSplitThread extends Thread +implements RegionUnavailableListener, HConstants { + static final Log LOG = LogFactory.getLog(CompactSplitThread.class); + + private HTable root = null; + private HTable meta = null; + private long startTime; + private final long frequency; + + private HRegionServer server; + private HBaseConfiguration conf; + + private final BlockingQueue compactionQueue = + new LinkedBlockingQueue(); + + /** constructor */ + public CompactSplitThread(HRegionServer server) { + super(); + this.server = server; + this.conf = server.conf; + this.frequency = + conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", + 20 * 1000); + } + + /** {@inheritDoc} */ + @Override + public void run() { + while (!server.isStopRequested()) { + QueueEntry e = null; + try { + e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); + if (e == null) { + continue; + } + e.getRegion().compactIfNeeded(); + split(e.getRegion()); + } catch (InterruptedException ex) { + continue; + } catch (IOException ex) { + LOG.error("Compaction failed" + + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + RemoteExceptionHandler.checkIOException(ex)); + if (!server.checkFileSystem()) { + break; + } + + } catch (Exception ex) { + LOG.error("Compaction failed" + + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + ex); + if (!server.checkFileSystem()) { + break; + } + } + } + LOG.info(getName() + " exiting"); + } + + /** + * @param e QueueEntry for region to be compacted + */ + public void compactionRequested(QueueEntry e) { + compactionQueue.add(e); + } + + void compactionRequested(final HRegion r) { + compactionRequested(new QueueEntry(r, System.currentTimeMillis())); + } + + private void split(final HRegion region) throws IOException { + final HRegionInfo oldRegionInfo = region.getRegionInfo(); + final HRegion[] newRegions = region.splitRegion(this); + if (newRegions == null) { + // Didn't need to be split + return; + } + + // When a region is split, the META table needs to updated if we're + // splitting a 'normal' region, and the ROOT table needs to be + // updated if we are splitting a META region. + HTable t = null; + if (region.getRegionInfo().isMetaTable()) { + // We need to update the root region + if (this.root == null) { + this.root = new HTable(conf, ROOT_TABLE_NAME); + } + t = root; + } else { + // For normal regions we need to update the meta region + if (meta == null) { + meta = new HTable(conf, META_TABLE_NAME); + } + t = meta; + } + LOG.info("Updating " + t.getTableName() + " with region split info"); + + // Mark old region as offline and split in META. + // NOTE: there is no need for retry logic here. HTable does it for us. + oldRegionInfo.setOffline(true); + oldRegionInfo.setSplit(true); + BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName()); + update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); + update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo())); + update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo())); + t.commit(update); + + // Add new regions to META + for (int i = 0; i < newRegions.length; i++) { + update = new BatchUpdate(newRegions[i].getRegionName()); + update.put(COL_REGIONINFO, Writables.getBytes( + newRegions[i].getRegionInfo())); + t.commit(update); + } + + // Now tell the master about the new regions + if (LOG.isDebugEnabled()) { + LOG.debug("Reporting region split to master"); + } + server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(), + newRegions[1].getRegionInfo()); + LOG.info("region split, META updated, and report to master all" + + " successful. Old region=" + oldRegionInfo.toString() + + ", new regions: " + newRegions[0].toString() + ", " + + newRegions[1].toString() + ". Split took " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); + + // Do not serve the new regions. Let the Master assign them. + } + + /** {@inheritDoc} */ + public void closing(final Text regionName) { + startTime = System.currentTimeMillis(); + server.getWriteLock().lock(); + try { + // Remove region from regions Map and add it to the Map of retiring + // regions. + server.setRegionClosing(regionName); + if (LOG.isDebugEnabled()) { + LOG.debug(regionName.toString() + " closing (" + + "Adding to retiringRegions)"); + } + } finally { + server.getWriteLock().unlock(); + } + } + + /** {@inheritDoc} */ + public void closed(final Text regionName) { + server.getWriteLock().lock(); + try { + server.setRegionClosed(regionName); + if (LOG.isDebugEnabled()) { + LOG.debug(regionName.toString() + " closed"); + } + } finally { + server.getWriteLock().unlock(); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java b/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java new file mode 100644 index 00000000000..c6b92c572c8 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java @@ -0,0 +1,148 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.Set; +import java.util.Iterator; +import java.util.ConcurrentModificationException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.DroppedSnapshotException; +import org.apache.hadoop.hbase.RemoteExceptionHandler; + +/** Flush cache upon request */ +class Flusher extends Thread implements CacheFlushListener { + static final Log LOG = LogFactory.getLog(Flusher.class); + private final DelayQueue flushQueue = + new DelayQueue(); + + private final long optionalFlushPeriod; + private final HRegionServer server; + private final HBaseConfiguration conf; + private final Integer lock = new Integer(0); + + /** constructor */ + public Flusher(final HRegionServer server) { + super(); + this.server = server; + conf = server.conf; + this.optionalFlushPeriod = conf.getLong( + "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); + } + + /** {@inheritDoc} */ + @Override + public void run() { + while (!server.isStopRequested()) { + QueueEntry e = null; + try { + e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS); + if (e == null) { + continue; + } + synchronized(lock) { // Don't interrupt while we're working + if (e.getRegion().flushcache()) { + server.compactionRequested(e); + } + + e.setExpirationTime(System.currentTimeMillis() + + optionalFlushPeriod); + flushQueue.add(e); + } + + // Now ensure that all the active regions are in the queue + Set regions = server.getRegionsToCheck(); + for (HRegion r: regions) { + e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); + synchronized (flushQueue) { + if (!flushQueue.contains(e)) { + flushQueue.add(e); + } + } + } + + // Now make sure that the queue only contains active regions + synchronized (flushQueue) { + for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { + e = i.next(); + if (!regions.contains(e.getRegion())) { + i.remove(); + } + } + } + } catch (InterruptedException ex) { + continue; + } catch (ConcurrentModificationException ex) { + continue; + } catch (DroppedSnapshotException ex) { + // Cache flush can fail in a few places. If it fails in a critical + // section, we get a DroppedSnapshotException and a replay of hlog + // is required. Currently the only way to do this is a restart of + // the server. + LOG.fatal("Replay of hlog required. Forcing server restart", ex); + if (!server.checkFileSystem()) { + break; + } + server.stop(); + } catch (IOException ex) { + LOG.error("Cache flush failed" + + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + RemoteExceptionHandler.checkIOException(ex)); + if (!server.checkFileSystem()) { + break; + } + } catch (Exception ex) { + LOG.error("Cache flush failed" + + (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), + ex); + if (!server.checkFileSystem()) { + break; + } + } + } + flushQueue.clear(); + LOG.info(getName() + " exiting"); + } + + /** {@inheritDoc} */ + public void flushRequested(HRegion region) { + QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); + synchronized (flushQueue) { + if (flushQueue.contains(e)) { + flushQueue.remove(e); + } + flushQueue.add(e); + } + } + + /** + * Only interrupt once it's done with a run through the work loop. + */ + void interruptPolitely() { + synchronized (lock) { + interrupt(); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bbce87905a5..16c28ce92d0 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -185,415 +185,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } - /** Queue entry passed to flusher, compactor and splitter threads */ - class QueueEntry implements Delayed { - private final HRegion region; - private long expirationTime; - - QueueEntry(HRegion region, long expirationTime) { - this.region = region; - this.expirationTime = expirationTime; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object o) { - QueueEntry other = (QueueEntry) o; - return this.hashCode() == other.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return this.region.getRegionInfo().hashCode(); - } - - /** {@inheritDoc} */ - public long getDelay(TimeUnit unit) { - return unit.convert(this.expirationTime - System.currentTimeMillis(), - TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - public int compareTo(Delayed o) { - long delta = this.getDelay(TimeUnit.MILLISECONDS) - - o.getDelay(TimeUnit.MILLISECONDS); - - int value = 0; - if (delta > 0) { - value = 1; - - } else if (delta < 0) { - value = -1; - } - return value; - } - - /** @return the region */ - public HRegion getRegion() { - return region; - } - - /** @param expirationTime the expirationTime to set */ - public void setExpirationTime(long expirationTime) { - this.expirationTime = expirationTime; - } - } - // Compactions final CompactSplitThread compactSplitThread; - // Needed during shutdown so we send an interrupt after completion of a - // compaction, not in the midst. - final Integer compactSplitLock = new Integer(0); - /** Compact region on request and then run split if appropriate - */ - private class CompactSplitThread extends Thread - implements RegionUnavailableListener { - private HTable root = null; - private HTable meta = null; - private long startTime; - private final long frequency; - - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); - - /** constructor */ - public CompactSplitThread() { - super(); - this.frequency = - conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", - 20 * 1000); - } - - /** {@inheritDoc} */ - @Override - public void run() { - while (!stopRequested.get()) { - QueueEntry e = null; - try { - e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; - } - e.getRegion().compactIfNeeded(); - split(e.getRegion()); - } catch (InterruptedException ex) { - continue; - } catch (IOException ex) { - LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - RemoteExceptionHandler.checkIOException(ex)); - if (!checkFileSystem()) { - break; - } - - } catch (Exception ex) { - LOG.error("Compaction failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - ex); - if (!checkFileSystem()) { - break; - } - } - } - LOG.info(getName() + " exiting"); - } - - /** - * @param e QueueEntry for region to be compacted - */ - public void compactionRequested(QueueEntry e) { - compactionQueue.add(e); - } - - void compactionRequested(final HRegion r) { - compactionRequested(new QueueEntry(r, System.currentTimeMillis())); - } - - private void split(final HRegion region) throws IOException { - final HRegionInfo oldRegionInfo = region.getRegionInfo(); - final HRegion[] newRegions = region.splitRegion(this); - if (newRegions == null) { - // Didn't need to be split - return; - } - - // When a region is split, the META table needs to updated if we're - // splitting a 'normal' region, and the ROOT table needs to be - // updated if we are splitting a META region. - HTable t = null; - if (region.getRegionInfo().isMetaTable()) { - // We need to update the root region - if (this.root == null) { - this.root = new HTable(conf, ROOT_TABLE_NAME); - } - t = root; - } else { - // For normal regions we need to update the meta region - if (meta == null) { - meta = new HTable(conf, META_TABLE_NAME); - } - t = meta; - } - LOG.info("Updating " + t.getTableName() + " with region split info"); - - // Mark old region as offline and split in META. - // NOTE: there is no need for retry logic here. HTable does it for us. - oldRegionInfo.setOffline(true); - oldRegionInfo.setSplit(true); - BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName()); - update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); - update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo())); - update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo())); - t.commit(update); - - // Add new regions to META - for (int i = 0; i < newRegions.length; i++) { - update = new BatchUpdate(newRegions[i].getRegionName()); - update.put(COL_REGIONINFO, Writables.getBytes( - newRegions[i].getRegionInfo())); - t.commit(update); - } - - // Now tell the master about the new regions - if (LOG.isDebugEnabled()) { - LOG.debug("Reporting region split to master"); - } - reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(), - newRegions[1].getRegionInfo()); - LOG.info("region split, META updated, and report to master all" + - " successful. Old region=" + oldRegionInfo.toString() + - ", new regions: " + newRegions[0].toString() + ", " + - newRegions[1].toString() + ". Split took " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime)); - - // Do not serve the new regions. Let the Master assign them. - } - - /** {@inheritDoc} */ - public void closing(final Text regionName) { - startTime = System.currentTimeMillis(); - lock.writeLock().lock(); - try { - // Remove region from regions Map and add it to the Map of retiring - // regions. - retiringRegions.put(regionName, onlineRegions.remove(regionName)); - if (LOG.isDebugEnabled()) { - LOG.debug(regionName.toString() + " closing (" + - "Adding to retiringRegions)"); - } - } finally { - lock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - public void closed(final Text regionName) { - lock.writeLock().lock(); - try { - retiringRegions.remove(regionName); - if (LOG.isDebugEnabled()) { - LOG.debug(regionName.toString() + " closed"); - } - } finally { - lock.writeLock().unlock(); - } - } - } - // Cache flushing final Flusher cacheFlusher; - // Needed during shutdown so we send an interrupt after completion of a - // flush, not in the midst. - final Integer cacheFlusherLock = new Integer(0); - /** Flush cache upon request */ - class Flusher extends Thread implements CacheFlushListener { - private final DelayQueue flushQueue = - new DelayQueue(); - - private final long optionalFlushPeriod; - - /** constructor */ - public Flusher() { - super(); - this.optionalFlushPeriod = conf.getLong( - "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L); - - } - - /** {@inheritDoc} */ - @Override - public void run() { - while (!stopRequested.get()) { - QueueEntry e = null; - try { - e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (e == null) { - continue; - } - synchronized(cacheFlusherLock) { // Don't interrupt while we're working - if (e.getRegion().flushcache()) { - compactSplitThread.compactionRequested(e); - } - - e.setExpirationTime(System.currentTimeMillis() + - optionalFlushPeriod); - flushQueue.add(e); - } - - // Now insure that all the active regions are in the queue - - Set regions = getRegionsToCheck(); - for (HRegion r: regions) { - e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod); - synchronized (flushQueue) { - if (!flushQueue.contains(e)) { - flushQueue.add(e); - } - } - } - - // Now make sure that the queue only contains active regions - - synchronized (flushQueue) { - for (Iterator i = flushQueue.iterator(); i.hasNext(); ) { - e = i.next(); - if (!regions.contains(e.getRegion())) { - i.remove(); - } - } - } - } catch (InterruptedException ex) { - continue; - - } catch (ConcurrentModificationException ex) { - continue; - - } catch (DroppedSnapshotException ex) { - // Cache flush can fail in a few places. If it fails in a critical - // section, we get a DroppedSnapshotException and a replay of hlog - // is required. Currently the only way to do this is a restart of - // the server. - LOG.fatal("Replay of hlog required. Forcing server restart", ex); - if (!checkFileSystem()) { - break; - } - HRegionServer.this.stop(); - - } catch (IOException ex) { - LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - RemoteExceptionHandler.checkIOException(ex)); - if (!checkFileSystem()) { - break; - } - - } catch (Exception ex) { - LOG.error("Cache flush failed" + - (e != null ? (" for region " + e.getRegion().getRegionName()) : ""), - ex); - if (!checkFileSystem()) { - break; - } - } - } - flushQueue.clear(); - LOG.info(getName() + " exiting"); - } - - /** {@inheritDoc} */ - public void flushRequested(HRegion region) { - QueueEntry e = new QueueEntry(region, System.currentTimeMillis()); - synchronized (flushQueue) { - if (flushQueue.contains(e)) { - flushQueue.remove(e); - } - flushQueue.add(e); - } - } - } - // HLog and HLog roller. log is protected rather than private to avoid // eclipse warning when accessed by inner classes protected HLog log; final LogRoller logRoller; final Integer logRollerLock = new Integer(0); - /** Runs periodically to determine if the HLog should be rolled */ - class LogRoller extends Thread implements LogRollListener { - private final Integer rollLock = new Integer(0); - private final long optionalLogRollInterval; - private long lastLogRollTime; - private volatile boolean rollLog; - - /** constructor */ - public LogRoller() { - super(); - this.optionalLogRollInterval = conf.getLong( - "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L); - this.rollLog = false; - lastLogRollTime = System.currentTimeMillis(); - } - - /** {@inheritDoc} */ - @Override - public void run() { - while (!stopRequested.get()) { - while (!rollLog && !stopRequested.get()) { - long now = System.currentTimeMillis(); - if (this.lastLogRollTime + this.optionalLogRollInterval <= now) { - rollLog = true; - this.lastLogRollTime = now; - } else { - synchronized (rollLock) { - try { - rollLock.wait(threadWakeFrequency); - - } catch (InterruptedException e) { - continue; - } - } - } - } - if (!rollLog) { - // There's only two reasons to break out of the while loop. - // 1. Log roll requested - // 2. Stop requested - // so if a log roll was not requested, continue and break out of loop - continue; - } - synchronized (logRollerLock) { - try { - LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries()); - log.rollWriter(); - - } catch (IOException ex) { - LOG.error("Log rolling failed", - RemoteExceptionHandler.checkIOException(ex)); - checkFileSystem(); - - } catch (Exception ex) { - LOG.error("Log rolling failed", ex); - checkFileSystem(); - - } finally { - rollLog = false; - } - } - } - } - - /** {@inheritDoc} */ - public void logRollRequested() { - synchronized (rollLock) { - rollLog = true; - rollLock.notifyAll(); - } - } - } - /** * Starts a HRegionServer at the default location * @param conf @@ -624,13 +227,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { conf.getInt("hbase.master.lease.period", 30 * 1000); // Cache flushing thread. - this.cacheFlusher = new Flusher(); + this.cacheFlusher = new Flusher(this); // Compaction thread - this.compactSplitThread = new CompactSplitThread(); + this.compactSplitThread = new CompactSplitThread(this); // Log rolling thread - this.logRoller = new LogRoller(); + this.logRoller = new LogRoller(this); // Task thread to process requests from Master this.worker = new Worker(); @@ -817,12 +420,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? If OOME could have exited already - synchronized(cacheFlusherLock) { - this.cacheFlusher.interrupt(); - } - synchronized (compactSplitLock) { - this.compactSplitThread.interrupt(); - } + cacheFlusher.interruptPolitely(); + compactSplitThread.interrupt(); synchronized (logRollerLock) { this.logRoller.interrupt(); } @@ -1592,9 +1191,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } /** @return the info server */ + /** + * Get the InfoServer this HRegionServer has put up. + */ public InfoServer getInfoServer() { return infoServer; } + + /** + * Check if a stop has been requested. + */ + public boolean isStopRequested() { + return stopRequested.get(); + } + + /** Get the write lock for the server */ + ReentrantReadWriteLock.WriteLock getWriteLock() { + return lock.writeLock(); + } + + void compactionRequested(QueueEntry e) { + compactSplitThread.compactionRequested(e); + } /** * @return Immutable list of this servers regions. @@ -1624,6 +1242,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { return getRegion(regionName, false); } + /** Move a region from online to closing. */ + void setRegionClosing(final Text regionName) { + retiringRegions.put(regionName, onlineRegions.remove(regionName)); + } + + /** Set a region as closed. */ + void setRegionClosed(final Text regionName) { + retiringRegions.remove(regionName); + } + /** * Protected utility method for safely obtaining an HRegion handle. * @param regionName Name of online {@link HRegion} to return @@ -1633,7 +1261,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * @throws NotServingRegionException */ protected HRegion getRegion(final Text regionName, - final boolean checkRetiringRegions) + final boolean checkRetiringRegions) throws NotServingRegionException { HRegion region = null; this.lock.readLock().lock(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java new file mode 100644 index 00000000000..0cef2b35d91 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -0,0 +1,101 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.RemoteExceptionHandler; + +/** Runs periodically to determine if the HLog should be rolled */ +class LogRoller extends Thread implements LogRollListener { + static final Log LOG = LogFactory.getLog(LogRoller.class); + private final Integer rollLock = new Integer(0); + private final long optionalLogRollInterval; + private long lastLogRollTime; + private volatile boolean rollLog; + private final HRegionServer server; + private final HBaseConfiguration conf; + + /** constructor */ + public LogRoller(final HRegionServer server) { + super(); + this.server = server; + conf = server.conf; + this.optionalLogRollInterval = conf.getLong( + "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L); + this.rollLog = false; + lastLogRollTime = System.currentTimeMillis(); + } + + /** {@inheritDoc} */ + @Override + public void run() { + while (!server.isStopRequested()) { + while (!rollLog && !server.isStopRequested()) { + long now = System.currentTimeMillis(); + if (this.lastLogRollTime + this.optionalLogRollInterval <= now) { + rollLog = true; + this.lastLogRollTime = now; + } else { + synchronized (rollLock) { + try { + rollLock.wait(server.threadWakeFrequency); + } catch (InterruptedException e) { + continue; + } + } + } + } + if (!rollLog) { + // There's only two reasons to break out of the while loop. + // 1. Log roll requested + // 2. Stop requested + // so if a log roll was not requested, continue and break out of loop + continue; + } + synchronized (server.logRollerLock) { + try { + LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries()); + server.getLog().rollWriter(); + } catch (IOException ex) { + LOG.error("Log rolling failed", + RemoteExceptionHandler.checkIOException(ex)); + server.checkFileSystem(); + } catch (Exception ex) { + LOG.error("Log rolling failed", ex); + server.checkFileSystem(); + } finally { + rollLog = false; + } + } + } + } + + /** {@inheritDoc} */ + public void logRollRequested() { + synchronized (rollLock) { + rollLog = true; + rollLock.notifyAll(); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java b/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java new file mode 100644 index 00000000000..4c808d3b19f --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java @@ -0,0 +1,78 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Delayed; + +/** Queue entry passed to flusher, compactor and splitter threads */ +class QueueEntry implements Delayed { + private final HRegion region; + private long expirationTime; + + QueueEntry(HRegion region, long expirationTime) { + this.region = region; + this.expirationTime = expirationTime; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + QueueEntry other = (QueueEntry) o; + return this.hashCode() == other.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return this.region.getRegionInfo().hashCode(); + } + + /** {@inheritDoc} */ + public long getDelay(TimeUnit unit) { + return unit.convert(this.expirationTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** {@inheritDoc} */ + public int compareTo(Delayed o) { + long delta = this.getDelay(TimeUnit.MILLISECONDS) - + o.getDelay(TimeUnit.MILLISECONDS); + + int value = 0; + if (delta > 0) { + value = 1; + + } else if (delta < 0) { + value = -1; + } + return value; + } + + /** @return the region */ + public HRegion getRegion() { + return region; + } + + /** @param expirationTime the expirationTime to set */ + public void setExpirationTime(long expirationTime) { + this.expirationTime = expirationTime; + } +} \ No newline at end of file