From 931d452cb2cc069bbb97e45dca094e9b163275c3 Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Fri, 10 Aug 2007 22:11:05 +0000 Subject: [PATCH] HADOOP-1678 On region split, master should designate which host should serve daughter splits. Phase 1: Master balances load for new regions and when a region server fails. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@564780 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../org/apache/hadoop/hbase/HBaseAdmin.java | 5 +- .../org/apache/hadoop/hbase/HConstants.java | 8 +- src/java/org/apache/hadoop/hbase/HLog.java | 29 +- .../org/apache/hadoop/hbase/HLogEdit.java | 10 +- src/java/org/apache/hadoop/hbase/HLogKey.java | 2 +- src/java/org/apache/hadoop/hbase/HMaster.java | 1704 ++++++++++------- src/java/org/apache/hadoop/hbase/HRegion.java | 47 +- .../apache/hadoop/hbase/HRegionServer.java | 27 + .../org/apache/hadoop/hbase/HServerInfo.java | 25 +- .../org/apache/hadoop/hbase/HServerLoad.java | 136 ++ src/java/org/apache/hadoop/hbase/HStore.java | 30 +- .../org/apache/hadoop/hbase/HStoreFile.java | 51 +- src/java/org/apache/hadoop/hbase/HTable.java | 1 + .../apache/hadoop/hbase/HTableDescriptor.java | 4 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 16 + .../hbase/TestCleanRegionServerExit.java | 4 +- .../org/apache/hadoop/hbase/TestHRegion.java | 2 +- .../apache/hadoop/hbase/TestHStoreFile.java | 25 +- .../hadoop/hbase/TestRegionServerAbort.java | 4 +- .../org/apache/hadoop/hbase/TestScanner2.java | 3 +- .../org/apache/hadoop/hbase/TestSplit.java | 2 + .../org/apache/hadoop/hbase/TestToString.java | 4 +- 23 files changed, 1433 insertions(+), 709 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/HServerLoad.java diff --git a/CHANGES.txt b/CHANGES.txt index ff6d96dfa3a..052a78ebfb1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -86,3 +86,6 @@ Trunk (unreleased changes) 53. HADOOP-1528 HClient for multiple tables - expose close table function 54. HADOOP-1466 Clean up warnings, visibility and javadoc issues in HBase. 55. HADOOP-1662 Make region splits faster + 56. HADOOP-1678 On region split, master should designate which host should + serve daughter splits. Phase 1: Master balances load for new regions and + when a region server fails. diff --git a/src/java/org/apache/hadoop/hbase/HBaseAdmin.java b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java index 283aecb91fa..6a9b7baf07a 100644 --- a/src/java/org/apache/hadoop/hbase/HBaseAdmin.java +++ b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java @@ -488,8 +488,9 @@ public class HBaseAdmin implements HConstants { * @throws IllegalArgumentException - if the table name is reserved */ protected void checkReservedTableName(Text tableName) { - if(tableName.equals(ROOT_TABLE_NAME) - || tableName.equals(META_TABLE_NAME)) { + if(tableName.charAt(0) == '-' || + tableName.charAt(0) == '.' || + tableName.find(",") != -1) { throw new IllegalArgumentException(tableName + " is a reserved table name"); } diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 93066d5eb85..453fda10f69 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -92,15 +92,17 @@ public interface HConstants { // Do we ever need to know all the information that we are storing? /** The root table's name. */ - static final Text ROOT_TABLE_NAME = new Text("--ROOT--"); + static final Text ROOT_TABLE_NAME = new Text("-ROOT-"); /** The META table's name. */ - static final Text META_TABLE_NAME = new Text("--META--"); + static final Text META_TABLE_NAME = new Text(".META."); // Defines for the column names used in both ROOT and META HBase 'meta' tables. - /** The ROOT and META column family */ + /** The ROOT and META column family (string) */ static final String COLUMN_FAMILY_STR = "info:"; + + /** The ROOT and META column family (Text) */ static final Text COLUMN_FAMILY = new Text(COLUMN_FAMILY_STR); /** Array of meta column names */ diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index 6513e88d7c2..a80038c1bae 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.*; import java.io.*; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; /** * HLog stores all the edits to the HStore. @@ -73,14 +74,14 @@ public class HLog implements HConstants { SequenceFile.Writer writer; TreeMap outputfiles = new TreeMap(); - boolean insideCacheFlush = false; + volatile boolean insideCacheFlush = false; TreeMap regionToLastFlush = new TreeMap(); - boolean closed = false; - transient long logSeqNum = 0; + volatile boolean closed = false; + volatile long logSeqNum = 0; long filenum = 0; - transient int numEntries = 0; + AtomicInteger numEntries = new AtomicInteger(0); Integer rollLock = new Integer(0); @@ -125,7 +126,7 @@ public class HLog implements HConstants { logWriters.put(regionName, w); } if (LOG.isDebugEnabled()) { - LOG.debug("Edit " + key.toString()); + LOG.debug("Edit " + key.toString() + "=" + val.toString()); } w.append(key, val); } @@ -173,6 +174,16 @@ public class HLog implements HConstants { fs.mkdirs(dir); rollWriter(); } + + synchronized void setSequenceNumber(long newvalue) { + if (newvalue > logSeqNum) { + if (LOG.isDebugEnabled()) { + LOG.debug("changing sequence number from " + logSeqNum + " to " + + newvalue); + } + logSeqNum = newvalue; + } + } /** * Roll the log writer. That is, start writing log messages to a new file. @@ -266,7 +277,7 @@ public class HLog implements HConstants { } fs.delete(p); } - this.numEntries = 0; + this.numEntries.set(0); } } @@ -343,13 +354,13 @@ public class HLog implements HConstants { new HLogKey(regionName, tableName, row, seqNum[counter++]); HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp); writer.append(logKey, logEdit); - numEntries++; + numEntries.getAndIncrement(); } } /** @return How many items have been added to the log */ int getNumEntries() { - return numEntries; + return numEntries.get(); } /** @@ -418,7 +429,7 @@ public class HLog implements HConstants { writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), new HLogEdit(HLog.METACOLUMN, COMPLETE_CACHEFLUSH.get(), System.currentTimeMillis())); - numEntries++; + numEntries.getAndIncrement(); // Remember the most-recent flush for each region. // This is used to delete obsolete log files. diff --git a/src/java/org/apache/hadoop/hbase/HLogEdit.java b/src/java/org/apache/hadoop/hbase/HLogEdit.java index e14cb4604eb..d25001a83f3 100644 --- a/src/java/org/apache/hadoop/hbase/HLogEdit.java +++ b/src/java/org/apache/hadoop/hbase/HLogEdit.java @@ -72,8 +72,14 @@ public class HLogEdit implements Writable { /** {@inheritDoc} */ @Override public String toString() { - return getColumn().toString() + " " + this.getTimestamp() + " " + - new String(getVal()).trim(); + String value = ""; + try { + value = new String(getVal(), HConstants.UTF8_ENCODING); + + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("UTF8 encoding not present?", e); + } + return "(" + getColumn().toString() + "/" + getTimestamp() + "/" + value + ")"; } // Writable diff --git a/src/java/org/apache/hadoop/hbase/HLogKey.java b/src/java/org/apache/hadoop/hbase/HLogKey.java index bdced37b28b..c81822fca4d 100644 --- a/src/java/org/apache/hadoop/hbase/HLogKey.java +++ b/src/java/org/apache/hadoop/hbase/HLogKey.java @@ -84,7 +84,7 @@ public class HLogKey implements WritableComparable { */ @Override public String toString() { - return tablename + "," + regionName + "," + row + "," + logSeqNum; + return tablename + "/" + regionName + "/" + row + "/" + logSeqNum; } /** diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index e9c8b7af9cf..61c44f4aa29 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.SortedMap; -import java.util.SortedSet; import java.util.Timer; import java.util.TimerTask; import java.util.TreeMap; @@ -40,6 +39,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,25 +61,25 @@ import org.apache.hadoop.ipc.Server; * There is only one HMaster for a single HBase deployment. */ public class HMaster implements HConstants, HMasterInterface, - HMasterRegionInterface, Runnable { +HMasterRegionInterface, Runnable { - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public long getProtocolVersion(String protocol, - @SuppressWarnings("unused") long clientVersion) - throws IOException { + @SuppressWarnings("unused") long clientVersion) throws IOException { + if (protocol.equals(HMasterInterface.class.getName())) { return HMasterInterface.versionID; + } else if (protocol.equals(HMasterRegionInterface.class.getName())) { return HMasterRegionInterface.versionID; + } else { throw new IOException("Unknown protocol to name node: " + protocol); } } static final Log LOG = LogFactory.getLog(HMaster.class.getName()); - + volatile boolean closed; Path dir; Configuration conf; @@ -88,25 +88,18 @@ public class HMaster implements HConstants, HMasterInterface, long threadWakeFrequency; int numRetries; long maxRegionOpenTime; - + BlockingQueue msgQueue; - + private Leases serverLeases; private Server server; private HServerAddress address; - + HConnection connection; - + long metaRescanInterval; - - volatile HServerAddress rootRegionLocation; - - /** - * Columns in the 'meta' ROOT and META tables. - */ - static final Text METACOLUMNS[] = { - COLUMN_FAMILY - }; + + final AtomicReference rootRegionLocation; /** * Base HRegion scanner class. Holds utilty common to ROOT and @@ -156,22 +149,19 @@ public class HMaster implements HConstants, HMasterInterface, * once. */ abstract class BaseScanner implements Runnable { - private final Text FIRST_ROW = new Text(); protected boolean rootRegion; protected final Text tableName; - + protected abstract void initialScan(); protected abstract void maintenanceScan(); - + BaseScanner(final Text tableName) { super(); this.tableName = tableName; this.rootRegion = tableName.equals(ROOT_TABLE_NAME); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void run() { initialScan(); while (!closed) { @@ -194,14 +184,18 @@ public class HMaster implements HConstants, HMasterInterface, long scannerId = -1L; LOG.info(Thread.currentThread().getName() + " scanning meta region " + region.regionName + " on " + region.server.toString()); + // Array to hold list of split parents found. Scan adds to list. After // scan we go check if parents can be removed. + Map> splitParents = new HashMap>(); try { regionServer = connection.getHRegionConnection(region.server); - scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, - FIRST_ROW, System.currentTimeMillis(), null); + scannerId = + regionServer.openScanner(region.regionName, COLUMN_FAMILY_ARRAY, + EMPTY_START_ROW, System.currentTimeMillis(), null); + int numberOfRegionsFound = 0; while (true) { TreeMap results = new TreeMap(); @@ -209,13 +203,16 @@ public class HMaster implements HConstants, HMasterInterface, if (values.length == 0) { break; } + for (int i = 0; i < values.length; i++) { results.put(values[i].getKey().getColumn(), values[i].getData()); } + HRegionInfo info = HRegion.getRegionInfo(results); String serverName = HRegion.getServerName(results); long startCode = HRegion.getStartCode(results); - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + " scanner: " + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + "}, server: " + serverName + ", startCode: " + startCode); @@ -223,17 +220,20 @@ public class HMaster implements HConstants, HMasterInterface, // Note Region has been assigned. checkAssigned(info, serverName, startCode); + if (isSplitParent(info)) { splitParents.put(info, results); } numberOfRegionsFound += 1; } - if(rootRegion) { + if (rootRegion) { numberOfMetaRegions.set(numberOfRegionsFound); } + } catch (IOException e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + if (e instanceof UnknownScannerException) { // Reset scannerId so we do not try closing a scanner the other side // has lost account of: prevents duplicated stack trace out of the @@ -242,10 +242,11 @@ public class HMaster implements HConstants, HMasterInterface, } } throw e; + } finally { try { if (scannerId != -1L && regionServer != null) { - regionServer.close(scannerId); + regionServer.close(scannerId); } } catch (IOException e) { if (e instanceof RemoteException) { @@ -254,24 +255,28 @@ public class HMaster implements HConstants, HMasterInterface, LOG.error("Closing scanner", e); } } - // Scan is finished. Take a look at split parents to see if any - // we can clean up. + + // Scan is finished. Take a look at split parents to see if any we can clean up. + if (splitParents.size() > 0) { for (Map.Entry> e: - splitParents.entrySet()) { + splitParents.entrySet()) { + TreeMap results = e.getValue(); cleanupSplits(e.getKey(), - HRegion.getSplit(results, HRegion.COL_SPLITA), - HRegion.getSplit(results, HRegion.COL_SPLITB)); + HRegion.getSplit(results, HRegion.COL_SPLITA), + HRegion.getSplit(results, HRegion.COL_SPLITB)); } } LOG.info(Thread.currentThread().getName() + " scan of meta region " + region.regionName + " complete"); } - + private boolean isSplitParent(final HRegionInfo info) { boolean result = false; + // Skip if not a split region. + if (!info.isSplit()) { return result; } @@ -280,7 +285,7 @@ public class HMaster implements HConstants, HMasterInterface, } return true; } - + /* * @param info * @param splitA @@ -290,52 +295,62 @@ public class HMaster implements HConstants, HMasterInterface, * @throws IOException */ private boolean cleanupSplits(final HRegionInfo info, - final HRegionInfo splitA, final HRegionInfo splitB) + final HRegionInfo splitA, final HRegionInfo splitB) throws IOException { + boolean result = false; if (LOG.isDebugEnabled()) { LOG.debug("Checking " + info.getRegionName() + " to see if daughter " + - "splits still hold references"); + "splits still hold references"); } boolean noReferencesA = splitA == null; boolean noReferencesB = splitB == null; + if (!noReferencesA) { - noReferencesA = hasReferences(info.getRegionName(), splitA, - HRegion.COL_SPLITA); + noReferencesA = + hasReferences(info.getRegionName(), splitA, HRegion.COL_SPLITA); } if (!noReferencesB) { - noReferencesB = hasReferences(info.getRegionName(), splitB, - HRegion.COL_SPLITB); + noReferencesB = + hasReferences(info.getRegionName(), splitB, HRegion.COL_SPLITB); } if (!(noReferencesA && noReferencesB)) { + // No references. Remove this item from table and deleted region on // disk. + LOG.info("Deleting region " + info.getRegionName() + - " because daughter splits no longer hold references"); + " because daughter splits no longer hold references"); + HRegion.deleteRegion(fs, dir, info.getRegionName()); HRegion.removeRegionFromMETA(conf, this.tableName, - info.getRegionName()); + info.getRegionName()); + result = true; } + if (LOG.isDebugEnabled()) { LOG.debug("Done checking " + info.getRegionName() + ": splitA: " + - noReferencesA + ", splitB: "+ noReferencesB); + noReferencesA + ", splitB: "+ noReferencesB); } return result; } - + protected boolean hasReferences(final Text regionName, - final HRegionInfo split, final Text column) - throws IOException { + final HRegionInfo split, final Text column) throws IOException { + boolean result = HRegion.hasReferences(fs, fs.makeQualified(dir), split); + if (result) { return result; } + if (LOG.isDebugEnabled()) { LOG.debug(split.getRegionName().toString() - +" no longer has references to " + regionName.toString()); + +" no longer has references to " + regionName.toString()); } + HTable t = new HTable(conf, this.tableName); try { HRegion.removeSplitFromMETA(t, regionName, column); @@ -344,56 +359,63 @@ public class HMaster implements HConstants, HMasterInterface, } return result; } - + protected void checkAssigned(final HRegionInfo info, final String serverName, final long startCode) { + // Skip region - if ... - if(info.offLine // offline - || killedRegions.contains(info.regionName) // queued for offline - || regionsToDelete.contains(info.regionName)) { // queued for delete + + if(info.offLine // offline + || killedRegions.contains(info.regionName) // queued for offline + || regionsToDelete.contains(info.regionName)) { // queued for delete unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); + if(LOG.isDebugEnabled()) { - LOG.debug("not assigning region: " + info.regionName + - " (offline: " + info.isOffline() + ", split: " + info.isSplit() + - ")"); + LOG.debug("not assigning region: " + info.regionName + " (offline: " + + info.isOffline() + ", split: " + info.isSplit() + ")"); } return; } - + HServerInfo storedInfo = null; - if(serverName != null) { - TreeMap regionsToKill = killList.get(serverName); - if(regionsToKill != null && regionsToKill.containsKey(info.regionName)) { + if (serverName != null) { + Map regionsToKill = killList.get(serverName); + if (regionsToKill != null && + regionsToKill.containsKey(info.regionName)) { + // Skip if region is on kill list + if(LOG.isDebugEnabled()) { - LOG.debug("not assigning region (on kill list): " + - info.regionName); + LOG.debug("not assigning region (on kill list): " + info.regionName); } return; } - storedInfo = serversToServerInfo.get(serverName); + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.get(serverName); + } } - if( !(unassignedRegions.containsKey(info.regionName) || - pendingRegions.contains(info.regionName)) + if (!(unassignedRegions.containsKey(info.regionName) || + pendingRegions.contains(info.regionName)) && (storedInfo == null || storedInfo.getStartCode() != startCode)) { + // The current assignment is no good; load the region. + unassignedRegions.put(info.regionName, info); assignAttempts.put(info.regionName, Long.valueOf(0L)); + } else if (LOG.isDebugEnabled()) { - LOG.debug("Finished if " + info.getRegionName() + " is assigned: " + + LOG.debug("Finished if " + info.getRegionName() + " is assigned: " + "unassigned: " + unassignedRegions.containsKey(info.regionName) + ", pending: " + pendingRegions.contains(info.regionName)); } } } - + volatile boolean rootScanned; - - /** - * Scanner for the ROOT HRegion. - */ + + /** Scanner for the ROOT HRegion. */ class RootScanner extends BaseScanner { /** Constructor */ public RootScanner() { @@ -403,14 +425,17 @@ public class HMaster implements HConstants, HMasterInterface, private void scanRoot() { int tries = 0; while (!closed && tries < numRetries) { - while(!closed && rootRegionLocation == null) { - // rootRegionLocation will be filled in when we get an 'open region' - // regionServerReport message from the HRegionServer that has been - // allocated the ROOT region below. - try { - Thread.sleep(threadWakeFrequency); - } catch (InterruptedException e) { - // continue + synchronized (rootRegionLocation) { + while(!closed && rootRegionLocation.get() == null) { + // rootRegionLocation will be filled in when we get an 'open region' + // regionServerReport message from the HRegionServer that has been + // allocated the ROOT region below. + + try { + rootRegionLocation.wait(); + } catch (InterruptedException e) { + // continue + } } } if (closed) { @@ -418,15 +443,20 @@ public class HMaster implements HConstants, HMasterInterface, } try { - synchronized(rootScannerLock) { // Don't interrupt us while we're working - scanRegion(new MetaRegion(rootRegionLocation, + // Don't interrupt us while we're working + + synchronized(rootScannerLock) { + scanRegion(new MetaRegion(rootRegionLocation.get(), HGlobals.rootRegionInfo.regionName, null)); } break; + } catch (IOException e) { if (e instanceof RemoteException) { try { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } catch (IOException ex) { e = ex; } @@ -439,6 +469,8 @@ public class HMaster implements HConstants, HMasterInterface, } } if (!closed) { + // sleep before retry + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -447,19 +479,19 @@ public class HMaster implements HConstants, HMasterInterface, } } } - + @Override protected void initialScan() { scanRoot(); rootScanned = true; } - + @Override protected void maintenanceScan() { scanRoot(); } } - + private RootScanner rootScanner; private Thread rootScannerThread; Integer rootScannerLock = new Integer(0); @@ -469,24 +501,20 @@ public class HMaster implements HConstants, HMasterInterface, HServerAddress server; Text regionName; Text startKey; - + MetaRegion(HServerAddress server, Text regionName, Text startKey) { this.server = server; this.regionName = regionName; this.startKey = startKey; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ @Override public int hashCode() { int result = this.regionName.hashCode(); @@ -496,24 +524,21 @@ public class HMaster implements HConstants, HMasterInterface, // Comparable - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public int compareTo(Object o) { MetaRegion other = (MetaRegion)o; - + int result = this.regionName.compareTo(other.regionName); if(result == 0) { result = this.startKey.compareTo(other.startKey); } return result; } - } /** Set by root scanner to indicate the number of meta regions */ AtomicInteger numberOfMetaRegions; - + /** Work for the meta scanner is queued up here */ BlockingQueue metaRegionsToScan; @@ -522,7 +547,7 @@ public class HMaster implements HConstants, HMasterInterface, /** Set by meta scanner after initial scan */ volatile boolean initialMetaScanComplete; - + /** * MetaScanner META table. * @@ -537,11 +562,11 @@ public class HMaster implements HConstants, HMasterInterface, public MetaScanner() { super(HConstants.META_TABLE_NAME); } - + private void scanOneMetaRegion(MetaRegion region) { int tries = 0; while (!closed && tries < numRetries) { - while (!closed && !rootScanned && rootRegionLocation == null) { + while (!closed && !rootScanned && rootRegionLocation.get() == null) { try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -553,8 +578,9 @@ public class HMaster implements HConstants, HMasterInterface, } try { + // Don't interrupt us while we're working + synchronized (metaScannerLock) { - // Don't interrupt us while we're working scanRegion(region); onlineMetaRegions.put(region.startKey, region); } @@ -565,6 +591,7 @@ public class HMaster implements HConstants, HMasterInterface, try { e = RemoteExceptionHandler.decodeRemoteException( (RemoteException) e); + } catch (IOException ex) { e = ex; } @@ -577,6 +604,8 @@ public class HMaster implements HConstants, HMasterInterface, } } if (!closed) { + // sleep before retry + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -603,7 +632,7 @@ public class HMaster implements HConstants, HMasterInterface, } initialMetaScanComplete = true; } - + @Override protected void maintenanceScan() { ArrayList regions = new ArrayList(); @@ -613,29 +642,31 @@ public class HMaster implements HConstants, HMasterInterface, } metaRegionsScanned(); } - + /** * Called by the meta scanner when it has completed scanning all meta * regions. This wakes up any threads that were waiting for this to happen. */ private synchronized boolean metaRegionsScanned() { - if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + if (!rootScanned || + numberOfMetaRegions.get() != onlineMetaRegions.size()) { + return false; } LOG.info("all meta regions scanned"); notifyAll(); return true; } - + /** * Other threads call this method to wait until all the meta regions have * been scanned. */ synchronized boolean waitForMetaRegionsOrClose() { while (!closed) { - if (rootScanned - && numberOfMetaRegions.get() == onlineMetaRegions.size()) { - + if (rootScanned && + numberOfMetaRegions.get() == onlineMetaRegions.size()) { + break; } @@ -660,39 +691,49 @@ public class HMaster implements HConstants, HMasterInterface, * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the * set of all known valid regions. */ - SortedMap unassignedRegions; + Map unassignedRegions; /** * The 'assignAttempts' table maps from regions to a timestamp that indicates * the last time we *tried* to assign the region to a RegionServer. If the * timestamp is out of date, then we can try to reassign it. */ - SortedMap assignAttempts; + Map assignAttempts; /** * Regions that have been assigned, and the server has reported that it has * started serving it, but that we have not yet recorded in the meta table. */ - SortedSet pendingRegions; - + Set pendingRegions; + /** * The 'killList' is a list of regions that are going to be closed, but not * reopened. */ - SortedMap> killList; - + Map> killList; + /** 'killedRegions' contains regions that are in the process of being closed */ - SortedSet killedRegions; + Set killedRegions; /** * 'regionsToDelete' contains regions that need to be deleted, but cannot be * until the region server closes it */ - SortedSet regionsToDelete; - - /** The map of known server names to server info */ - SortedMap serversToServerInfo = - Collections.synchronizedSortedMap(new TreeMap()); + Set regionsToDelete; + + /** + * The map of known server names to server info + * + * Access to this map and loadToServers and serversToLoad must be synchronized + * on this object + */ + Map serversToServerInfo; + + /** SortedMap server load -> Set of server names */ + SortedMap> loadToServers; + + /** Map of server names -> server load */ + Map serversToLoad; /** Build the HMaster out of a raw configuration item. * @@ -722,7 +763,7 @@ public class HMaster implements HConstants, HMasterInterface, this.rand = new Random(); // Make sure the root directory exists! - + if(! fs.exists(dir)) { fs.mkdirs(dir); } @@ -730,99 +771,111 @@ public class HMaster implements HConstants, HMasterInterface, Path rootRegionDir = HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName); LOG.info("Root region dir: " + rootRegionDir.toString()); - if(! fs.exists(rootRegionDir)) { + + if (!fs.exists(rootRegionDir)) { LOG.info("bootstrap: creating ROOT and first META regions"); try { HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc, - this.dir, this.conf); + this.dir, this.conf); HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc, this.dir, this.conf); + // Add first region from the META table to the ROOT region. + HRegion.addRegionToMETA(root, meta); root.close(); root.getLog().closeAndDelete(); meta.close(); meta.getLog().closeAndDelete(); + } catch (IOException e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); } - LOG.error("", e); + LOG.error("bootstrap", e); } } this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.numRetries = conf.getInt("hbase.client.retries.number", 2); - this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); + this.maxRegionOpenTime = + conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); + this.msgQueue = new LinkedBlockingQueue(); + this.serverLeases = new Leases( - conf.getLong("hbase.master.lease.period", 30 * 1000), - conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + conf.getLong("hbase.master.lease.period", 30 * 1000), + conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000)); + this.server = RPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), + false, conf); // The rpc-server port can be ephemeral... ensure we have the correct info + this.address = new HServerAddress(server.getListenerAddress()); conf.set(MASTER_ADDRESS, address.toString()); - + this.connection = HConnectionManager.getConnection(conf); - - this.metaRescanInterval - = conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000); + + this.metaRescanInterval = + conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000); // The root region - - this.rootRegionLocation = null; + + this.rootRegionLocation = new AtomicReference(); this.rootScanned = false; this.rootScanner = new RootScanner(); this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner"); - + // Scans the meta table this.numberOfMetaRegions = new AtomicInteger(); this.metaRegionsToScan = new LinkedBlockingQueue(); - + this.onlineMetaRegions = Collections.synchronizedSortedMap(new TreeMap()); - + this.initialMetaScanComplete = false; - + this.metaScanner = new MetaScanner(); this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); this.unassignedRegions = - Collections.synchronizedSortedMap(new TreeMap()); - - this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); - + Collections.synchronizedMap(new HashMap()); + + this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, + HGlobals.rootRegionInfo); + this.assignAttempts = - Collections.synchronizedSortedMap(new TreeMap()); - - this.pendingRegions = - Collections.synchronizedSortedSet(new TreeSet()); - + Collections.synchronizedMap(new HashMap()); + this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, - Long.valueOf(0L)); + Long.valueOf(0L)); + + this.pendingRegions = + Collections.synchronizedSet(new HashSet()); this.killList = - Collections.synchronizedSortedMap( - new TreeMap>()); - + Collections.synchronizedMap( + new HashMap>()); + this.killedRegions = - Collections.synchronizedSortedSet(new TreeSet()); - + Collections.synchronizedSet(new HashSet()); + this.regionsToDelete = - Collections.synchronizedSortedSet(new TreeSet()); - + Collections.synchronizedSet(new HashSet()); + + this.serversToServerInfo = new HashMap(); + this.loadToServers = new TreeMap>(); + this.serversToLoad = new HashMap(); + // We're almost open for business this.closed = false; LOG.info("HMaster initialized on " + this.address.toString()); } - - /** - * @return HServerAddress of the master server - */ + + /** @return HServerAddress of the master server */ public HServerAddress getMasterAddress() { return address; } @@ -837,24 +890,33 @@ public class HMaster implements HConstants, HMasterInterface, // Start the server last so everything else is running before we start // receiving requests + this.server.start(); + } catch (IOException e) { if (e instanceof RemoteException) { try { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } catch (IOException ex) { - LOG.warn("", ex); + LOG.warn("thread start", ex); } } + // Something happened during startup. Shut things down. + this.closed = true; LOG.error("Failed startup", e); } - // Main processing loop + /* + * Main processing loop + */ + for (PendingOperation op = null; !closed; ) { try { op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { // continue } @@ -865,15 +927,18 @@ public class HMaster implements HConstants, HMasterInterface, if (LOG.isDebugEnabled()) { LOG.debug("Main processing loop: " + op.toString()); } + if (!op.process()) { // Operation would have blocked because not all meta regions are // online. This could cause a deadlock, because this thread is waiting // for the missing meta region(s) to come back online, but since it // is waiting, it cannot process the meta region online operation it // is waiting for. So put this operation back on the queue for now. + if (msgQueue.size() == 0) { // The queue is currently empty so wait for a while to see if what // we need comes in first + try { Thread.sleep(threadWakeFrequency); } catch (InterruptedException e) { @@ -886,16 +951,18 @@ public class HMaster implements HConstants, HMasterInterface, } msgQueue.put(op); } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was " + - "interrupted.", e); + throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } + } catch (Exception ex) { if (ex instanceof RemoteException) { try { - ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex); + ex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) ex); + } catch (IOException e) { - LOG.warn("", e); + LOG.warn("main processing loop: " + op.toString(), e); } } LOG.warn("Processing pending operations: " + op.toString(), ex); @@ -907,53 +974,44 @@ public class HMaster implements HConstants, HMasterInterface, } } letRegionServersShutdown(); - + /* * Clean up and close up shop */ - // Wake other threads so they notice the close - synchronized(rootScannerLock) { - rootScannerThread.interrupt(); + rootScannerThread.interrupt(); // Wake root scanner } synchronized(metaScannerLock) { - metaScannerThread.interrupt(); + metaScannerThread.interrupt(); // Wake meta scanner } - server.stop(); // Stop server - serverLeases.close(); // Turn off the lease monitor - + server.stop(); // Stop server + serverLeases.close(); // Turn off the lease monitor + // Join up with all threads - + try { - // Wait for the root scanner to finish. - rootScannerThread.join(); + rootScannerThread.join(); // Wait for the root scanner to finish. } catch (Exception iex) { - // Print if ever there is an interrupt (Just for kicks. Remove if it - // ever happens). LOG.warn("root scanner", iex); } try { - // Join the thread till it finishes. - metaScannerThread.join(); + metaScannerThread.join(); // Wait for meta scanner to finish. } catch(Exception iex) { - // Print if ever there is an interrupt (Just for kicks. Remove if it - // ever happens). LOG.warn("meta scanner", iex); } try { - // Join until its finished. TODO: Maybe do in parallel in its own thread - // as is done in TaskTracker if its taking a long time to go down. - server.join(); + // TODO: Maybe do in parallel in its own thread as is done in TaskTracker + // if its taking a long time to go down. + + server.join(); // Wait for server to finish. } catch(InterruptedException iex) { - // Print if ever there is an interrupt (Just for kicks. Remove if it - // ever happens). LOG.warn("server", iex); } - + LOG.info("HMaster main thread exiting"); } - + /* * Wait on regionservers to report in * with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice @@ -962,36 +1020,48 @@ public class HMaster implements HConstants, HMasterInterface, * by remote region servers have expired. */ private void letRegionServersShutdown() { - while (this.serversToServerInfo.size() > 0) { - LOG.info("Waiting on following regionserver(s) to go down (or " + - "region server lease expiration, whichever happens first): " + - this.serversToServerInfo.values()); - try { - Thread.sleep(threadWakeFrequency); - } catch (InterruptedException e) { - // continue + synchronized (serversToServerInfo) { + while (this.serversToServerInfo.size() > 0) { + LOG.info("Waiting on following regionserver(s) to go down (or " + + "region server lease expiration, whichever happens first): " + + this.serversToServerInfo.values()); + try { + serversToServerInfo.wait(threadWakeFrequency); + } catch (InterruptedException e) { + // continue + } } } } - - ////////////////////////////////////////////////////////////////////////////// - // HMasterRegionInterface - ////////////////////////////////////////////////////////////////////////////// - - /** - * {@inheritDoc} + + /* + * HMasterRegionInterface */ + + /** {@inheritDoc} */ @SuppressWarnings("unused") - public void regionServerStartup(HServerInfo serverInfo) - throws IOException { + public void regionServerStartup(HServerInfo serverInfo) throws IOException { String s = serverInfo.getServerAddress().toString().trim(); HServerInfo storedInfo = null; LOG.info("received start message from: " + s); - + // If we get the startup message but there's an old server by that // name, then we can timeout the old one right away and register // the new one. - storedInfo = serversToServerInfo.remove(s); + + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.remove(s); + HServerLoad load = serversToLoad.remove(s); + + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(s); + loadToServers.put(load, servers); + } + } + serversToServerInfo.notifyAll(); + } if (storedInfo != null && !closed) { try { msgQueue.put(new PendingServerShutdown(storedInfo)); @@ -1001,36 +1071,49 @@ public class HMaster implements HConstants, HMasterInterface, } // Either way, record the new server - serversToServerInfo.put(s, serverInfo); - if(!closed) { - long serverLabel = getServerLabel(s); - if (LOG.isTraceEnabled()) { - LOG.trace("Created lease for " + serverLabel); + + synchronized (serversToServerInfo) { + HServerLoad load = new HServerLoad(); + serverInfo.setLoad(load); + serversToServerInfo.put(s, serverInfo); + serversToLoad.put(s, load); + Set servers = loadToServers.get(load); + if (servers == null) { + servers = new HashSet(); } + servers.add(s); + loadToServers.put(load, servers); + } + + if (!closed) { + long serverLabel = getServerLabel(s); serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s)); } } - + private long getServerLabel(final String s) { return s.hashCode(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException { - String s = serverInfo.getServerAddress().toString().trim(); - long serverLabel = getServerLabel(s); + String serverName = serverInfo.getServerAddress().toString().trim(); + long serverLabel = getServerLabel(serverName); + if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { - + // HRegionServer is shutting down. Cancel the server's lease. - if (cancelLease(s, serverLabel)) { + // Note that cancelling the server's lease takes care of updating + // serversToServerInfo, etc. + + if (cancelLease(serverName, serverLabel)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. - LOG.info("Region server " + s + ": MSG_REPORT_EXITING"); - + + LOG.info("Region server " + serverName + ": MSG_REPORT_EXITING"); + // Get all the regions the server was serving reassigned // (if we are not shutting down). @@ -1039,7 +1122,7 @@ public class HMaster implements HConstants, HMasterInterface, HRegionInfo info = msgs[i].getRegionInfo(); if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) { - rootRegionLocation = null; + rootRegionLocation.set(null); } else if (info.tableDesc.getName().equals(META_TABLE_NAME)) { onlineMetaRegions.remove(info.getStartKey()); @@ -1050,28 +1133,32 @@ public class HMaster implements HConstants, HMasterInterface, } } } - + // We don't need to return anything to the server because it isn't // going to do any more work. return new HMsg[0]; } - + if (closed) { // Tell server to shut down if we are shutting down. This should // happen after check of MSG_REPORT_EXITING above, since region server // will send us one of these messages after it gets MSG_REGIONSERVER_STOP + return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; } - HServerInfo storedInfo = serversToServerInfo.get(s); + HServerInfo storedInfo; + synchronized (serversToServerInfo) { + storedInfo = serversToServerInfo.get(serverName); + } if(storedInfo == null) { if(LOG.isDebugEnabled()) { - LOG.debug("received server report from unknown server: " + s); + LOG.debug("received server report from unknown server: " + serverName); } // The HBaseMaster may have been restarted. // Tell the RegionServer to start over and call regionServerStartup() - + return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)}; } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { @@ -1086,60 +1173,111 @@ public class HMaster implements HConstants, HMasterInterface, // The answer is to ask A to shut down for good. if (LOG.isDebugEnabled()) { - LOG.debug("region server race condition detected: " + s); + LOG.debug("region server race condition detected: " + serverName); } + cancelLease(serverName, serverLabel); return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; - + } else { // All's well. Renew the server's lease. // This will always succeed; otherwise, the fetch of serversToServerInfo // would have failed above. - + serverLeases.renewLease(serverLabel, serverLabel); - // Refresh the info object - serversToServerInfo.put(s, serverInfo); + // Refresh the info object and the load information + + synchronized (serversToServerInfo) { + serversToServerInfo.put(serverName, serverInfo); + + HServerLoad load = serversToLoad.get(serverName); + if (load != null && !load.equals(serverInfo.getLoad())) { + // We have previous information about the load on this server + // and the load on this server has changed + + Set servers = loadToServers.get(load); + + // Note that servers should never be null because loadToServers + // and serversToLoad are manipulated in pairs + + servers.remove(serverName); + loadToServers.put(load, servers); + } + + // Set the current load information + + load = serverInfo.getLoad(); + serversToLoad.put(serverName, load); + Set servers = loadToServers.get(load); + if (servers == null) { + servers = new HashSet(); + } + servers.add(serverName); + loadToServers.put(load, servers); + } // Next, process messages for this server return processMsgs(serverInfo, msgs); } } - /** cancel a server's lease */ + /** Cancel a server's lease and update its load information */ private boolean cancelLease(final String serverName, final long serverLabel) { boolean leaseCancelled = false; - if (serversToServerInfo.remove(serverName) != null) { - // Only cancel lease once. - // This method can be called a couple of times during shutdown. - LOG.info("Cancelling lease for " + serverName); - serverLeases.cancelLease(serverLabel, serverLabel); - leaseCancelled = true; + synchronized (serversToServerInfo) { + HServerInfo info = serversToServerInfo.remove(serverName); + if (info != null) { + // Only cancel lease and update load information once. + // This method can be called a couple of times during shutdown. + + LOG.info("Cancelling lease for " + serverName); + serverLeases.cancelLease(serverLabel, serverLabel); + leaseCancelled = true; + + // update load information + + HServerLoad load = serversToLoad.remove(serverName); + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(serverName); + loadToServers.put(load, servers); + } + } + } + serversToServerInfo.notifyAll(); } return leaseCancelled; } - - /** Process all the incoming messages from a server that's contacted us. */ - private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException { + + /** + * Process all the incoming messages from a server that's contacted us. + * + * Note that we never need to update the server's load information because + * that has already been done in regionServerReport. + */ + private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) + throws IOException { + ArrayList returnMsgs = new ArrayList(); - - TreeMap regionsToKill = - killList.remove(info.getServerAddress().toString()); - + String serverName = info.getServerAddress().toString(); + HashMap regionsToKill = killList.remove(serverName); + // Get reports on what the RegionServer did. - - for(int i = 0; i < incomingMsgs.length; i++) { + + for (int i = 0; i < incomingMsgs.length; i++) { HRegionInfo region = incomingMsgs[i].getRegionInfo(); - switch(incomingMsgs[i].getMsg()) { + switch (incomingMsgs[i].getMsg()) { case HMsg.MSG_REPORT_OPEN: HRegionInfo regionInfo = unassignedRegions.get(region.regionName); - if(regionInfo == null) { + if (regionInfo == null) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("region server " + info.getServerAddress().toString() + " should not have opened region " + region.regionName); } @@ -1156,19 +1294,29 @@ public class HMaster implements HConstants, HMasterInterface, region.regionName); // Remove from unassigned list so we don't assign it to someone else + unassignedRegions.remove(region.regionName); assignAttempts.remove(region.regionName); - if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { + + if (region.regionName.compareTo( + HGlobals.rootRegionInfo.regionName) == 0) { + // Store the Root Region location (in memory) - rootRegionLocation = new HServerAddress(info.getServerAddress()); + + synchronized (rootRegionLocation) { + rootRegionLocation.set(new HServerAddress(info.getServerAddress())); + rootRegionLocation.notifyAll(); + } break; } // Note that the table has been assigned and is waiting for the meta // table to be updated. + pendingRegions.add(region.regionName); - + // Queue up an update to note the region location. + try { msgQueue.put(new PendingOpenReport(info, region)); } catch (InterruptedException e) { @@ -1181,8 +1329,12 @@ public class HMaster implements HConstants, HMasterInterface, LOG.info(info.getServerAddress().toString() + " no longer serving " + region.regionName); - if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region - rootRegionLocation = null; + if (region.regionName.compareTo( + HGlobals.rootRegionInfo.regionName) == 0) { + + // Root region + + rootRegionLocation.set(null); unassignedRegions.put(region.regionName, region); assignAttempts.put(region.regionName, Long.valueOf(0L)); @@ -1190,45 +1342,53 @@ public class HMaster implements HConstants, HMasterInterface, boolean reassignRegion = true; boolean deleteRegion = false; - if(killedRegions.remove(region.regionName)) { + if (killedRegions.remove(region.regionName)) { reassignRegion = false; } - - if(regionsToDelete.remove(region.regionName)) { + + if (regionsToDelete.remove(region.regionName)) { reassignRegion = false; deleteRegion = true; } - unassignedRegions.remove(region.regionName); - assignAttempts.remove(region.regionName); - - try { - msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion)); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was interrupted.", e); - } // NOTE: we cannot put the region into unassignedRegions as that // could create a race with the pending close if it gets // reassigned before the close is processed. + unassignedRegions.remove(region.regionName); + assignAttempts.remove(region.regionName); + + try { + msgQueue.put(new PendingCloseReport(region, reassignRegion, + deleteRegion)); + + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } } break; case HMsg.MSG_REPORT_SPLIT: // A region has split. + HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo(); HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo(); + LOG.info("region " + region.regionName + " split. New regions are: " - + newRegionA.regionName + ", " + newRegionB.regionName); - if(region.tableDesc.getName().equals(META_TABLE_NAME)) { + + newRegionA.regionName + ", " + newRegionB.regionName); + + if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // A meta region has split. + onlineMetaRegions.remove(region.getStartKey()); onlineMetaRegions.put(newRegionA.getStartKey(), - new MetaRegion(info.getServerAddress(), newRegionA.getRegionName(), - newRegionA.getStartKey())); + new MetaRegion(info.getServerAddress(), + newRegionA.getRegionName(), newRegionA.getStartKey())); + onlineMetaRegions.put(newRegionB.getStartKey(), - new MetaRegion(info.getServerAddress(), newRegionB.getRegionName(), - newRegionB.getStartKey())); + new MetaRegion(info.getServerAddress(), + newRegionB.getRegionName(), newRegionB.getStartKey())); + numberOfMetaRegions.incrementAndGet(); } break; @@ -1241,56 +1401,183 @@ public class HMaster implements HConstants, HMasterInterface, } // Process the kill list - if(regionsToKill != null) { - for(HRegionInfo i: regionsToKill.values()) { + + if (regionsToKill != null) { + for (HRegionInfo i: regionsToKill.values()) { returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i)); killedRegions.add(i.regionName); } } // Figure out what the RegionServer ought to do, and write back. - if(unassignedRegions.size() > 0) { - // Open new regions as necessary - int targetForServer = (int) Math.ceil(unassignedRegions.size() - / (1.0 * serversToServerInfo.size())); - int counter = 0; - long now = System.currentTimeMillis(); - for (Text curRegionName: unassignedRegions.keySet()) { - HRegionInfo regionInfo = unassignedRegions.get(curRegionName); - long assignedTime = assignAttempts.get(curRegionName); - if (now - assignedTime > maxRegionOpenTime) { - LOG.info("assigning region " + regionInfo.regionName + " to server " + - info.getServerAddress().toString()); + assignRegions(info, serverName, returnMsgs); + return returnMsgs.toArray(new HMsg[returnMsgs.size()]); + } + /** + * Assigns regions to region servers attempting to balance the load across + * all region servers + * + * @param info + * @param serverName + * @param returnMsgs + */ + private void assignRegions(HServerInfo info, String serverName, + ArrayList returnMsgs) { + + long now = System.currentTimeMillis(); + TreeSet regionsToAssign = new TreeSet(); + for (Map.Entry e: assignAttempts.entrySet()) { + if (now - e.getValue() > maxRegionOpenTime) { + regionsToAssign.add(e.getKey()); + } + } + int nRegionsToAssign = regionsToAssign.size(); + + if (nRegionsToAssign > 0) { + if (serversToServerInfo.size() == 1) { + // Only one server. An unlikely case but still possible. + // Assign all unassigned regions to it. + + for (Text regionName: regionsToAssign) { + HRegionInfo regionInfo = unassignedRegions.get(regionName); + LOG.info("assigning region " + regionName + " to server " + + serverName); + + assignAttempts.put(regionName, Long.valueOf(now)); returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); - - assignAttempts.put(curRegionName, Long.valueOf(now)); - counter++; } - if(counter >= targetForServer) { - break; + } else { + // Multiple servers in play. + // We need to allocate regions only to most lightly loaded servers. + + HServerLoad thisServersLoad = info.getLoad(); + + synchronized (serversToServerInfo) { + SortedMap> lightServers = + loadToServers.headMap(thisServersLoad); + + // How many regions we can assign to more lightly loaded servers? + + int nregions = 0; + for (Map.Entry> e: lightServers.entrySet()) { + HServerLoad lightLoad = + new HServerLoad(e.getKey().getNumberOfRequests(), + e.getKey().getNumberOfRegions()); + + do { + lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1); + nregions += 1; + + } while (lightLoad.compareTo(thisServersLoad) <= 0 && + nregions < nRegionsToAssign); + + nregions *= e.getValue().size(); + + if (nregions >= nRegionsToAssign) { + break; + } + } + + nRegionsToAssign -= nregions; + if (nRegionsToAssign > 0) { + // We still have more regions to assign. See how many we can assign + // before this server becomes more heavily loaded than the next + // most heavily loaded server. + + SortedMap> heavyServers = + loadToServers.tailMap(thisServersLoad); + int nservers = 0; + HServerLoad heavierLoad = null; + for (Map.Entry> e: + heavyServers.entrySet()) { + + Set servers = e.getValue(); + nservers += servers.size(); + + if (e.getKey().compareTo(thisServersLoad) == 0) { + // This is the load factor of the server we are considering + + nservers -= 1; + continue; + } + + // If we get here, we are at the first load entry that is a + // heavier load than the server we are considering + + heavierLoad = e.getKey(); + break; + } + + nregions = 0; + if (heavierLoad != null) { + // There is a more heavily loaded server + + for (HServerLoad load = + new HServerLoad(thisServersLoad.getNumberOfRequests(), + thisServersLoad.getNumberOfRegions()); + load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign; + load.setNumberOfRegions(load.getNumberOfRegions() + 1), + nregions++) { + } + } + + if (nregions < nRegionsToAssign) { + // There are some more heavily loaded servers + // but we can't assign all the regions to this server. + + if (nservers > 0) { + // There are other servers that can share the load. + // Split regions that need assignment across the servers. + + nregions = + (int) Math.ceil((1.0 * nRegionsToAssign) / (1.0 * nservers)); + + } else { + // No other servers with same load. + // Split regions over all available servers + + nregions = (int) Math.ceil((1.0 * nRegionsToAssign) + / (1.0 * serversToServerInfo.size())); + + } + + } else { + // Assign all regions to this server + + nregions = nRegionsToAssign; + } + + for (Map.Entry e: unassignedRegions.entrySet()) { + Text regionName = e.getKey(); + HRegionInfo regionInfo = e.getValue(); + LOG.info("assigning region " + regionName + " to server " + + serverName); + + assignAttempts.put(regionName, Long.valueOf(now)); + returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); + + if (--nregions <= 0) { + break; + } + } + } } } } - return returnMsgs.toArray(new HMsg[returnMsgs.size()]); } - - ////////////////////////////////////////////////////////////////////////////// - // Some internal classes to manage msg-passing and client operations - ////////////////////////////////////////////////////////////////////////////// - - private abstract class PendingOperation { - protected final Text[] columns = { - COLUMN_FAMILY - }; - protected final Text startRow = new Text(); + /* + * Some internal classes to manage msg-passing and client operations + */ + + private abstract class PendingOperation { PendingOperation() { super(); } - + abstract boolean process() throws IOException; } @@ -1302,17 +1589,16 @@ public class HMaster implements HConstants, HMasterInterface, private class PendingServerShutdown extends PendingOperation { private HServerAddress deadServer; private String deadServerName; - private long oldStartCode; private transient boolean logSplit; private transient boolean rootChecked; private transient boolean rootRescanned; - + private class ToDoEntry { boolean deleteRegion; boolean regionOffline; Text row; HRegionInfo info; - + ToDoEntry(Text row, HRegionInfo info) { this.deleteRegion = false; this.regionOffline = false; @@ -1320,122 +1606,146 @@ public class HMaster implements HConstants, HMasterInterface, this.info = info; } } - + PendingServerShutdown(HServerInfo serverInfo) { super(); this.deadServer = serverInfo.getServerAddress(); this.deadServerName = this.deadServer.toString(); - this.oldStartCode = serverInfo.getStartCode(); this.logSplit = false; this.rootChecked = false; this.rootRescanned = false; } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingServerShutdown of " + this.deadServer.toString(); } - + /** Finds regions that the dead region server was serving */ private void scanMetaRegion(HRegionInterface server, long scannerId, - Text regionName) - throws IOException { + Text regionName) throws IOException { + ArrayList toDoList = new ArrayList(); TreeMap regions = new TreeMap(); - DataInputBuffer inbuf = new DataInputBuffer(); + try { - while(true) { + while (true) { KeyedData[] values = null; + try { values = server.next(scannerId); + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); + } LOG.error("Shutdown scanning of meta region", e); break; } - - if(values == null || values.length == 0) { + + if (values == null || values.length == 0) { break; } TreeMap results = new TreeMap(); Text row = null; - for(int i = 0; i < values.length; i++) { + for (int i = 0; i < values.length; i++) { if(row == null) { row = values[i].getKey().getRow(); + } else { - if(!row.equals(values[i].getKey().getRow())) { + if (!row.equals(values[i].getKey().getRow())) { LOG.error("Multiple rows in same scanner result set. firstRow=" + row + ", currentRow=" + values[i].getKey().getRow()); } } results.put(values[i].getKey().getColumn(), values[i].getData()); } - if (LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled() && row != null) { LOG.debug("shutdown scanner looking at " + row.toString()); } // Check server name. If null, be conservative and treat as though // region had been on shutdown server (could be null because we // missed edits in hlog because hdfs does not do write-append). + String serverName = null; try { serverName = Keying.bytesToString(results.get(COL_SERVER)); + } catch(UnsupportedEncodingException e) { LOG.error("Server name", e); break; } if (serverName != null && serverName.length() > 0 && deadServerName.compareTo(serverName) != 0) { + // This isn't the server you're looking for - move along + if (LOG.isDebugEnabled()) { LOG.debug("Server name " + serverName + " is not same as " + - deadServerName + ": Passing"); + deadServerName + ": Passing"); } continue; } // Bingo! Found it. + HRegionInfo info = null; try { - info = (HRegionInfo)Writables. - getWritable(results.get(COL_REGIONINFO), new HRegionInfo()); + info = (HRegionInfo) Writables.getWritable( + results.get(COL_REGIONINFO), new HRegionInfo()); + } catch (IOException e) { LOG.error("Read fields", e); break; } LOG.info(info.getRegionName() + " was on shutdown server <" + - serverName + "> (or server is null). Marking unassigned if " + - "meta and clearing pendingRegions"); + serverName + "> (or server is null). Marking unassigned if " + + "meta and clearing pendingRegions"); + if (info.tableDesc.getName().equals(META_TABLE_NAME)) { onlineMetaRegions.remove(info.getStartKey()); } - + ToDoEntry todo = new ToDoEntry(row, info); toDoList.add(todo); - if(killList.containsKey(deadServerName)) { - TreeMap regionsToKill = killList.get(deadServerName); - if(regionsToKill.containsKey(info.regionName)) { + + if (killList.containsKey(deadServerName)) { + HashMap regionsToKill = + killList.get(deadServerName); + + if (regionsToKill.containsKey(info.regionName)) { regionsToKill.remove(info.regionName); killList.put(deadServerName, regionsToKill); unassignedRegions.remove(info.regionName); assignAttempts.remove(info.regionName); - if(regionsToDelete.contains(info.regionName)) { + + if (regionsToDelete.contains(info.regionName)) { // Delete this region + regionsToDelete.remove(info.regionName); todo.deleteRegion = true; + } else { // Mark region offline + todo.regionOffline = true; } } + } else { // Get region reassigned + regions.put(info.regionName, info); - // If was pending, remove otherwise will obstruct its getting - // reassigned. + + // If it was pending, remove. + // Otherwise will obstruct its getting reassigned. + pendingRegions.remove(info.getRegionName()); } } @@ -1444,10 +1754,11 @@ public class HMaster implements HConstants, HMasterInterface, if(scannerId != -1L) { try { server.close(scannerId); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("Closing scanner", e); } @@ -1455,27 +1766,30 @@ public class HMaster implements HConstants, HMasterInterface, } // Remove server from root/meta entries + long clientId = rand.nextLong(); for (ToDoEntry e: toDoList) { long lockid = server.startUpdate(regionName, clientId, e.row); - if(e.deleteRegion) { + + if (e.deleteRegion) { server.delete(regionName, clientId, lockid, COL_REGIONINFO); - } else if(e.regionOffline) { + + } else if (e.regionOffline) { e.info.offLine = true; ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); e.info.write(s); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); } server.delete(regionName, clientId, lockid, COL_SERVER); server.delete(regionName, clientId, lockid, COL_STARTCODE); server.commit(regionName, clientId, lockid, System.currentTimeMillis()); } - + // Get regions reassigned - for(Map.Entry e: regions.entrySet()) { + for (Map.Entry e: regions.entrySet()) { Text region = e.getKey(); HRegionInfo regionInfo = e.getValue(); @@ -1487,64 +1801,72 @@ public class HMaster implements HConstants, HMasterInterface, @Override boolean process() throws IOException { LOG.info("process shutdown of server " + deadServer + ": logSplit: " + - this.logSplit + ", rootChecked: " + this.rootChecked + - ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " + - numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + - onlineMetaRegions.size()); + this.logSplit + ", rootChecked: " + this.rootChecked + + ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " + + numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + + onlineMetaRegions.size()); - if(!logSplit) { + if (!logSplit) { // Process the old log file + HLog.splitLog(dir, new Path(dir, "log" + "_" + - deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf); + deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf); + logSplit = true; } - if(!rootChecked) { - if(rootRegionLocation != null - && deadServer.equals(rootRegionLocation)) { - rootRegionLocation = null; + if (!rootChecked) { + if (rootRegionLocation.get() != null && + deadServer.equals(rootRegionLocation.get())) { + + rootRegionLocation.set(null); unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); + assignAttempts.put(HGlobals.rootRegionInfo.regionName, Long.valueOf(0L)); } rootChecked = true; } - if(!rootRescanned) { + if (!rootRescanned) { // Scan the ROOT region + HRegionInterface server = null; long scannerId = -1L; - for(int tries = 0; tries < numRetries; tries ++) { + for (int tries = 0; tries < numRetries; tries ++) { if (closed) { return true; } - if (rootRegionLocation == null || !rootScanned) { - // We can't proceed until the root region is online and has been - // scanned + if (rootRegionLocation.get() == null || !rootScanned) { + // We can't proceed until the root region is online and has been scanned + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region " + - "cancelled because rootRegionLocation is null"); + "cancelled because rootRegionLocation is null"); } return false; } - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); scannerId = -1L; try { if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region on " + - rootRegionLocation.getBindAddress()); + rootRegionLocation.get().getBindAddress()); } scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, - columns, startRow, System.currentTimeMillis(), null); - scanMetaRegion(server, scannerId, - HGlobals.rootRegionInfo.regionName); + COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, + System.currentTimeMillis(), null); + + scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1552,8 +1874,8 @@ public class HMaster implements HConstants, HMasterInterface, } if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning root region on " + - rootRegionLocation.getBindAddress() + " finished " + - Thread.currentThread().getName()); + rootRegionLocation.get().getBindAddress() + " finished " + + Thread.currentThread().getName()); } rootRescanned = true; } @@ -1569,37 +1891,45 @@ public class HMaster implements HConstants, HMasterInterface, // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this // operation requeued. + if (LOG.isDebugEnabled()) { LOG.debug("Requeuing shutdown because rootScanned: " + - rootScanned + ", numberOfMetaRegions: " + - numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + - onlineMetaRegions.size()); + rootScanned + ", numberOfMetaRegions: " + + numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " + + onlineMetaRegions.size()); } return false; } for (MetaRegion r: onlineMetaRegions.values()) { + HRegionInterface server = null; long scannerId = -1L; + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown scanning " + r.regionName + - " on " + r.server + " " + Thread.currentThread().getName()); + " on " + r.server + " " + Thread.currentThread().getName()); } server = connection.getHRegionConnection(r.server); - scannerId = server.openScanner(r.regionName, columns, startRow, - System.currentTimeMillis(), null); + + scannerId = server.openScanner(r.regionName, COLUMN_FAMILY_ARRAY, + EMPTY_START_ROW, System.currentTimeMillis(), null); + scanMetaRegion(server, scannerId, r.regionName); + if (LOG.isDebugEnabled()) { LOG.debug("process server shutdown finished scanning " + - r.regionName + - " on " + r.server + " " + Thread.currentThread().getName()); + r.regionName + " on " + r.server + " " + + Thread.currentThread().getName()); } } break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1608,7 +1938,7 @@ public class HMaster implements HConstants, HMasterInterface, return true; } } - + /** * PendingCloseReport is instantiated when a region server reports that it * has closed a region. @@ -1618,10 +1948,10 @@ public class HMaster implements HConstants, HMasterInterface, private boolean reassignRegion; private boolean deleteRegion; private boolean rootRegion; - + PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion, boolean deleteRegion) { - + super(); this.regionInfo = regionInfo; @@ -1630,55 +1960,57 @@ public class HMaster implements HConstants, HMasterInterface, // If the region closing down is a meta region then we need to update // the ROOT table - - if(this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) { + + if (this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) { this.rootRegion = true; - + } else { this.rootRegion = false; } } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingCloseReport of " + this.regionInfo.getRegionName(); } - + @Override boolean process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { + if (closed) { + return true; + } LOG.info("region closed: " + regionInfo.regionName); // Mark the Region as unavailable in the appropriate meta table Text metaRegionName; HRegionInterface server; - - if (closed) { - return true; - } if (rootRegion) { - if (rootRegionLocation == null || !rootScanned) { + if (rootRegionLocation.get() == null || !rootScanned) { // We can't proceed until the root region is online and has been // scanned return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); onlineMetaRegions.remove(regionInfo.getStartKey()); } else { if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) { + // We can't proceed because not all of the meta regions are online. // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this // operation requeued. + if (LOG.isDebugEnabled()) { LOG.debug("Requeuing close because rootScanned=" + - rootScanned + ", numberOfMetaRegions=" + - numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + - onlineMetaRegions.size()); + rootScanned + ", numberOfMetaRegions=" + + numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" + + onlineMetaRegions.size()); } return false; } @@ -1688,8 +2020,8 @@ public class HMaster implements HConstants, HMasterInterface, r = onlineMetaRegions.get(regionInfo.getRegionName()); } else { - r = onlineMetaRegions.get( - onlineMetaRegions.headMap(regionInfo.getRegionName()).lastKey()); + r = onlineMetaRegions.get(onlineMetaRegions.headMap( + regionInfo.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = connection.getHRegionConnection(r.server); @@ -1699,30 +2031,31 @@ public class HMaster implements HConstants, HMasterInterface, try { long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName); - - if(deleteRegion) { + + if (deleteRegion) { server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO); - - } else if(!reassignRegion ) { + + } else if (!reassignRegion ) { regionInfo.offLine = true; ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); regionInfo.write(s); server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); } server.delete(metaRegionName, clientId, lockid, COL_SERVER); server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); - + break; } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1730,19 +2063,20 @@ public class HMaster implements HConstants, HMasterInterface, } } - if(reassignRegion) { + if (reassignRegion) { LOG.info("reassign region: " + regionInfo.regionName); - + unassignedRegions.put(regionInfo.regionName, regionInfo); assignAttempts.put(regionInfo.regionName, Long.valueOf(0L)); - - } else if(deleteRegion) { + + } else if (deleteRegion) { try { HRegion.deleteRegion(fs, dir, regionInfo.regionName); } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("failed delete region " + regionInfo.regionName, e); throw e; @@ -1762,14 +2096,17 @@ public class HMaster implements HConstants, HMasterInterface, private HRegionInfo region; private HServerAddress serverAddress; private byte [] startCode; - + PendingOpenReport(HServerInfo info, HRegionInfo region) { if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. + this.rootRegion = true; + } else { // Just an ordinary region. Look for it in the META table. + this.rootRegion = false; } this.region = region; @@ -1781,44 +2118,47 @@ public class HMaster implements HConstants, HMasterInterface, LOG.error("Start code", e); } } - + + /** {@inheritDoc} */ @Override public String toString() { return "PendingOpenOperation from " + serverAddress.toString(); } - + @Override boolean process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { - LOG.info(region.getRegionName() + " open on " + - this.serverAddress.toString()); - // Register the newly-available Region's location. - Text metaRegionName; - HRegionInterface server; if (closed) { return true; } - + LOG.info(region.getRegionName() + " open on " + + this.serverAddress.toString()); + + // Register the newly-available Region's location. + + Text metaRegionName; + HRegionInterface server; if (rootRegion) { - if (rootRegionLocation == null || !rootScanned) { - // We can't proceed until the root region is online and has been - // scanned + if (rootRegionLocation.get() == null || !rootScanned) { + // We can't proceed until the root region is online and has been scanned if (LOG.isDebugEnabled()) { - LOG.debug("root region=" + rootRegionLocation.toString() + + LOG.debug("root region=" + rootRegionLocation.get().toString() + ", rootScanned=" + rootScanned); } return false; } metaRegionName = HGlobals.rootRegionInfo.regionName; - server = connection.getHRegionConnection(rootRegionLocation); + server = connection.getHRegionConnection(rootRegionLocation.get()); + } else { if (!rootScanned || - numberOfMetaRegions.get() != onlineMetaRegions.size()) { - + numberOfMetaRegions.get() != onlineMetaRegions.size()) { + // We can't proceed because not all of the meta regions are online. // We can't block either because that would prevent the meta region // online message from being processed. So return false to have this - // operation requeue + // operation requeued. + if (LOG.isDebugEnabled()) { LOG.debug("Requeuing open because rootScanned: " + rootScanned + ", numberOfMetaRegions: " + @@ -1831,49 +2171,62 @@ public class HMaster implements HConstants, HMasterInterface, MetaRegion r = null; if (onlineMetaRegions.containsKey(region.getRegionName())) { r = onlineMetaRegions.get(region.getRegionName()); + } else { - r = onlineMetaRegions.get( - onlineMetaRegions.headMap(region.getRegionName()).lastKey()); + r = onlineMetaRegions.get(onlineMetaRegions.headMap( + region.getRegionName()).lastKey()); } metaRegionName = r.regionName; server = connection.getHRegionConnection(r.server); } LOG.info("updating row " + region.getRegionName() + " in table " + metaRegionName); + long clientId = rand.nextLong(); try { long lockid = server.startUpdate(metaRegionName, clientId, region.getRegionName()); + server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress.toString().getBytes(UTF8_ENCODING)); + server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); + server.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); - + if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // It's a meta region. + MetaRegion m = new MetaRegion(serverAddress, region.regionName, region.startKey); + if (!initialMetaScanComplete) { // Put it on the queue to be scanned for the first time. + try { metaRegionsToScan.put(m); + } catch (InterruptedException e) { throw new RuntimeException( "Putting into metaRegionsToScan was interrupted.", e); } } else { // Add it to the online meta regions + onlineMetaRegions.put(region.startKey, m); } } // If updated successfully, remove from pending list. + pendingRegions.remove(region.getRegionName()); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -1883,20 +2236,16 @@ public class HMaster implements HConstants, HMasterInterface, } } - ////////////////////////////////////////////////////////////////////////////// - // HMasterInterface - ////////////////////////////////////////////////////////////////////////////// - - /** - * {@inheritDoc} + /* + * HMasterInterface */ + + /** {@inheritDoc} */ public boolean isMasterRunning() { return !closed; } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void shutdown() { TimerTask tt = new TimerTask() { @Override @@ -1912,11 +2261,10 @@ public class HMaster implements HConstants, HMasterInterface, t.schedule(tt, 10); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void createTable(HTableDescriptor desc) throws IOException { + if (!isMasterRunning()) { throw new MasterNotRunningException(); } @@ -1926,28 +2274,29 @@ public class HMaster implements HConstants, HMasterInterface, try { // We can not access meta regions if they have not already been // assigned and scanned. If we timeout waiting, just shutdown. + if (metaScanner.waitForMetaRegionsOrClose()) { break; } createTable(newRegion); LOG.info("created table " + desc.getName()); break; + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } } } } - - /* - * Set of tables currently in creation. Access needs to be synchronized. - */ + + /* Set of tables currently in creation. Access needs to be synchronized. */ private Set tableInCreation = new HashSet(); - + private void createTable(final HRegionInfo newRegion) throws IOException { Text tableName = newRegion.tableDesc.getName(); synchronized (tableInCreation) { @@ -1962,36 +2311,43 @@ public class HMaster implements HConstants, HMasterInterface, // table would sit should it exist. Open scanner on it. If a region // for the table we want to create already exists, then table already // created. Throw already-exists exception. - MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName)) ? + + MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName) ? onlineMetaRegions.get(newRegion.regionName) : - onlineMetaRegions.get(onlineMetaRegions. - headMap(newRegion.getTableDesc().getName()).lastKey()); + onlineMetaRegions.get(onlineMetaRegions.headMap( + newRegion.getTableDesc().getName()).lastKey())); + Text metaRegionName = m.regionName; HRegionInterface r = connection.getHRegionConnection(m.server); - long scannerid = r.openScanner(metaRegionName, - new Text[] { COL_REGIONINFO }, tableName, System.currentTimeMillis(), - null); + long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY, + tableName, System.currentTimeMillis(), null); try { KeyedData[] data = r.next(scannerid); - // Test data and that the row for the data is for our table. If - // table does not exist, scanner will return row after where our table - // would be inserted if it exists so look for exact match on table - // name. + + // Test data and that the row for the data is for our table. If table + // does not exist, scanner will return row after where our table would + // be inserted if it exists so look for exact match on table name. + if (data != null && data.length > 0 && - HRegionInfo.getTableNameFromRegionName(data[0].getKey().getRow()). - equals(tableName)) { + HRegionInfo.getTableNameFromRegionName( + data[0].getKey().getRow()).equals(tableName)) { + // Then a region for this table already exists. Ergo table exists. + throw new TableExistsException(tableName.toString()); } + } finally { r.close(scannerid); } // 2. Create the HRegion - HRegion region = HRegion.createHRegion(newRegion.regionId, newRegion. - getTableDesc(), this.dir, this.conf); + + HRegion region = HRegion.createHRegion(newRegion.regionId, + newRegion.getTableDesc(), this.dir, this.conf); // 3. Insert into meta + HRegionInfo info = region.getRegionInfo(); Text regionName = region.getRegionName(); ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); @@ -1999,18 +2355,22 @@ public class HMaster implements HConstants, HMasterInterface, info.write(s); long clientId = rand.nextLong(); long lockid = r.startUpdate(metaRegionName, clientId, regionName); + r.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); - r.commit(metaRegionName, clientId, lockid, - System.currentTimeMillis()); + byteValue.toByteArray()); + + r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); // 4. Close the new region to flush it to disk. Close its log file too. + region.close(); region.getLog().closeAndDelete(); // 5. Get it assigned to a server + unassignedRegions.put(regionName, info); assignAttempts.put(regionName, Long.valueOf(0L)); + } finally { synchronized (tableInCreation) { tableInCreation.remove(newRegion.getTableDesc().getName()); @@ -2018,70 +2378,62 @@ public class HMaster implements HConstants, HMasterInterface, } } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void deleteTable(Text tableName) throws IOException { new TableDelete(tableName).process(); LOG.info("deleted table: " + tableName); } - - /** - * {@inheritDoc} - */ - public void addColumn(Text tableName, HColumnDescriptor column) throws IOException { + + /** {@inheritDoc} */ + public void addColumn(Text tableName, HColumnDescriptor column) + throws IOException { + new AddColumn(tableName, column).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void deleteColumn(Text tableName, Text columnName) throws IOException { new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void enableTable(Text tableName) throws IOException { new ChangeTableState(tableName, true).process(); } - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ public void disableTable(Text tableName) throws IOException { new ChangeTableState(tableName, false).process(); } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public HServerAddress findRootRegion() { - return rootRegionLocation; + return rootRegionLocation.get(); } - - // Helper classes for HMasterInterface + + /* + * Helper classes for HMasterInterface + */ private abstract class TableOperation { - private SortedSet metaRegions; + private Set metaRegions; protected Text tableName; - - protected TreeSet unservedRegions; - + protected Set unservedRegions; + protected TableOperation(Text tableName) throws IOException { if (!isMasterRunning()) { throw new MasterNotRunningException(); } - this.metaRegions = new TreeSet(); + + this.metaRegions = new HashSet(); this.tableName = tableName; - this.unservedRegions = new TreeSet(); + this.unservedRegions = new HashSet(); // We can not access any meta region if they have not already been // assigned and scanned. if (metaScanner.waitForMetaRegionsOrClose()) { - throw new MasterNotRunningException(); // We're shutting down. Forget it. + throw new MasterNotRunningException(); // We're shutting down. Forget it. } Text firstMetaRegion = null; @@ -2097,108 +2449,114 @@ public class HMaster implements HConstants, HMasterInterface, this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values()); } - + void process() throws IOException { for (int tries = 0; tries < numRetries; tries++) { boolean tableExists = false; try { synchronized(metaScannerLock) { // Prevent meta scanner from running - for(MetaRegion m: metaRegions) { + for (MetaRegion m: metaRegions) { // Get a connection to a meta server HRegionInterface server = connection.getHRegionConnection(m.server); // Open a scanner on the meta region - + long scannerId = - server.openScanner(m.regionName, METACOLUMNS, tableName, + server.openScanner(m.regionName, COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); - + try { DataInputBuffer inbuf = new DataInputBuffer(); - while(true) { + while (true) { HRegionInfo info = new HRegionInfo(); String serverName = null; long startCode = -1L; - + KeyedData[] values = server.next(scannerId); if(values == null || values.length == 0) { break; } boolean haveRegionInfo = false; - for(int i = 0; i < values.length; i++) { - if(values[i].getData().length == 0) { + for (int i = 0; i < values.length; i++) { + if (values[i].getData().length == 0) { break; } Text column = values[i].getKey().getColumn(); - if(column.equals(COL_REGIONINFO)) { + if (column.equals(COL_REGIONINFO)) { haveRegionInfo = true; inbuf.reset(values[i].getData(), - values[i].getData().length); + values[i].getData().length); info.readFields(inbuf); - } else if(column.equals(COL_SERVER)) { + + } else if (column.equals(COL_SERVER)) { try { serverName = new String(values[i].getData(), UTF8_ENCODING); - } catch(UnsupportedEncodingException e) { + + } catch (UnsupportedEncodingException e) { assert(false); } - } else if(column.equals(COL_STARTCODE)) { + + } else if (column.equals(COL_STARTCODE)) { try { startCode = Long.valueOf(new String(values[i].getData(), - UTF8_ENCODING)).longValue(); - } catch(UnsupportedEncodingException e) { + UTF8_ENCODING)).longValue(); + + } catch (UnsupportedEncodingException e) { assert(false); } } } - - if(!haveRegionInfo) { + + if (!haveRegionInfo) { throw new IOException(COL_REGIONINFO + " not found"); } - - if(info.tableDesc.getName().compareTo(tableName) > 0) { + + if (info.tableDesc.getName().compareTo(tableName) > 0) { break; // Beyond any more entries for this table } - + tableExists = true; - if(!isBeingServed(serverName, startCode)) { + if (!isBeingServed(serverName, startCode)) { unservedRegions.add(info); } processScanItem(serverName, startCode, info); } // while(true) - + } finally { - if(scannerId != -1L) { + if (scannerId != -1L) { try { server.close(scannerId); } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } } scannerId = -1L; } - - if(!tableExists) { + + if (!tableExists) { throw new IOException(tableName + " does not exist"); } - + postProcessMeta(m, server); unservedRegions.clear(); - + } // for(MetaRegion m:) } // synchronized(metaScannerLock) - + } catch (IOException e) { if (tries == numRetries - 1) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } throw e; } @@ -2207,72 +2565,80 @@ public class HMaster implements HConstants, HMasterInterface, break; } // for(tries...) } - + protected boolean isBeingServed(String serverName, long startCode) { boolean result = false; - if(serverName != null && startCode != -1L) { - HServerInfo s = serversToServerInfo.get(serverName); + if (serverName != null && startCode != -1L) { + HServerInfo s; + synchronized (serversToServerInfo) { + s = serversToServerInfo.get(serverName); + } result = s != null && s.getStartCode() == startCode; } return result; } - + protected boolean isEnabled(HRegionInfo info) { return !info.offLine; } - + protected abstract void processScanItem(String serverName, long startCode, HRegionInfo info) throws IOException; - + protected abstract void postProcessMeta(MetaRegion m, - HRegionInterface srvr) - throws IOException; + HRegionInterface server) throws IOException; } /** Instantiated to enable or disable a table */ private class ChangeTableState extends TableOperation { private boolean online; + + protected Map> servedRegions = + new HashMap>(); - protected TreeMap> servedRegions = - new TreeMap>(); protected long lockid; protected long clientId; - + ChangeTableState(Text tableName, boolean onLine) throws IOException { super(tableName); this.online = onLine; } - + @Override protected void processScanItem(String serverName, long startCode, HRegionInfo info) { + if (isBeingServed(serverName, startCode)) { - TreeSet regions = servedRegions.get(serverName); + HashSet regions = servedRegions.get(serverName); if (regions == null) { - regions = new TreeSet(); + regions = new HashSet(); } regions.add(info); servedRegions.put(serverName, regions); } } - + @Override protected void postProcessMeta(MetaRegion m, HRegionInterface server) - throws IOException { + throws IOException { + // Process regions not being served - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("processing unserved regions"); } - for(HRegionInfo i: unservedRegions) { + for (HRegionInfo i: unservedRegions) { if (i.offLine && i.isSplit()) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping region " + i.toString() + " because it is " + - "offline because it has been split"); + "offline because it has been split"); } continue; } + // Update meta table - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("updating columns in row: " + i.regionName); } @@ -2285,86 +2651,93 @@ public class HMaster implements HConstants, HMasterInterface, server.delete(m.regionName, clientId, lockid, COL_STARTCODE); server.commit(m.regionName, clientId, lockid, System.currentTimeMillis()); - + lockid = -1L; - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.regionName); } } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("column update failed in row: " + i.regionName, e); } finally { try { - if(lockid != -1L) { + if (lockid != -1L) { server.abort(m.regionName, clientId, lockid); } } catch (IOException iex) { if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex); + iex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) iex); } LOG.error("", iex); } } - if(online) { // Bring offline regions on-line - if(!unassignedRegions.containsKey(i.regionName)) { + if (online) { // Bring offline regions on-line + if (!unassignedRegions.containsKey(i.regionName)) { unassignedRegions.put(i.regionName, i); - assignAttempts.put(i.regionName, 0L); + assignAttempts.put(i.regionName, Long.valueOf(0L)); } + } else { // Prevent region from getting assigned. unassignedRegions.remove(i.regionName); assignAttempts.remove(i.regionName); } } - + // Process regions currently being served - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("processing regions currently being served"); } - for(Map.Entry> e: servedRegions.entrySet()) { + for (Map.Entry> e: servedRegions.entrySet()) { String serverName = e.getKey(); if (online) { LOG.debug("Already online"); continue; // Already being served } - + // Cause regions being served to be taken off-line and disabled - TreeMap localKillList = killList.get(serverName); - if(localKillList == null) { - localKillList = new TreeMap(); + + HashMap localKillList = killList.get(serverName); + if (localKillList == null) { + localKillList = new HashMap(); } - for(HRegionInfo i: e.getValue()) { - if(LOG.isDebugEnabled()) { + for (HRegionInfo i: e.getValue()) { + if (LOG.isDebugEnabled()) { LOG.debug("adding region " + i.regionName + " to local kill list"); } localKillList.put(i.regionName, i); } - if(localKillList.size() > 0) { - if(LOG.isDebugEnabled()) { + if (localKillList.size() > 0) { + if (LOG.isDebugEnabled()) { LOG.debug("inserted local kill list into kill list for server " + - serverName); + serverName); } killList.put(serverName, localKillList); } } servedRegions.clear(); } - - protected void updateRegionInfo(final HRegionInterface srvr, - final Text regionName, final HRegionInfo i) - throws IOException { + + protected void updateRegionInfo(final HRegionInterface server, + final Text regionName, final HRegionInfo i) throws IOException { + i.offLine = !online; + ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); i.write(s); - srvr.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + + server.put(regionName, clientId, lockid, COL_REGIONINFO, + byteValue.toByteArray()); } } @@ -2374,45 +2747,51 @@ public class HMaster implements HConstants, HMasterInterface, * the table. */ private class TableDelete extends ChangeTableState { - + TableDelete(Text tableName) throws IOException { super(tableName, false); } - + @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) - throws IOException { + protected void postProcessMeta(MetaRegion m, HRegionInterface server) + throws IOException { + // For regions that are being served, mark them for deletion - for (TreeSet s: servedRegions.values()) { + + for (HashSet s: servedRegions.values()) { for (HRegionInfo i: s) { regionsToDelete.add(i.regionName); } } // Unserved regions we can delete now + for (HRegionInfo i: unservedRegions) { // Delete the region + try { HRegion.deleteRegion(fs, dir, i.regionName); + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("failed to delete region " + i.regionName, e); } } - super.postProcessMeta(m, srvr); + super.postProcessMeta(m, server); } - + @Override protected void updateRegionInfo( - @SuppressWarnings("hiding") HRegionInterface server, Text regionName, - @SuppressWarnings("unused") HRegionInfo i) - throws IOException { + @SuppressWarnings("hiding") HRegionInterface server, Text regionName, + @SuppressWarnings("unused") HRegionInfo i) throws IOException { + server.delete(regionName, clientId, lockid, COL_REGIONINFO); } } - + private abstract class ColumnOperation extends TableOperation { protected ColumnOperation(Text tableName) throws IOException { super(tableName); @@ -2420,18 +2799,18 @@ public class HMaster implements HConstants, HMasterInterface, @Override protected void processScanItem( - @SuppressWarnings("unused") String serverName, - @SuppressWarnings("unused") long startCode, - final HRegionInfo info) - throws IOException { - if(isEnabled(info)) { + @SuppressWarnings("unused") String serverName, + @SuppressWarnings("unused") long startCode, + final HRegionInfo info) throws IOException { + + if (isEnabled(info)) { throw new TableNotDisabledException(tableName.toString()); } } protected void updateRegionInfo(HRegionInterface server, Text regionName, - HRegionInfo i) - throws IOException { + HRegionInfo i) throws IOException { + ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); DataOutputStream s = new DataOutputStream(byteValue); i.write(s); @@ -2440,12 +2819,15 @@ public class HMaster implements HConstants, HMasterInterface, try { lockid = server.startUpdate(regionName, clientId, i.regionName); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + byteValue.toByteArray()); + server.commit(regionName, clientId, lockid, System.currentTimeMillis()); lockid = -1L; - if(LOG.isDebugEnabled()) { + + if (LOG.isDebugEnabled()) { LOG.debug("updated columns in row: " + i.regionName); } + } catch (Exception e) { if (e instanceof RemoteException) { e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); @@ -2453,13 +2835,14 @@ public class HMaster implements HConstants, HMasterInterface, LOG.error("column update failed in row: " + i.regionName, e); } finally { - if(lockid != -1L) { + if (lockid != -1L) { try { server.abort(regionName, clientId, lockid); - + } catch (IOException iex) { if (iex instanceof RemoteException) { - iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex); + iex = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) iex); } LOG.error("", iex); } @@ -2471,42 +2854,43 @@ public class HMaster implements HConstants, HMasterInterface, /** Instantiated to remove a column family from a table */ private class DeleteColumn extends ColumnOperation { private Text columnName; - + DeleteColumn(Text tableName, Text columnName) throws IOException { super(tableName); this.columnName = columnName; } - + @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) + protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { - for(HRegionInfo i: unservedRegions) { + for (HRegionInfo i: unservedRegions) { i.tableDesc.families().remove(columnName); - updateRegionInfo(srvr, m.regionName, i); - + updateRegionInfo(server, m.regionName, i); + // Delete the directories used by the column - + try { fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName)); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } - + try { fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName)); - + } catch (IOException e) { if (e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + e = RemoteExceptionHandler.decodeRemoteException( + (RemoteException) e); } LOG.error("", e); } - } } } @@ -2514,72 +2898,87 @@ public class HMaster implements HConstants, HMasterInterface, /** Instantiated to add a column family to a table */ private class AddColumn extends ColumnOperation { private HColumnDescriptor newColumn; - - AddColumn(Text tableName, HColumnDescriptor newColumn) - throws IOException { - + + AddColumn(Text tableName, HColumnDescriptor newColumn) throws IOException { super(tableName); this.newColumn = newColumn; } - - @Override - protected void postProcessMeta(MetaRegion m, HRegionInterface srvr) - throws IOException { - for(HRegionInfo i: unservedRegions) { - + @Override + protected void postProcessMeta(MetaRegion m, HRegionInterface server) + throws IOException { + + for (HRegionInfo i: unservedRegions) { + // All we need to do to add a column is add it to the table descriptor. // When the region is brought on-line, it will find the column missing // and create it. - + i.tableDesc.addFamily(newColumn); - updateRegionInfo(srvr, m.regionName, i); + updateRegionInfo(server, m.regionName, i); } } } - - ////////////////////////////////////////////////////////////////////////////// - // Managing leases - ////////////////////////////////////////////////////////////////////////////// + + /* + * Managing leases + */ /** Instantiated to monitor the health of a region server */ private class ServerExpirer implements LeaseListener { @SuppressWarnings("hiding") private String server; - + ServerExpirer(String server) { this.server = server; } - - /** - * {@inheritDoc} - */ + + /** {@inheritDoc} */ public void leaseExpired() { LOG.info(server + " lease expired"); - // Remove the server from the known servers list - HServerInfo storedInfo = serversToServerInfo.remove(server); - + + // Remove the server from the known servers list and update load info + + HServerInfo info; + synchronized (serversToServerInfo) { + info = serversToServerInfo.remove(server); + + if (info != null) { + String serverName = info.getServerAddress().toString(); + HServerLoad load = serversToLoad.remove(serverName); + if (load != null) { + Set servers = loadToServers.get(load); + if (servers != null) { + servers.remove(serverName); + loadToServers.put(load, servers); + } + } + } + serversToServerInfo.notifyAll(); + } + // NOTE: If the server was serving the root region, we cannot reassign it // here because the new server will start serving the root region before // the PendingServerShutdown operation has a chance to split the log file. + try { - msgQueue.put(new PendingServerShutdown(storedInfo)); + msgQueue.put(new PendingServerShutdown(info)); } catch (InterruptedException e) { throw new RuntimeException("Putting into msgQueue was interrupted.", e); } } } - ////////////////////////////////////////////////////////////////////////////// - // Main program - ////////////////////////////////////////////////////////////////////////////// - + /* + * Main program + */ + private static void printUsageAndExit() { System.err.println("Usage: java org.apache.hbase.HMaster " + - "[--bind=hostname:port] start|stop"); + "[--bind=hostname:port] start|stop"); System.exit(0); } - + /** * Main program * @param args @@ -2588,18 +2987,19 @@ public class HMaster implements HConstants, HMasterInterface, if (args.length < 1) { printUsageAndExit(); } - + Configuration conf = new HBaseConfiguration(); - + // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). + final String addressArgKey = "--bind="; for (String cmd: args) { if (cmd.startsWith(addressArgKey)) { conf.set(MASTER_ADDRESS, cmd.substring(addressArgKey.length())); continue; } - + if (cmd.equals("start")) { try { (new Thread(new HMaster(conf))).start(); @@ -2609,7 +3009,7 @@ public class HMaster implements HConstants, HMasterInterface, } break; } - + if (cmd.equals("stop")) { try { HBaseAdmin adm = new HBaseAdmin(conf); @@ -2620,7 +3020,7 @@ public class HMaster implements HConstants, HMasterInterface, } break; } - + // Print out usage if we get to here. printUsageAndExit(); } diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 002eb9f5a7b..5fc331ae370 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -275,6 +275,7 @@ public class HRegion implements HConstants { int compactionThreshold = 0; private final HLocking lock = new HLocking(); private long desiredMaxFileSize; + private final long maxSequenceId; ////////////////////////////////////////////////////////////////////////////// // Constructor @@ -324,12 +325,26 @@ public class HRegion implements HConstants { } // Load in all the HStores. + + long maxSeqId = -1; for(Map.Entry e : this.regionInfo.tableDesc.families().entrySet()) { Text colFamily = HStoreKey.extractFamily(e.getKey()); - stores.put(colFamily, - new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs, - oldLogFile, conf)); + + HStore store = new HStore(rootDir, this.regionInfo.regionName, + e.getValue(), fs, oldLogFile, conf); + + stores.put(colFamily, store); + + long storeSeqId = store.getMaxSequenceId(); + if (storeSeqId > maxSeqId) { + maxSeqId = storeSeqId; + } + } + this.maxSequenceId = maxSeqId; + if (LOG.isDebugEnabled()) { + LOG.debug("maximum sequence id for region " + regionInfo.getRegionName() + + " is " + this.maxSequenceId); } // Get rid of any splits or merges that were lost in-progress @@ -361,6 +376,10 @@ public class HRegion implements HConstants { this.writestate.writesOngoing = false; LOG.info("region " + this.regionInfo.regionName + " available"); } + + long getMaxSequenceId() { + return this.maxSequenceId; + } /** Returns a HRegionInfo object for this region */ HRegionInfo getRegionInfo() { @@ -464,8 +483,8 @@ public class HRegion implements HConstants { * @throws IOException */ HRegion[] closeAndSplit(final Text midKey, - final RegionUnavailableListener listener) - throws IOException { + final RegionUnavailableListener listener) throws IOException { + checkMidKey(midKey); long startTime = System.currentTimeMillis(); Path splits = getSplitsDir(); @@ -496,6 +515,7 @@ public class HRegion implements HConstants { Vector hstoreFilesToSplit = close(); if (hstoreFilesToSplit == null) { LOG.warn("Close came back null (Implement abort of close?)"); + throw new RuntimeException("close returned empty vector of HStoreFiles"); } // Tell listener that region is now closed and that they can therefore @@ -690,8 +710,11 @@ public class HRegion implements HConstants { biggest = size; } } - biggest.setSplitable(splitable); + if (biggest != null) { + biggest.setSplitable(splitable); + } return biggest; + } finally { lock.releaseReadLock(); } @@ -1405,6 +1428,7 @@ public class HRegion implements HConstants { } } + /** {@inheritDoc} */ @Override public String toString() { return getRegionName().toString(); @@ -1842,9 +1866,7 @@ public class HRegion implements HConstants { if (bytes == null || bytes.length == 0) { return null; } - return (HRegionInfo)((bytes == null || bytes.length == 0)? - null: - Writables.getWritable(bytes, new HRegionInfo())); + return (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo()); } /** @@ -1905,6 +1927,13 @@ public class HRegion implements HConstants { return startCode; } + /** + * Computes the Path of the HRegion + * + * @param dir parent directory + * @param regionName name of the region + * @return Path of HRegion directory + */ public static Path getRegionDir(final Path dir, final Text regionName) { return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName)); } diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 7c137eeb0cf..067f4cc2212 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -32,6 +32,7 @@ import java.util.Vector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; @@ -391,6 +392,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Leases private Leases leases; + + // Request counter + private AtomicInteger requestCount; /** * Starts a HRegionServer at the default location @@ -424,6 +428,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { Collections.synchronizedSortedMap(new TreeMap()); this.outboundMsgs = new Vector(); + this.requestCount = new AtomicInteger(); // Config'ed params this.numRetries = conf.getInt("hbase.client.retries.number", 2); @@ -597,6 +602,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { if (LOG.isDebugEnabled()) { LOG.debug("Telling master we are up"); } + requestCount.set(0); + serverInfo.setLoad(new HServerLoad(0, onlineRegions.size())); hbaseMaster.regionServerStartup(serverInfo); if (LOG.isDebugEnabled()) { LOG.debug("Done telling master we are up"); @@ -626,6 +633,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } try { + serverInfo.setLoad(new HServerLoad(requestCount.get(), + onlineRegions.size())); + requestCount.set(0); + HMsg msgs[] = hbaseMaster.regionServerReport(serverInfo, outboundArray); lastMsg = System.currentTimeMillis(); @@ -897,6 +908,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.lock.writeLock().lock(); try { + this.log.setSequenceNumber(region.getMaxSequenceId()); this.onlineRegions.put(region.getRegionName(), region); } finally { this.lock.writeLock().unlock(); @@ -963,6 +975,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException { + requestCount.incrementAndGet(); return getRegion(regionName).getRegionInfo(); } @@ -971,6 +984,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException { + requestCount.incrementAndGet(); long clientid = rand.nextLong(); long lockid = startUpdate(regionName, clientid, b.getRow()); for(BatchOperation op: b) { @@ -993,6 +1007,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { public byte [] get(final Text regionName, final Text row, final Text column) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column); } @@ -1002,6 +1017,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { public byte [][] get(final Text regionName, final Text row, final Text column, final int numVersions) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column, numVersions); } @@ -1010,6 +1026,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public byte [][] get(final Text regionName, final Text row, final Text column, final long timestamp, final int numVersions) throws IOException { + requestCount.incrementAndGet(); return getRegion(regionName).get(row, column, timestamp, numVersions); } @@ -1018,6 +1035,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public KeyedData[] getRow(final Text regionName, final Text row) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); TreeMap map = region.getFull(row); KeyedData result[] = new KeyedData[map.size()]; @@ -1034,6 +1052,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public KeyedData[] next(final long scannerId) throws IOException { + requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); HInternalScannerInterface s = scanners.get(scannerName); if (s == null) { @@ -1077,6 +1096,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public long startUpdate(Text regionName, long clientid, Text row) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); long lockid = region.startUpdate(row); this.leases.createLease(clientid, lockid, @@ -1120,6 +1140,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { public void put(final Text regionName, final long clientid, final long lockid, final Text column, final byte [] val) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName, true); leases.renewLease(clientid, lockid); region.put(lockid, column, val); @@ -1130,6 +1151,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName); leases.renewLease(clientid, lockid); region.delete(lockid, column); @@ -1140,6 +1162,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public void abort(Text regionName, long clientid, long lockid) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName, true); leases.cancelLease(clientid, lockid); region.abort(lockid); @@ -1150,6 +1173,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { */ public void commit(Text regionName, final long clientid, final long lockid, final long timestamp) throws IOException { + requestCount.incrementAndGet(); HRegion region = getRegion(regionName, true); leases.cancelLease(clientid, lockid); region.commit(lockid, timestamp); @@ -1159,6 +1183,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * {@inheritDoc} */ public void renewLease(long lockid, long clientid) throws IOException { + requestCount.incrementAndGet(); leases.renewLease(clientid, lockid); } @@ -1247,6 +1272,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { public long openScanner(Text regionName, Text[] cols, Text firstRow, final long timestamp, final RowFilterInterface filter) throws IOException { + requestCount.incrementAndGet(); HRegion r = getRegion(regionName); long scannerId = -1L; try { @@ -1277,6 +1303,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * {@inheritDoc} */ public void close(final long scannerId) throws IOException { + requestCount.incrementAndGet(); String scannerName = String.valueOf(scannerId); HInternalScannerInterface s = null; synchronized(scanners) { diff --git a/src/java/org/apache/hadoop/hbase/HServerInfo.java b/src/java/org/apache/hadoop/hbase/HServerInfo.java index 56b02098ed0..f07273fd669 100644 --- a/src/java/org/apache/hadoop/hbase/HServerInfo.java +++ b/src/java/org/apache/hadoop/hbase/HServerInfo.java @@ -33,21 +33,24 @@ import java.io.*; public class HServerInfo implements Writable { private HServerAddress serverAddress; private long startCode; + private HServerLoad load; /** default constructor - used by Writable */ public HServerInfo() { this.serverAddress = new HServerAddress(); this.startCode = 0; + this.load = new HServerLoad(); } /** - * Constructs a fully initialized object + * Constructor * @param serverAddress * @param startCode */ public HServerInfo(HServerAddress serverAddress, long startCode) { this.serverAddress = new HServerAddress(serverAddress); this.startCode = startCode; + this.load = new HServerLoad(); } /** @@ -57,6 +60,21 @@ public class HServerInfo implements Writable { public HServerInfo(HServerInfo other) { this.serverAddress = new HServerAddress(other.getServerAddress()); this.startCode = other.getStartCode(); + this.load = other.getLoad(); + } + + /** + * @return the load + */ + public HServerLoad getLoad() { + return load; + } + + /** + * @param load the load to set + */ + public void setLoad(HServerLoad load) { + this.load = load; } /** @return the server address */ @@ -72,7 +90,8 @@ public class HServerInfo implements Writable { /** {@inheritDoc} */ @Override public String toString() { - return "address: " + this.serverAddress + ", startcode: " + this.startCode; + return "address: " + this.serverAddress + ", startcode: " + this.startCode + + ", load: (" + this.load.toString() + ")"; } // Writable @@ -81,11 +100,13 @@ public class HServerInfo implements Writable { public void readFields(DataInput in) throws IOException { this.serverAddress.readFields(in); this.startCode = in.readLong(); + this.load.readFields(in); } /** {@inheritDoc} */ public void write(DataOutput out) throws IOException { this.serverAddress.write(out); out.writeLong(this.startCode); + this.load.write(out); } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/HServerLoad.java b/src/java/org/apache/hadoop/hbase/HServerLoad.java new file mode 100644 index 00000000000..86c460c0e8a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/HServerLoad.java @@ -0,0 +1,136 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.WritableComparable; + +/** + * This class encapsulates metrics for determining the load on a HRegionServer + */ +public class HServerLoad implements WritableComparable { + private int numberOfRequests; // number of requests since last report + private int numberOfRegions; // number of regions being served + + /* + * TODO: Other metrics that might be considered when the master is actually + * doing load balancing instead of merely trying to decide where to assign + * a region: + *
    + *
  • # of CPUs, heap size (to determine the "class" of machine). For + * now, we consider them to be homogeneous.
  • + *
  • #requests per region (Map<{String|HRegionInfo}, Integer>)
  • + *
  • #compactions and/or #splits (churn)
  • + *
  • server death rate (maybe there is something wrong with this server)
  • + *
+ */ + + /** default constructior (used by Writable) */ + public HServerLoad() {} + + /** + * Constructor + * @param numberOfRequests + * @param numberOfRegions + */ + public HServerLoad(int numberOfRequests, int numberOfRegions) { + this.numberOfRequests = numberOfRequests; + this.numberOfRegions = numberOfRegions; + } + + /** + * @return load factor for this server + */ + public int getLoad() { + int load = numberOfRequests == 0 ? 1 : numberOfRequests; + load *= numberOfRegions == 0 ? 1 : numberOfRegions; + return load; + } + + /** {@inheritDoc} */ + @Override + public String toString() { + return "requests: " + numberOfRequests + " regions: " + numberOfRegions; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + return compareTo(o) == 0; + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + int result = Integer.valueOf(numberOfRequests).hashCode(); + result ^= Integer.valueOf(numberOfRegions).hashCode(); + return result; + } + + // Getters + + /** + * @return the numberOfRegions + */ + public int getNumberOfRegions() { + return numberOfRegions; + } + + /** + * @return the numberOfRequests + */ + public int getNumberOfRequests() { + return numberOfRequests; + } + + // Setters + + /** + * @param numberOfRegions the numberOfRegions to set + */ + public void setNumberOfRegions(int numberOfRegions) { + this.numberOfRegions = numberOfRegions; + } + + // Writable + + /** {@inheritDoc} */ + public void readFields(DataInput in) throws IOException { + numberOfRequests = in.readInt(); + numberOfRegions = in.readInt(); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + out.writeInt(numberOfRequests); + out.writeInt(numberOfRegions); + } + + // Comparable + + /** {@inheritDoc} */ + public int compareTo(Object o) { + HServerLoad other = (HServerLoad) o; + return this.getLoad() - other.getLoad(); + } +} diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index 52a1ed250bc..fdb90e90a89 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -90,6 +90,8 @@ class HStore implements HConstants { TreeMap readers = new TreeMap(); Random rand = new Random(); + + private long maxSeqId; /** * An HStore is a set of zero or more MapFiles, which stretch backwards over @@ -196,6 +198,7 @@ class HStore implements HConstants { // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That // means it was built prior to the previous run of HStore, and so it cannot // contain any updates also contained in the log. + long maxSeqID = -1; for (HStoreFile hsf: hstoreFiles) { long seqid = hsf.loadInfo(fs); @@ -205,8 +208,14 @@ class HStore implements HConstants { } } } - - doReconstructionLog(reconstructionLog, maxSeqID); + this.maxSeqId = maxSeqID; + if (LOG.isDebugEnabled()) { + LOG.debug("maximum sequence id for hstore " + storeName + " is " + + this.maxSeqId); + } + + doReconstructionLog(reconstructionLog, maxSeqId); + this.maxSeqId += 1; // Compact all the MapFiles into a single file. The resulting MapFile // should be "timeless"; that is, it should not have an associated seq-ID, @@ -228,6 +237,10 @@ class HStore implements HConstants { } } + long getMaxSequenceId() { + return this.maxSeqId; + } + /* * Read the reconstructionLog to see whether we need to build a brand-new * MapFile out of non-flushed log entries. @@ -258,6 +271,11 @@ class HStore implements HConstants { while (login.next(key, val)) { maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum()); if (key.getLogSeqNum() <= maxSeqID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping edit <" + key.toString() + "=" + + val.toString() + "> key sequence: " + key.getLogSeqNum() + + " max sequence: " + maxSeqID); + } continue; } // Check this edit is for me. Also, guard against writing @@ -277,7 +295,8 @@ class HStore implements HConstants { } HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp()); if (LOG.isDebugEnabled()) { - LOG.debug("Applying edit " + k.toString()); + LOG.debug("Applying edit <" + k.toString() + "=" + val.toString() + + ">"); } reconstructedCache.put(k, val.getVal()); } @@ -428,16 +447,12 @@ class HStore implements HConstants { String name = flushedFile.toString(); MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression, this.bloomFilter); - int count = 0; - int total = 0; try { for (Map.Entry es: inputCache.entrySet()) { HStoreKey curkey = es.getKey(); - total++; if (this.familyName. equals(HStoreKey.extractFamily(curkey.getColumn()))) { out.append(curkey, new ImmutableBytesWritable(es.getValue())); - count++; } } } finally { @@ -1030,6 +1045,7 @@ class HStore implements HConstants { ////////////////////////////////////////////////////////////////////////////// class HStoreScanner extends HAbstractScanner { + @SuppressWarnings("hiding") private MapFile.Reader[] readers; HStoreScanner(long timestamp, Text[] targetCols, Text firstRow) diff --git a/src/java/org/apache/hadoop/hbase/HStoreFile.java b/src/java/org/apache/hadoop/hbase/HStoreFile.java index 38e7dd11528..7f35e7d0068 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreFile.java +++ b/src/java/org/apache/hadoop/hbase/HStoreFile.java @@ -94,7 +94,17 @@ public class HStoreFile implements HConstants, WritableComparable { static final String HSTORE_DATFILE_DIR = "mapfiles"; static final String HSTORE_INFO_DIR = "info"; static final String HSTORE_FILTER_DIR = "filter"; - public static enum Range {top, bottom} + + /** + * For split HStoreFiles, specifies if the file covers the lower half or + * the upper half of the key range + */ + public static enum Range { + /** HStoreFile contains upper half of key range */ + top, + /** HStoreFile contains lower half of key range */ + bottom + } /* * Regex that will work for straight filenames and for reference names. @@ -156,7 +166,7 @@ public class HStoreFile implements HConstants, WritableComparable { /* * Data structure to hold reference to a store file over in another region. */ - static class Reference { + static class Reference implements Writable { Text regionName; long fileid; Range region; @@ -190,11 +200,15 @@ public class HStoreFile implements HConstants, WritableComparable { return this.regionName; } + /** {@inheritDoc} */ + @Override public String toString() { return this.regionName + "/" + this.fileid + "/" + this.region; } // Make it serializable. + + /** {@inheritDoc} */ public void write(DataOutput out) throws IOException { this.regionName.write(out); out.writeLong(this.fileid); @@ -203,6 +217,7 @@ public class HStoreFile implements HConstants, WritableComparable { this.midkey.write(out); } + /** {@inheritDoc} */ public void readFields(DataInput in) throws IOException { this.regionName = new Text(); this.regionName.readFields(in); @@ -417,6 +432,8 @@ public class HStoreFile implements HConstants, WritableComparable { private static boolean isReference(final Path p, final Matcher m) { if (m == null || !m.matches()) { LOG.warn("Failed match of store file name " + p.toString()); + throw new RuntimeException("Failed match of store file name " + + p.toString()); } return m.groupCount() > 1 && m.group(2) != null; } @@ -662,6 +679,7 @@ public class HStoreFile implements HConstants, WritableComparable { } } + /** {@inheritDoc} */ @SuppressWarnings({ "unused"}) @Override public synchronized void finalKey(WritableComparable key) @@ -669,6 +687,7 @@ public class HStoreFile implements HConstants, WritableComparable { throw new UnsupportedOperationException("Unsupported"); } + /** {@inheritDoc} */ @Override public synchronized Writable get(WritableComparable key, Writable val) throws IOException { @@ -676,6 +695,7 @@ public class HStoreFile implements HConstants, WritableComparable { return super.get(key, val); } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public synchronized WritableComparable getClosest(WritableComparable key, @@ -692,6 +712,7 @@ public class HStoreFile implements HConstants, WritableComparable { return super.getClosest(key, val); } + /** {@inheritDoc} */ @SuppressWarnings("unused") @Override public synchronized WritableComparable midKey() throws IOException { @@ -699,6 +720,7 @@ public class HStoreFile implements HConstants, WritableComparable { return null; } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public synchronized boolean next(WritableComparable key, Writable val) @@ -727,6 +749,7 @@ public class HStoreFile implements HConstants, WritableComparable { return false; } + /** {@inheritDoc} */ @Override public synchronized void reset() throws IOException { if (top) { @@ -737,6 +760,7 @@ public class HStoreFile implements HConstants, WritableComparable { super.reset(); } + /** {@inheritDoc} */ @Override public synchronized boolean seek(WritableComparable key) throws IOException { @@ -758,6 +782,15 @@ public class HStoreFile implements HConstants, WritableComparable { static class Reader extends MapFile.Reader { private final Filter bloomFilter; + /** + * Constructor + * + * @param fs + * @param dirName + * @param conf + * @param filter + * @throws IOException + */ public Reader(FileSystem fs, String dirName, Configuration conf, final Filter filter) throws IOException { @@ -810,6 +843,18 @@ public class HStoreFile implements HConstants, WritableComparable { private final Filter bloomFilter; + /** + * Constructor + * + * @param conf + * @param fs + * @param dirName + * @param keyClass + * @param valClass + * @param compression + * @param filter + * @throws IOException + */ @SuppressWarnings("unchecked") public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, @@ -905,6 +950,7 @@ public class HStoreFile implements HConstants, WritableComparable { return (isReference())? l / 2: l; } + /** {@inheritDoc} */ @Override public String toString() { return this.regionName.toString() + "/" + this.colFamily.toString() + @@ -912,6 +958,7 @@ public class HStoreFile implements HConstants, WritableComparable { (isReference()? "/" + this.reference.toString(): ""); } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this.compareTo(o) == 0; diff --git a/src/java/org/apache/hadoop/hbase/HTable.java b/src/java/org/apache/hadoop/hbase/HTable.java index a5db0f1c178..5a748dbf432 100644 --- a/src/java/org/apache/hadoop/hbase/HTable.java +++ b/src/java/org/apache/hadoop/hbase/HTable.java @@ -133,6 +133,7 @@ public class HTable implements HConstants { } + /** @return the table name */ public Text getTableName() { return this.tableName; } diff --git a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java index 8a8c7c2879b..82381cd4046 100644 --- a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -41,13 +41,13 @@ public class HTableDescriptor implements WritableComparable { /* * Legal table names can only contain 'word characters': - * i.e. [a-zA-Z_0-9]. + * i.e. [a-zA-Z_0-9-.]. * Lets be restrictive until a reason to be otherwise. One reason to limit * characters in table name is to ensure table regions as entries in META * regions can be found (See HADOOP-1581 'HBASE: Un-openable tablename bug'). */ private static final Pattern LEGAL_TABLE_NAME = - Pattern.compile("[\\w-]+"); + Pattern.compile("^[\\w-.]+$"); /** Constructs an empty object */ public HTableDescriptor() { diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java index 564237acbbf..1e49bd6bb12 100644 --- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -143,33 +143,43 @@ public class MiniHBaseCluster implements HConstants { } } + /** runs the master server */ public static class MasterThread extends Thread { private final HMaster master; MasterThread(final HMaster m) { super(m, "Master:" + m.getMasterAddress().toString()); this.master = m; } + + /** {@inheritDoc} */ @Override public void run() { LOG.info("Starting " + getName()); super.run(); } + + /** @return master server */ public HMaster getMaster() { return this.master; } } + /** runs region servers */ public static class RegionServerThread extends Thread { private final HRegionServer regionServer; RegionServerThread(final HRegionServer r, final int index) { super(r, "RegionServer:" + index); this.regionServer = r; } + + /** {@inheritDoc} */ @Override public void run() { LOG.info("Starting " + getName()); super.run(); } + + /** @return the region server */ public HRegionServer getRegionServer() { return this.regionServer; } @@ -227,6 +237,11 @@ public class MiniHBaseCluster implements HConstants { return threads; } + /** + * Starts a region server thread running + * + * @throws IOException + */ public void startRegionServer() throws IOException { RegionServerThread t = startRegionServer(this.conf, this.regionThreads.size()); @@ -275,6 +290,7 @@ public class MiniHBaseCluster implements HConstants { * Shut down the specified region server cleanly * * @param serverNumber + * @return the region server that was stopped */ public HRegionServer stopRegionServer(int serverNumber) { HRegionServer server = diff --git a/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java b/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java index a50d9ae9b74..94812ffa85a 100644 --- a/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java +++ b/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java @@ -34,10 +34,10 @@ public class TestCleanRegionServerExit extends HBaseClusterTestCase { /** constructor */ public TestCleanRegionServerExit() { - super(); + super(2); conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries - conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + conf.setInt("hbase.client.retries.number", 3); // reduce HBase retries Logger.getRootLogger().setLevel(Level.WARN); Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } diff --git a/src/test/org/apache/hadoop/hbase/TestHRegion.java b/src/test/org/apache/hadoop/hbase/TestHRegion.java index 87f02ea898b..a698ca33fb9 100644 --- a/src/test/org/apache/hadoop/hbase/TestHRegion.java +++ b/src/test/org/apache/hadoop/hbase/TestHRegion.java @@ -40,7 +40,7 @@ import org.apache.log4j.Logger; * HRegions or in the HBaseMaster, so only basic testing is possible. */ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener { - private static final Logger LOG = + static final Logger LOG = Logger.getLogger(TestHRegion.class.getName()); /** Constructor */ diff --git a/src/test/org/apache/hadoop/hbase/TestHStoreFile.java b/src/test/org/apache/hadoop/hbase/TestHStoreFile.java index f4d9bf94fa3..34835a3edc0 100644 --- a/src/test/org/apache/hadoop/hbase/TestHStoreFile.java +++ b/src/test/org/apache/hadoop/hbase/TestHStoreFile.java @@ -34,6 +34,9 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +/** + * Test HStoreFile + */ public class TestHStoreFile extends TestCase { static final Log LOG = LogFactory.getLog(TestHStoreFile.class); private static String DIR = System.getProperty("test.build.data", "."); @@ -226,7 +229,9 @@ public class TestHStoreFile extends TestCase { } assertTrue(key.compareTo(midkey) < 0); } - LOG.info("Last in bottom: " + previous.toString()); + if (previous != null) { + LOG.info("Last in bottom: " + previous.toString()); + } // Now test reading from the top. top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, HStoreFile.Range.top, midkey); @@ -249,17 +254,17 @@ public class TestHStoreFile extends TestCase { // Next test using a midkey that does not exist in the file. // First, do a key that is < than first key. Ensure splits behave // properly. - midkey = new HStoreKey(new Text(" ")); + WritableComparable badkey = new HStoreKey(new Text(" ")); bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), - this.conf, HStoreFile.Range.bottom, midkey); - // When midkey is < than the bottom, should return no values. + this.conf, HStoreFile.Range.bottom, badkey); + // When badkey is < than the bottom, should return no values. assertFalse(bottom.next(key, value)); // Now read from the top. top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, - HStoreFile.Range.top, midkey); + HStoreFile.Range.top, badkey); first = true; while (top.next(key, value)) { - assertTrue(key.compareTo(midkey) >= 0); + assertTrue(key.compareTo(badkey) >= 0); if (first) { first = false; LOG.info("First top when key < bottom: " + key.toString()); @@ -275,10 +280,10 @@ public class TestHStoreFile extends TestCase { assertTrue(tmp.charAt(i) == 'z'); } - // Test when midkey is > than last key in file ('||' > 'zz'). - midkey = new HStoreKey(new Text("|||")); + // Test when badkey is > than last key in file ('||' > 'zz'). + badkey = new HStoreKey(new Text("|||")); bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), - this.conf, HStoreFile.Range.bottom, midkey); + this.conf, HStoreFile.Range.bottom, badkey); first = true; while (bottom.next(key, value)) { if (first) { @@ -297,7 +302,7 @@ public class TestHStoreFile extends TestCase { } // Now look at top. Should not return any values. top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf, - HStoreFile.Range.top, midkey); + HStoreFile.Range.top, badkey); assertFalse(top.next(key, value)); } finally { diff --git a/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java b/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java index 2fb4f2268fa..1fff888e477 100644 --- a/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java +++ b/src/test/org/apache/hadoop/hbase/TestRegionServerAbort.java @@ -34,10 +34,10 @@ public class TestRegionServerAbort extends HBaseClusterTestCase { /** constructor */ public TestRegionServerAbort() { - super(); + super(2); conf.setInt("ipc.client.timeout", 5000); // reduce client timeout conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries - conf.setInt("hbase.client.retries.number", 2); // reduce HBase retries + conf.setInt("hbase.client.retries.number", 3); // reduce HBase retries Logger.getRootLogger().setLevel(Level.WARN); Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG); } diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java index 6805f31341d..4eb02c0a065 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner2.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java @@ -206,7 +206,8 @@ public class TestScanner2 extends HBaseClusterTestCase { HRegionLocation rl = t.getRegionLocation(table); regionServer = t.getConnection().getHRegionConnection(rl.getServerAddress()); scannerId = regionServer.openScanner(rl.getRegionInfo().getRegionName(), - HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null); + HConstants.COLUMN_FAMILY_ARRAY, new Text(), + System.currentTimeMillis(), null); while (true) { TreeMap results = new TreeMap(); KeyedData[] values = regionServer.next(scannerId); diff --git a/src/test/org/apache/hadoop/hbase/TestSplit.java b/src/test/org/apache/hadoop/hbase/TestSplit.java index 6423dc51692..8d9b21083f1 100644 --- a/src/test/org/apache/hadoop/hbase/TestSplit.java +++ b/src/test/org/apache/hadoop/hbase/TestSplit.java @@ -46,6 +46,7 @@ public class TestSplit extends HBaseTestCase { private static final char FIRST_CHAR = 'a'; private static final char LAST_CHAR = 'z'; + /** {@inheritDoc} */ @Override public void setUp() throws Exception { super.setUp(); @@ -59,6 +60,7 @@ public class TestSplit extends HBaseTestCase { conf.setLong("hbase.hregion.max.filesize", 1024 * 128); } + /** {@inheritDoc} */ @Override public void tearDown() throws Exception { try { diff --git a/src/test/org/apache/hadoop/hbase/TestToString.java b/src/test/org/apache/hadoop/hbase/TestToString.java index be69a89d8c9..f0dea62b3d1 100644 --- a/src/test/org/apache/hadoop/hbase/TestToString.java +++ b/src/test/org/apache/hadoop/hbase/TestToString.java @@ -35,8 +35,8 @@ public class TestToString extends TestCase { HServerAddress address = new HServerAddress(hostport); assertEquals("HServerAddress toString", address.toString(), hostport); HServerInfo info = new HServerInfo(address, -1); - assertEquals("HServerInfo", info.toString(), - "address: " + hostport + ", startcode: " + -1); + assertEquals("HServerInfo", "address: " + hostport + ", startcode: -1" + + ", load: (requests: 0 regions: 0)", info.toString()); } /**