diff --git a/CHANGES.txt b/CHANGES.txt
index 09f5dad578f..edbe5c48226 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,3 +22,4 @@ Trunk (unreleased changes)
      add/remove column.
 12. HADOOP-1392. Part2: includes table compaction by merging adjacent
     regions that have shrunk in size.
+ 13. HADOOP-1445. Support updates across region splits and compactions.
diff --git a/conf/hbase-default.xml b/conf/hbase-default.xml
index efb6ca5171c..a236a8e3aff 100644
--- a/conf/hbase-default.xml
+++ b/conf/hbase-default.xml
@@ -21,21 +21,18 @@
   <property>
-    <name>hbase.client.timeout.length</name>
-    <value>10000</value>
-    <description>Client timeout in milliseconds
-    </description>
-  </property>
-  <property>
-    <name>hbase.client.timeout.number</name>
-    <value>5</value>
-    <description>Try this many timeouts before giving up.
-    </description>
+    <name>hbase.client.pause</name>
+    <value>30000</value>
+    <description>General client pause value.  Used mostly as value to wait
+    before running a retry of a failed get, region lookup, etc.</description>
   </property>
   <property>
     <name>hbase.client.retries.number</name>
-    <value>2</value>
-    <description>Count of maximum retries fetching the root region from root
-    region server.
+    <value>5</value>
+    <description>Maximum retries.  Used as maximum for all retryable
+    operations such as fetching of the root region from root region
+    server, getting a cell's value, starting a row update, etc.
+    Default: 5.
     </description>
   </property>
@@ -51,6 +48,12 @@
     <description>HMaster server lease period in milliseconds. Default is
     30 seconds.</description>
   </property>
+  <property>
+    <name>hbase.regionserver.lease.period</name>
+    <value>180000</value>
+    <description>HRegion server lease period in milliseconds. Default is
+    180 seconds.</description>
+  </property>
   <property>
     <name>hbase.server.thread.wakefrequency</name>
     <value>10000</value>
@@ -58,12 +61,6 @@
     Used as sleep interval by service threads such as META scanner and log
     roller.
     </description>
   </property>
-  <property>
-    <name>hbase.regionserver.lease.period</name>
-    <value>30000</value>
-    <description>HRegion server lease period in milliseconds. Default is
-    30 seconds.</description>
-  </property>
   <property>
     <name>hbase.regionserver.handler.count</name>
     <value>10</value>
@@ -80,5 +77,27 @@
     tests to be responsive.
     </description>
   </property>
-
+  <property>
+    <name>hbase.regionserver.maxlogentries</name>
+    <value>30000</value>
+    <description>Rotate the logs when count of entries exceeds this value.
+    Default: 30,000
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.maxunflushed</name>
+    <value>10000</value>
+    <description>
+    Memcache will be flushed to disk if the number of Memcache writes
+    is in excess of this number.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.max.filesize</name>
+    <value>134217728</value>
+    <description>
+    Maximum desired file size for an HRegion.  If filesize exceeds
+    value + (value / 2), the HRegion is split in two.  Default: 128M.
+    </description>
+  </property>
 </configuration>
diff --git a/conf/hbase-site.xml b/conf/hbase-site.xml
index c1e46d1481a..1d609ea72a8 100644
--- a/conf/hbase-site.xml
+++ b/conf/hbase-site.xml
@@ -1,4 +1,54 @@
+  <property>
+    <name>hbase.regiondir</name>
+    <value>hbase</value>
+    <description>The directory shared by region servers.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.msginterval</name>
+    <value>1000</value>
+    <description>Interval between messages from the RegionServer to HMaster
+    in milliseconds.  Default is 15 seconds.  Set this value low if you want
+    unit tests to be responsive.
+ + + + diff --git a/src/java/org/apache/hadoop/hbase/HAbstractScanner.java b/src/java/org/apache/hadoop/hbase/HAbstractScanner.java index b87c99b47c1..a05c558b27e 100644 --- a/src/java/org/apache/hadoop/hbase/HAbstractScanner.java +++ b/src/java/org/apache/hadoop/hbase/HAbstractScanner.java @@ -21,6 +21,8 @@ import java.util.Vector; import java.util.regex.Pattern; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.BytesWritable; @@ -31,10 +33,11 @@ import org.apache.hadoop.io.Text; * Used by the concrete HMemcacheScanner and HStoreScanners ******************************************************************************/ public abstract class HAbstractScanner implements HInternalScannerInterface { + final Log LOG = LogFactory.getLog(this.getClass().getName()); // Pattern to determine if a column key is a regex - private static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$"); + static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$"); // The kind of match we are doing on a column: @@ -42,7 +45,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { FAMILY_ONLY, // Just check the column family name REGEX, // Column family + matches regex SIMPLE // Literal matching - }; + } // This class provides column matching functions that are more sophisticated // than a simple string compare. There are three types of matching: @@ -89,15 +92,15 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { // Matching method - boolean matches(Text col) throws IOException { + boolean matches(Text c) throws IOException { if(this.matchType == MATCH_TYPE.SIMPLE) { - return col.equals(this.col); + return c.equals(this.col); } else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) { - return col.toString().startsWith(this.family); + return c.toString().startsWith(this.family); } else if(this.matchType == MATCH_TYPE.REGEX) { - return this.columnMatcher.matcher(col.toString()).matches(); + return this.columnMatcher.matcher(c.toString()).matches(); } else { throw new IOException("Invalid match type: " + this.matchType); @@ -201,20 +204,19 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { public boolean isMultipleMatchScanner() { return this.multipleMatchers; } + /** * Get the next set of values for this scanner. * - * @param key - The key that matched - * @param results - all the results for that key. 
- * @return - true if a match was found + * @param key The key that matched + * @param results All the results for key + * @return true if a match was found * * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap) */ public boolean next(HStoreKey key, TreeMap results) - throws IOException { - + throws IOException { // Find the next row label (and timestamp) - Text chosenRow = null; long chosenTimestamp = -1; for(int i = 0; i < keys.length; i++) { @@ -232,7 +234,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { } // Grab all the values that match this row/timestamp - boolean insertedItem = false; if(chosenRow != null) { key.setRow(chosenRow); @@ -241,7 +242,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { for(int i = 0; i < keys.length; i++) { // Fetch the data - while((keys[i] != null) && (keys[i].getRow().compareTo(chosenRow) == 0)) { @@ -255,10 +255,8 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { break; } - if(columnMatch(i)) { - + if(columnMatch(i)) { // We only want the first result for any specific family member - if(!results.containsKey(keys[i].getColumn())) { results.put(new Text(keys[i].getColumn()), vals[i]); insertedItem = true; @@ -277,7 +275,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface { && ((keys[i].getRow().compareTo(chosenRow) <= 0) || (keys[i].getTimestamp() > this.timestamp) || (! columnMatch(i)))) { - getNext(i); } } diff --git a/src/java/org/apache/hadoop/hbase/HClient.java b/src/java/org/apache/hadoop/hbase/HClient.java index 96b90a38e24..fe1ab5136cd 100644 --- a/src/java/org/apache/hadoop/hbase/HClient.java +++ b/src/java/org/apache/hadoop/hbase/HClient.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +33,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -37,7 +43,7 @@ import org.apache.hadoop.ipc.RemoteException; * HClient manages a connection to a single HRegionServer. */ public class HClient implements HConstants { - private final Log LOG = LogFactory.getLog(this.getClass().getName()); + final Log LOG = LogFactory.getLog(this.getClass().getName()); private static final Text[] META_COLUMNS = { COLUMN_FAMILY @@ -49,51 +55,55 @@ public class HClient implements HConstants { private static final Text EMPTY_START_ROW = new Text(); - private long clientTimeout; - private int numTimeouts; - private int numRetries; + long pause; + int numRetries; private HMasterInterface master; private final Configuration conf; - private static class TableInfo { + /* + * Data structure that holds current location for a region and its info. 
+ */ + static class RegionLocation { public HRegionInfo regionInfo; public HServerAddress serverAddress; - TableInfo(HRegionInfo regionInfo, HServerAddress serverAddress) { + RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) { this.regionInfo = regionInfo; this.serverAddress = serverAddress; } + + @Override + public String toString() { + return "address: " + this.serverAddress.toString() + ", regioninfo: " + + this.regionInfo; + } } // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress) - - private TreeMap> tablesToServers; + private TreeMap> tablesToServers; // For the "current" table: Map startRow -> (HRegionInfo, HServerAddress) + private SortedMap tableServers; - private SortedMap tableServers; - - // Known region HServerAddress.toString() -> HRegionInterface - + // Known region HServerAddress.toString() -> HRegionInterface private TreeMap servers; // For row mutation operations - private Text currentRegion; - private HRegionInterface currentServer; - private Random rand; - private long clientid; + Text currentRegion; + HRegionInterface currentServer; + Random rand; + long clientid; /** Creates a new HClient */ public HClient(Configuration conf) { this.conf = conf; - this.clientTimeout = conf.getLong("hbase.client.timeout.length", 30 * 1000); - this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5); - this.numRetries = conf.getInt("hbase.client.retries.number", 2); + this.pause = conf.getLong("hbase.client.pause", 30 * 1000); + this.numRetries = conf.getInt("hbase.client.retries.number", 5); this.master = null; - this.tablesToServers = new TreeMap>(); + this.tablesToServers = new TreeMap>(); this.tableServers = null; this.servers = new TreeMap(); @@ -129,11 +139,13 @@ public class HClient implements HConstants { } } - /* Find the address of the master and connect to it */ + /* Find the address of the master and connect to it + */ private void checkMaster() throws MasterNotRunningException { if (this.master != null) { return; } + for(int tries = 0; this.master == null && tries < numRetries; tries++) { HServerAddress masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS, @@ -142,9 +154,8 @@ public class HClient implements HConstants { try { HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy(HMasterInterface.class, - HMasterInterface.versionID, masterLocation.getInetSocketAddress(), - this.conf); - + HMasterInterface.versionID, masterLocation.getInetSocketAddress(), + this.conf); if(tryMaster.isMasterRunning()) { this.master = tryMaster; break; @@ -154,16 +165,18 @@ public class HClient implements HConstants { // This was our last chance - don't bother sleeping break; } + LOG.info("Attempt " + tries + " of " + this.numRetries + + " failed with <" + e + ">. Retrying after sleep of " + this.pause); } - // We either cannot connect to the master or it is not running. - // Sleep and retry - + // We either cannot connect to master or it is not running. 
Sleep & retry try { - Thread.sleep(this.clientTimeout); + Thread.sleep(this.pause); } catch(InterruptedException e) { + // continue } } + if(this.master == null) { throw new MasterNotRunningException(); } @@ -210,7 +223,7 @@ public class HClient implements HConstants { // Save the current table - SortedMap oldServers = this.tableServers; + SortedMap oldServers = this.tableServers; try { // Wait for new table to come on-line @@ -229,23 +242,21 @@ public class HClient implements HConstants { public synchronized void deleteTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); - TableInfo firstMetaServer = getFirstMetaServerForTable(tableName); + RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); try { this.master.deleteTable(tableName); - } catch(RemoteException e) { handleRemoteException(e); } // Wait until first region is deleted - - HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress); - + HRegionInterface server = + getHRegionConnection(firstMetaServer.serverAddress); DataInputBuffer inbuf = new DataInputBuffer(); HStoreKey key = new HStoreKey(); HRegionInfo info = new HRegionInfo(); - for(int tries = 0; tries < numRetries; tries++) { + for (int tries = 0; tries < numRetries; tries++) { long scannerId = -1L; try { scannerId = server.openScanner(firstMetaServer.regionInfo.regionName, @@ -258,7 +269,8 @@ public class HClient implements HConstants { for(int j = 0; j < values.length; j++) { if(values[j].getLabel().equals(COL_REGIONINFO)) { byte[] bytes = new byte[values[j].getData().getSize()]; - System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length); + System.arraycopy(values[j].getData().get(), 0, bytes, 0, + bytes.length); inbuf.reset(bytes, bytes.length); info.readFields(inbuf); if(info.tableDesc.getName().equals(tableName)) { @@ -274,27 +286,19 @@ public class HClient implements HConstants { if(scannerId != -1L) { try { server.close(scannerId); - } catch(Exception e) { LOG.warn(e); } } } - if(LOG.isDebugEnabled()) { - LOG.debug("Sleep. Waiting for first region to be deleted from " + tableName); - } + try { - Thread.sleep(clientTimeout); - + Thread.sleep(pause); } catch(InterruptedException e) { - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Waiting for first region to be deleted from " + tableName); + // continue } } - if(LOG.isDebugEnabled()) { - LOG.debug("table deleted " + tableName); - } + LOG.info("table " + tableName + " deleted"); } public synchronized void addColumn(Text tableName, HColumnDescriptor column) throws IOException { @@ -322,7 +326,7 @@ public class HClient implements HConstants { public synchronized void enableTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); - TableInfo firstMetaServer = getFirstMetaServerForTable(tableName); + RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); try { this.master.enableTable(tableName); @@ -379,20 +383,22 @@ public class HClient implements HConstants { LOG.debug("Sleep. Waiting for first region to be enabled from " + tableName); } try { - Thread.sleep(clientTimeout); + Thread.sleep(pause); } catch(InterruptedException e) { + // continue } if(LOG.isDebugEnabled()) { LOG.debug("Wake. 
Waiting for first region to be enabled from " + tableName); } } + LOG.info("Enabled table " + tableName); } public synchronized void disableTable(Text tableName) throws IOException { checkReservedTableName(tableName); checkMaster(); - TableInfo firstMetaServer = getFirstMetaServerForTable(tableName); + RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName); try { this.master.disableTable(tableName); @@ -449,14 +455,15 @@ public class HClient implements HConstants { LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName); } try { - Thread.sleep(clientTimeout); - + Thread.sleep(pause); } catch(InterruptedException e) { + // continue } if(LOG.isDebugEnabled()) { LOG.debug("Wake. Waiting for first region to be disabled from " + tableName); } } + LOG.info("Disabled table " + tableName); } public synchronized void shutdown() throws IOException { @@ -477,8 +484,8 @@ public class HClient implements HConstants { } } - private TableInfo getFirstMetaServerForTable(Text tableName) throws IOException { - SortedMap metaservers = findMetaServersForTable(tableName); + private RegionLocation getFirstMetaServerForTable(Text tableName) throws IOException { + SortedMap metaservers = findMetaServersForTable(tableName); return metaservers.get(metaservers.firstKey()); } @@ -497,7 +504,10 @@ public class HClient implements HConstants { throw new IllegalArgumentException("table name cannot be null or zero length"); } this.tableServers = tablesToServers.get(tableName); - if(this.tableServers == null ) { + if (this.tableServers == null ) { + if (LOG.isDebugEnabled()) { + LOG.debug("No servers for " + tableName + ". Doing a find..."); + } // We don't know where the table is. // Load the information from meta. this.tableServers = findServersForTable(tableName); @@ -511,23 +521,25 @@ public class HClient implements HConstants { * @return - map of first row to table info for all regions in the table * @throws IOException */ - private SortedMap findServersForTable(Text tableName) + private SortedMap findServersForTable(Text tableName) throws IOException { - - SortedMap servers = null; + SortedMap servers = null; if(tableName.equals(ROOT_TABLE_NAME)) { servers = locateRootRegion(); - } else if(tableName.equals(META_TABLE_NAME)) { servers = loadMetaFromRoot(); - } else { - servers = new TreeMap(); - for(TableInfo t: findMetaServersForTable(tableName).values()) { + servers = new TreeMap(); + for(RegionLocation t: findMetaServersForTable(tableName).values()) { servers.putAll(scanOneMetaRegion(t, tableName)); } this.tablesToServers.put(tableName, servers); } + if (LOG.isDebugEnabled()) { + for (Map.Entry e: servers.entrySet()) { + LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue()); + } + } return servers; } @@ -537,18 +549,15 @@ public class HClient implements HConstants { * @return - returns a SortedMap of the meta servers * @throws IOException */ - private SortedMap findMetaServersForTable(Text tableName) - throws IOException { - - SortedMap metaServers = + private SortedMap findMetaServersForTable(final Text tableName) + throws IOException { + SortedMap metaServers = this.tablesToServers.get(META_TABLE_NAME); - if(metaServers == null) { // Don't know where the meta is metaServers = loadMetaFromRoot(); } Text firstMetaRegion = (metaServers.containsKey(tableName)) ? 
- tableName : metaServers.headMap(tableName).lastKey(); - + tableName : metaServers.headMap(tableName).lastKey(); return metaServers.tailMap(firstMetaRegion); } @@ -558,10 +567,9 @@ public class HClient implements HConstants { * @return map of first row to TableInfo for all meta regions * @throws IOException */ - private TreeMap loadMetaFromRoot() throws IOException { - SortedMap rootRegion = + private TreeMap loadMetaFromRoot() throws IOException { + SortedMap rootRegion = this.tablesToServers.get(ROOT_TABLE_NAME); - if(rootRegion == null) { rootRegion = locateRootRegion(); } @@ -570,34 +578,34 @@ public class HClient implements HConstants { /* * Repeatedly try to find the root region by asking the master for where it is - * * @return TreeMap for root regin if found * @throws NoServerForRegionException - if the root region can not be located after retrying * @throws IOException */ - private TreeMap locateRootRegion() throws IOException { + private TreeMap locateRootRegion() throws IOException { checkMaster(); HServerAddress rootRegionLocation = null; - for(int tries = 0; rootRegionLocation == null && tries < numRetries; tries++){ + for(int tries = 0; tries < numRetries; tries++) { int localTimeouts = 0; - while(rootRegionLocation == null && localTimeouts < numTimeouts) { + while(rootRegionLocation == null && localTimeouts < numRetries) { rootRegionLocation = master.findRootRegion(); - if(rootRegionLocation == null) { try { if (LOG.isDebugEnabled()) { LOG.debug("Sleeping. Waiting for root region."); } - Thread.sleep(this.clientTimeout); + Thread.sleep(this.pause); if (LOG.isDebugEnabled()) { LOG.debug("Wake. Retry finding root region."); } } catch(InterruptedException iex) { + // continue } localTimeouts++; } } + if(rootRegionLocation == null) { throw new NoServerForRegionException( "Timed out trying to locate root region"); @@ -608,7 +616,6 @@ public class HClient implements HConstants { try { rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName); break; - } catch(NotServingRegionException e) { if(tries == numRetries - 1) { // Don't bother sleeping. We've run out of retries. @@ -616,16 +623,16 @@ public class HClient implements HConstants { } // Sleep and retry finding root region. - try { if (LOG.isDebugEnabled()) { LOG.debug("Root region location changed. Sleeping."); } - Thread.sleep(this.clientTimeout); + Thread.sleep(this.pause); if (LOG.isDebugEnabled()) { LOG.debug("Wake. 
Retry finding root region."); } } catch(InterruptedException iex) { + // continue } } rootRegionLocation = null; @@ -633,12 +640,12 @@ public class HClient implements HConstants { if (rootRegionLocation == null) { throw new NoServerForRegionException( - "unable to locate root region server"); + "unable to locate root region server"); } - TreeMap rootServer = new TreeMap(); + TreeMap rootServer = new TreeMap(); rootServer.put(EMPTY_START_ROW, - new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation)); + new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation)); this.tablesToServers.put(ROOT_TABLE_NAME, rootServer); return rootServer; @@ -649,10 +656,9 @@ public class HClient implements HConstants { * @return - TreeMap of meta region servers * @throws IOException */ - private TreeMap scanRoot(TableInfo rootRegion) + private TreeMap scanRoot(RegionLocation rootRegion) throws IOException { - - TreeMap metaservers = + TreeMap metaservers = scanOneMetaRegion(rootRegion, META_TABLE_NAME); this.tablesToServers.put(META_TABLE_NAME, metaservers); return metaservers; @@ -663,16 +669,16 @@ public class HClient implements HConstants { * @param t the meta region we're going to scan * @param tableName the name of the table we're looking for * @return returns a map of startingRow to TableInfo - * @throws NoSuchElementException - if table does not exist + * @throws RegionNotFoundException - if table does not exist * @throws IllegalStateException - if table is offline * @throws NoServerForRegionException - if table can not be found after retrying * @throws IOException */ - private TreeMap scanOneMetaRegion(TableInfo t, Text tableName) - throws IOException { - + private TreeMap scanOneMetaRegion(final RegionLocation t, + final Text tableName) + throws IOException { HRegionInterface server = getHRegionConnection(t.serverAddress); - TreeMap servers = new TreeMap(); + TreeMap servers = new TreeMap(); for(int tries = 0; servers.size() == 0 && tries < this.numRetries; tries++) { @@ -690,13 +696,15 @@ public class HClient implements HConstants { if(values.length == 0) { if(servers.size() == 0) { // If we didn't find any servers then the table does not exist - - throw new NoSuchElementException("table '" + tableName - + "' does not exist"); + throw new RegionNotFoundException("table '" + tableName + + "' does not exist in " + t); } // We found at least one server for the table and now we're done. - + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + servers.size() + " server(s) for " + + "location: " + t + " for tablename " + tableName); + } break; } @@ -714,6 +722,9 @@ public class HClient implements HConstants { if(!regionInfo.tableDesc.getName().equals(tableName)) { // We're done + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + tableName); + } break; } @@ -724,7 +735,6 @@ public class HClient implements HConstants { bytes = results.get(COL_SERVER); if(bytes == null || bytes.length == 0) { // We need to rescan because the table we want is unassigned. 
- if(LOG.isDebugEnabled()) { LOG.debug("no server address for " + regionInfo.toString()); } @@ -732,15 +742,13 @@ public class HClient implements HConstants { break; } serverAddress = new String(bytes, UTF8_ENCODING); - servers.put(regionInfo.startKey, - new TableInfo(regionInfo, new HServerAddress(serverAddress))); + new RegionLocation(regionInfo, new HServerAddress(serverAddress))); } } finally { if(scannerId != -1L) { try { server.close(scannerId); - } catch(Exception e) { LOG.warn(e); } @@ -752,19 +760,20 @@ public class HClient implements HConstants { + tableName + " after " + this.numRetries + " retries"); } - // The table is not yet being served. Sleep and retry. - - if(LOG.isDebugEnabled()) { - LOG.debug("Sleeping. Table " + tableName - + " not currently being served."); - } - try { - Thread.sleep(this.clientTimeout); - - } catch(InterruptedException e) { - } - if(LOG.isDebugEnabled()) { - LOG.debug("Wake. Retry finding table " + tableName); + if (servers.size() <= 0) { + // The table is not yet being served. Sleep and retry. + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping. Table " + tableName + + " not currently being served."); + } + try { + Thread.sleep(this.pause); + } catch (InterruptedException e) { + // continue + } + if (LOG.isDebugEnabled()) { + LOG.debug("Wake. Retry finding table " + tableName); + } } } return servers; @@ -804,7 +813,7 @@ public class HClient implements HConstants { throws IOException { TreeSet uniqueTables = new TreeSet(); - SortedMap metaTables = + SortedMap metaTables = this.tablesToServers.get(META_TABLE_NAME); if(metaTables == null) { @@ -812,7 +821,7 @@ public class HClient implements HConstants { metaTables = loadMetaFromRoot(); } - for (TableInfo t: metaTables.values()) { + for (RegionLocation t: metaTables.values()) { HRegionInterface server = getHRegionConnection(t.serverAddress); long scannerId = -1L; try { @@ -846,11 +855,15 @@ public class HClient implements HConstants { } } } - return (HTableDescriptor[])uniqueTables. - toArray(new HTableDescriptor[uniqueTables.size()]); + return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]); } - private synchronized TableInfo getTableInfo(Text row) { + /* + * Find region location hosting passed row using cached info + * @param row Row to find. + * @return Location of row. + */ + synchronized RegionLocation getRegionLocation(Text row) { if(row == null || row.getLength() == 0) { throw new IllegalArgumentException("row key cannot be null or zero length"); } @@ -859,41 +872,42 @@ public class HClient implements HConstants { } // Only one server will have the row we are looking for - - Text serverKey = null; - if(this.tableServers.containsKey(row)) { - serverKey = row; - - } else { - serverKey = this.tableServers.headMap(row).lastKey(); - } + Text serverKey = (this.tableServers.containsKey(row))? row: + this.tableServers.headMap(row).lastKey(); return this.tableServers.get(serverKey); } - private synchronized void findRegion(TableInfo info) throws IOException { - + /* + * Clear caches of passed region location, reload servers for the passed + * region's table and then ensure region location can be found. + * @param info Region location to find. 
+ * @throws IOException + */ + synchronized void findRegion(final RegionLocation info) throws IOException { // Wipe out everything we know about this table - + if (LOG.isDebugEnabled()) { + LOG.debug("Wiping out all we know of " + info); + } this.tablesToServers.remove(info.regionInfo.tableDesc.getName()); this.tableServers.clear(); // Reload information for the whole table - this.tableServers = findServersForTable(info.regionInfo.tableDesc.getName()); - - if(this.tableServers.get(info.regionInfo.startKey) == null ) { - throw new IOException("region " + info.regionInfo.regionName - + " does not exist"); + if (LOG.isDebugEnabled()) { + LOG.debug("Result of findRegion: " + this.tableServers.toString()); + } + if (this.tableServers.get(info.regionInfo.startKey) == null) { + throw new RegionNotFoundException(info.regionInfo.regionName.toString()); } } /** Get a single value for the specified row and column */ public byte[] get(Text row, Text column) throws IOException { - TableInfo info = null; + RegionLocation info = null; BytesWritable value = null; for(int tries = 0; tries < numRetries && info == null; tries++) { - info = getTableInfo(row); + info = getRegionLocation(row); try { value = getHRegionConnection(info.serverAddress).get( @@ -919,11 +933,11 @@ public class HClient implements HConstants { /** Get the specified number of versions of the specified row and column */ public byte[][] get(Text row, Text column, int numVersions) throws IOException { - TableInfo info = null; + RegionLocation info = null; BytesWritable[] values = null; for(int tries = 0; tries < numRetries && info == null; tries++) { - info = getTableInfo(row); + info = getRegionLocation(row); try { values = getHRegionConnection(info.serverAddress).get( @@ -956,11 +970,11 @@ public class HClient implements HConstants { * the specified timestamp. */ public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException { - TableInfo info = null; + RegionLocation info = null; BytesWritable[] values = null; for(int tries = 0; tries < numRetries && info == null; tries++) { - info = getTableInfo(row); + info = getRegionLocation(row); try { values = getHRegionConnection(info.serverAddress).get( @@ -990,11 +1004,11 @@ public class HClient implements HConstants { /** Get all the data for the specified row */ public LabelledData[] getRow(Text row) throws IOException { - TableInfo info = null; + RegionLocation info = null; LabelledData[] value = null; for(int tries = 0; tries < numRetries && info == null; tries++) { - info = getTableInfo(row); + info = getRegionLocation(row); try { value = getHRegionConnection(info.serverAddress).getRow( @@ -1023,38 +1037,81 @@ public class HClient implements HConstants { } return new ClientScanner(columns, startRow); } - - /** Start an atomic row insertion or update */ - public long startUpdate(Text row) throws IOException { - TableInfo info = null; - long lockid = -1L; + + /* + * @return General HClient RetryPolicy instance. + */ + RetryPolicy getRetryPolicy() { + Map, RetryPolicy> exceptionToPolicyMap = + new HashMap, RetryPolicy>(); + // Pass numRetries - 1 because it does less-than-equal internally rather + // than the less-than we do elsewhere where we use numRetries. 
+    RetryPolicy rp =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(numRetries,
+        this.pause, TimeUnit.MILLISECONDS);
+    exceptionToPolicyMap.put(NotServingRegionException.class, rp);
+    exceptionToPolicyMap.put(WrongRegionException.class, rp);
+    exceptionToPolicyMap.put(RegionNotFoundException.class, rp);
+    return RetryPolicies.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      exceptionToPolicyMap);
-    for(int tries = 0; tries < numRetries && info == null; tries++) {
-      info = getTableInfo(row);
-
-      try {
-        this.currentServer = getHRegionConnection(info.serverAddress);
-        this.currentRegion = info.regionInfo.regionName;
-        this.clientid = rand.nextLong();
-        lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
+  }
+
+  /*
+   * Interface for {@link #startUpdate()} used by the
+   * {@link org.apache.hadoop.io.retry} mechanism.
+   */
+  private interface StartUpdateInterface {
+    /**
+     * @return row lockid for the update
+     * @throws IOException
+     */
+    long startUpdate() throws IOException;
+  }
-      } catch(NotServingRegionException e) {
-        if(tries == numRetries - 1) {
-          // No more tries
-          throw e;
+  /* Start an atomic row insertion or update
+   * @param row Name of row to start update against.
+   * @return Row lockid.
+   */
+  public long startUpdate(final Text row) throws IOException {
+    // Implementation of the StartUpdateInterface.
+    StartUpdateInterface implementation = new StartUpdateInterface() {
+      private RegionLocation info = null;
+      private int attempts = 0;
+
+      /*
+       * Wrapped method.  The proxy wrapper is configured to judge whether
+       * an exception merits retry.
+       * @return lockid
+       * @throws IOException
+       */
+      public long startUpdate() throws IOException {
+        this.attempts++;
+        if (this.info != null) {
+          LOG.info("Retry of startUpdate.  Attempt " + this.attempts +
+            " for row " + row);
+          // If this is a retry, something is wrong with the region we
+          // have.  Refind it.
+          try {
+            findRegion(info);
+          } catch (RegionNotFoundException e) {
+            // Continue.  If the region no longer exists, perhaps we just
+            // came through a split and the region is now gone.  The
+            // getRegionLocation call below should recalibrate the client.
+          }
         }
-        findRegion(info);
-        info = null;
-
-      } catch(IOException e) {
-        this.currentServer = null;
-        this.currentRegion = null;
-        throw e;
+        this.info = getRegionLocation(row);
+        currentServer = getHRegionConnection(info.serverAddress);
+        currentRegion = info.regionInfo.regionName;
+        clientid = rand.nextLong();
+        return currentServer.startUpdate(currentRegion, clientid, row);
       }
-
-    }
+    };
-    return lockid;
+    // Get retry proxy wrapper around 'implementation'.
+    StartUpdateInterface retryProxy = (StartUpdateInterface)RetryProxy.
+      create(StartUpdateInterface.class, implementation, getRetryPolicy());
+    // Run retry.
+ return retryProxy.startUpdate(); } /** Change a value for the specified column */ @@ -1062,12 +1119,11 @@ public class HClient implements HConstants { try { this.currentServer.put(this.currentRegion, this.clientid, lockid, column, new BytesWritable(val)); - } catch(IOException e) { try { this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch(IOException e2) { + LOG.warn(e2); } this.currentServer = null; this.currentRegion = null; @@ -1078,13 +1134,13 @@ public class HClient implements HConstants { /** Delete the value for a column */ public void delete(long lockid, Text column) throws IOException { try { - this.currentServer.delete(this.currentRegion, this.clientid, lockid, column); - + this.currentServer.delete(this.currentRegion, this.clientid, lockid, + column); } catch(IOException e) { try { this.currentServer.abort(this.currentRegion, this.clientid, lockid); - } catch(IOException e2) { + LOG.warn(e2); } this.currentServer = null; this.currentRegion = null; @@ -1107,7 +1163,6 @@ public class HClient implements HConstants { public void commit(long lockid) throws IOException { try { this.currentServer.commit(this.currentRegion, this.clientid, lockid); - } finally { this.currentServer = null; this.currentRegion = null; @@ -1123,7 +1178,7 @@ public class HClient implements HConstants { private Text[] columns; private Text startRow; private boolean closed; - private TableInfo[] regions; + private RegionLocation[] regions; private int currentRegion; private HRegionInterface server; private long scannerId; @@ -1139,8 +1194,8 @@ public class HClient implements HConstants { } else { firstServer = tableServers.headMap(startRow).lastKey(); } - Collection info = tableServers.tailMap(firstServer).values(); - this.regions = info.toArray(new TableInfo[info.size()]); + Collection info = tableServers.tailMap(firstServer).values(); + this.regions = info.toArray(new RegionLocation[info.size()]); } public ClientScanner(Text[] columns, Text startRow) throws IOException { @@ -1173,7 +1228,7 @@ public class HClient implements HConstants { this.server = getHRegionConnection(this.regions[currentRegion].serverAddress); for(int tries = 0; tries < numRetries; tries++) { - TableInfo info = this.regions[currentRegion]; + RegionLocation info = this.regions[currentRegion]; try { this.scannerId = this.server.openScanner(info.regionInfo.regionName, @@ -1247,15 +1302,37 @@ public class HClient implements HConstants { System.err.println(" address is read from configuration."); System.err.println("Commands:"); System.err.println(" shutdown Shutdown the HBase cluster."); - System.err.println(" createTable Takes table name, column families,... "); - System.err.println(" deleteTable Takes a table name."); - System.err.println(" iistTables List all tables."); + System.err.println(" createTable Create named table."); + System.err.println(" deleteTable Delete named table."); + System.err.println(" listTables List all tables."); System.err.println("Example Usage:"); System.err.println(" % java " + this.getClass().getName() + " shutdown"); System.err.println(" % java " + this.getClass().getName() + " createTable webcrawl contents: anchors: 10"); } + private void printCreateTableUsage(final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: java " + this.getClass().getName() + + " [options] createTable ... 
"); + System.err.println("Example Usage:"); + System.err.println(" % java " + this.getClass().getName() + + " createTable testtable column_x column_y column_z 3"); + } + + private void printDeleteTableUsage(final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: java " + this.getClass().getName() + + " [options] deleteTable "); + System.err.println("Example Usage:"); + System.err.println(" % java " + this.getClass().getName() + + " deleteTable testtable"); + } + public int doCommandLine(final String args[]) { // Process command-line args. TODO: Better cmd-line processing // (but hopefully something not as painful as cli options). @@ -1296,8 +1373,10 @@ public class HClient implements HConstants { if (cmd.equals("createTable")) { if (i + 2 > args.length) { - throw new IllegalArgumentException("Must supply a table name " + - "and at least one column family"); + printCreateTableUsage("Error: Supply a table name," + + " at least one column family, and maximum versions"); + errCode = 1; + break; } HTableDescriptor desc = new HTableDescriptor(args[i + 1]); boolean addedFamily = false; @@ -1316,7 +1395,9 @@ public class HClient implements HConstants { if (cmd.equals("deleteTable")) { if (i + 1 > args.length) { - throw new IllegalArgumentException("Must supply a table name"); + printDeleteTableUsage("Error: Must supply a table name"); + errCode = 1; + break; } deleteTable(new Text(args[i + 1])); errCode = 0; diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 9d0dc384fbf..f8f0d8edcc6 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -49,6 +49,8 @@ public interface HConstants { // TODO: Someone may try to name a column family 'log'. If they // do, it will clash with the HREGION log dir subdirectory. FIX. static final String HREGION_LOGDIR_NAME = "log"; + + static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB // Always store the location of the root table's HRegion. // This HRegion is never split. 
@@ -72,7 +74,6 @@ public interface HConstants { // Other constants - static final long DESIRED_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB static final String UTF8_ENCODING = "UTF-8"; static final BytesWritable DELETE_BYTES = diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index a4a7d18c4db..ed45cbba709 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -50,7 +50,8 @@ import org.apache.hadoop.util.StringUtils; public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface, Runnable { - public long getProtocolVersion(String protocol, long clientVersion) + public long getProtocolVersion(String protocol, + @SuppressWarnings("unused") long clientVersion) throws IOException { if (protocol.equals(HMasterInterface.class.getName())) { return HMasterInterface.versionID; @@ -61,43 +62,41 @@ public class HMaster implements HConstants, HMasterInterface, } } - private static final Log LOG = + static final Log LOG = LogFactory.getLog(org.apache.hadoop.hbase.HMaster.class.getName()); - private volatile boolean closed; - private Path dir; + volatile boolean closed; + Path dir; private Configuration conf; - private FileSystem fs; - private Random rand; + FileSystem fs; + Random rand; private long threadWakeFrequency; private int numRetries; private long maxRegionOpenTime; - // The 'msgQueue' is used to assign work to the client processor thread - - private Vector msgQueue; + Vector msgQueue; private Leases serverLeases; private Server server; private HServerAddress address; - private HClient client; + HClient client; - private long metaRescanInterval; + long metaRescanInterval; private HServerAddress rootRegionLocation; /** * Columns in the 'meta' ROOT and META tables. */ - private static final Text METACOLUMNS[] = { + static final Text METACOLUMNS[] = { COLUMN_FAMILY }; static final String MASTER_NOT_RUNNING = "Master not running"; - private boolean rootScanned; - private int numMetaRegions; + boolean rootScanned; + int numMetaRegions; /** * Base HRegion scanner class. Holds utilty common to ROOT and @@ -146,116 +145,80 @@ public class HMaster implements HConstants, HMasterInterface, *

A META region is not 'known' until it has been scanned * once. */ - private abstract class BaseScanner implements Runnable { + abstract class BaseScanner implements Runnable { private final Text FIRST_ROW = new Text(); + /** + * @param region Region to scan + * @return True if scan completed. + * @throws IOException + */ protected boolean scanRegion(final MetaRegion region) throws IOException { boolean scannedRegion = false; - HRegionInterface server = null; + HRegionInterface regionServer = null; long scannerId = -1L; if (LOG.isDebugEnabled()) { - LOG.debug("scanning meta region " + region.regionName); + LOG.debug(Thread.currentThread().getName() + " scanning meta region " + + region.regionName); } try { - server = client.getHRegionConnection(region.server); - scannerId = server.openScanner(region.regionName, METACOLUMNS, FIRST_ROW); - - DataInputBuffer inbuf = new DataInputBuffer(); + regionServer = client.getHRegionConnection(region.server); + scannerId = regionServer.openScanner(region.regionName, METACOLUMNS, + FIRST_ROW); while (true) { TreeMap results = new TreeMap(); HStoreKey key = new HStoreKey(); - - LabelledData[] values = server.next(scannerId, key); - + LabelledData[] values = regionServer.next(scannerId, key); if (values.length == 0) { break; } - + for (int i = 0; i < values.length; i++) { byte[] bytes = new byte[values[i].getData().getSize()]; - System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length); + System.arraycopy(values[i].getData().get(), 0, bytes, 0, + bytes.length); results.put(values[i].getLabel(), bytes); } - HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf); - String serverName = getServerName(COL_SERVER, results); - long startCode = getStartCode(COL_STARTCODE, results); + HRegionInfo info = HRegion.getRegionInfo(results); + String serverName = HRegion.getServerName(results); + long startCode = HRegion.getStartCode(results); if(LOG.isDebugEnabled()) { - LOG.debug("row: " + info.toString() + ", server: " + serverName - + ", startCode: " + startCode); + LOG.debug(Thread.currentThread().getName() + " scanner: " + + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + + "}, server: " + serverName + ", startCode: " + startCode); } // Note Region has been assigned. checkAssigned(info, serverName, startCode); - scannedRegion = true; } + } catch (UnknownScannerException e) { + // Reset scannerId so we do not try closing a scanner the other side + // has lost account of: prevents duplicated stack trace out of the + // below close in the finally. 
+ scannerId = -1L; } finally { try { if (scannerId != -1L) { - server.close(scannerId); + if (regionServer != null) { + regionServer.close(scannerId); + } } } catch (IOException e) { LOG.error(e); } - scannerId = -1L; } if (LOG.isDebugEnabled()) { - LOG.debug("scan of meta region " + region.regionName + " complete"); + LOG.debug(Thread.currentThread().getName() + " scan of meta region " + + region.regionName + " complete"); } return scannedRegion; } - protected HRegionInfo getRegionInfo(final Text key, - final TreeMap data, final DataInputBuffer in) - throws IOException { - byte[] bytes = data.get(key); - if (bytes == null || bytes.length == 0) { - throw new IOException("no value for " + key); - } - in.reset(bytes, bytes.length); - HRegionInfo info = new HRegionInfo(); - info.readFields(in); - return info; - } - - protected String getServerName(final Text key, - final TreeMap data) { - - byte [] bytes = data.get(key); - String name = null; - try { - name = (bytes != null && bytes.length != 0) ? - new String(bytes, UTF8_ENCODING): null; - - } catch(UnsupportedEncodingException e) { - assert(false); - } - return (name != null)? name.trim(): name; - } - - protected long getStartCode(final Text key, - final TreeMap data) { - - long startCode = -1L; - byte [] bytes = data.get(key); - if(bytes != null && bytes.length != 0) { - try { - startCode = Long.valueOf(new String(bytes, UTF8_ENCODING).trim()); - - } catch(NumberFormatException e) { - assert(false); - - } catch(UnsupportedEncodingException e) { - assert(false); - } - } - return startCode; - } - protected void checkAssigned(final HRegionInfo info, final String serverName, final long startCode) { @@ -327,23 +290,17 @@ public class HMaster implements HConstants, HMasterInterface, rootScanned = true; } try { - if (LOG.isDebugEnabled()) { - LOG.debug("RootScanner going to sleep"); - } Thread.sleep(metaRescanInterval); - if (LOG.isDebugEnabled()) { - LOG.debug("RootScanner woke up"); - } } catch(InterruptedException e) { // Catch and go around again. If interrupt, its spurious or we're // being shutdown. Go back up to the while test. } } } catch(IOException e) { - LOG.error(e); + LOG.error("ROOT scanner", e); closed = true; } - LOG.debug("ROOT scanner exiting"); + LOG.info("ROOT scanner exiting"); } } @@ -369,7 +326,6 @@ public class HMaster implements HConstants, HMasterInterface, } // Comparable - public int compareTo(Object o) { MetaRegion other = (MetaRegion)o; @@ -383,11 +339,11 @@ public class HMaster implements HConstants, HMasterInterface, } /** Work for the meta scanner is queued up here */ - private Vector metaRegionsToScan; + Vector metaRegionsToScan; - private SortedMap knownMetaRegions; + SortedMap knownMetaRegions; - private boolean allMetaRegionsScanned; + boolean allMetaRegionsScanned; /** * MetaScanner META table. @@ -399,6 +355,7 @@ public class HMaster implements HConstants, HMasterInterface, * action would prevent other work from getting done. */ private class MetaScanner extends BaseScanner { + @SuppressWarnings("null") public void run() { while (!closed) { if (LOG.isDebugEnabled()) { @@ -412,13 +369,7 @@ public class HMaster implements HConstants, HMasterInterface, } if (region == null) { try { - if (LOG.isDebugEnabled()) { - LOG.debug("MetaScanner going into wait"); - } metaRegionsToScan.wait(); - if (LOG.isDebugEnabled()) { - LOG.debug("MetaScanner woke up"); - } } catch (InterruptedException e) { // Catch and go around again. 
We've been woken because there // are new meta regions available or because we are being @@ -445,13 +396,7 @@ public class HMaster implements HConstants, HMasterInterface, do { try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep for meta rescan interval"); - } Thread.sleep(metaRescanInterval); - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep for meta rescan interval"); - } } catch(InterruptedException ex) { // Catch and go around again. } @@ -472,13 +417,11 @@ public class HMaster implements HConstants, HMasterInterface, } while(true); } catch(IOException e) { - LOG.error(e); + LOG.error("META scanner", e); closed = true; } } - if(LOG.isDebugEnabled()) { - LOG.debug("META scanner exiting"); - } + LOG.info("META scanner exiting"); } private synchronized void metaRegionsScanned() { @@ -488,25 +431,17 @@ public class HMaster implements HConstants, HMasterInterface, public synchronized void waitForMetaScan() { while(!closed && !allMetaRegionsScanned) { try { - if (LOG.isDebugEnabled()) { - LOG.debug("Wait for all meta regions scanned"); - } wait(); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake from wait for all meta regions scanned"); - } } catch(InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Wake from wait for all meta regions scanned (IE)"); - } + // continue } } } } - private MetaScanner metaScanner; + MetaScanner metaScanner; private Thread metaScannerThread; - private Integer metaScannerLock = 0; + Integer metaScannerLock = 0; // The 'unassignedRegions' table maps from a region name to a HRegionInfo record, // which includes the region's table, its id, and its start/end keys. @@ -514,31 +449,28 @@ public class HMaster implements HConstants, HMasterInterface, // We fill 'unassignedRecords' by scanning ROOT and META tables, learning the // set of all known valid regions. - private SortedMap unassignedRegions; + SortedMap unassignedRegions; // The 'assignAttempts' table maps from regions to a timestamp that indicates // the last time we *tried* to assign the region to a RegionServer. If the // timestamp is out of date, then we can try to reassign it. - private SortedMap assignAttempts; + SortedMap assignAttempts; - // 'killList' indicates regions that we hope to close and not reopen - // (because we're merging them, or taking the table offline, for example). - - private SortedMap> killList; + SortedMap> killList; // 'killedRegions' contains regions that are in the process of being closed - private SortedSet killedRegions; + SortedSet killedRegions; // 'regionsToDelete' contains regions that need to be deleted, but cannot be // until the region server closes it - private SortedSet regionsToDelete; + SortedSet regionsToDelete; // A map of known server names to server info - private SortedMap serversToServerInfo = + SortedMap serversToServerInfo = Collections.synchronizedSortedMap(new TreeMap()); /** Build the HMaster out of a raw configuration item. */ @@ -576,18 +508,16 @@ public class HMaster implements HConstants, HMasterInterface, if(! 
fs.exists(rootRegionDir)) { LOG.info("bootstrap: creating ROOT and first META regions"); try { - HRegion root = HRegion.createNewHRegion(fs, dir, conf, - HGlobals.rootTableDesc, 0L, null, null); - HRegion meta = HRegion.createNewHRegion(fs, dir, conf, - HGlobals.metaTableDesc, 1L, null, null); - - HRegion.addRegionToMeta(root, meta); - + HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc, + this.dir, this.conf); + HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc, + this.dir, this.conf); + // Add first region from the META table to the ROOT region. + HRegion.addRegionToMETA(root, meta); root.close(); root.getLog().close(); meta.close(); meta.getLog().close(); - } catch(IOException e) { LOG.error(e); } @@ -690,6 +620,7 @@ public class HMaster implements HConstants, HMasterInterface, try { msgQueue.wait(threadWakeFrequency); } catch(InterruptedException iex) { + // continue } } if(closed) { @@ -751,9 +682,7 @@ public class HMaster implements HConstants, HMasterInterface, LOG.warn(iex); } - if(LOG.isDebugEnabled()) { - LOG.debug("HMaster main thread exiting"); - } + LOG.info("HMaster main thread exiting"); } /** @@ -1085,7 +1014,7 @@ public class HMaster implements HConstants, HMasterInterface, } } } - return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]); + return returnMsgs.toArray(new HMsg[returnMsgs.size()]); } private synchronized void rootRegionIsAvailable() { @@ -1195,8 +1124,8 @@ public class HMaster implements HConstants, HMasterInterface, long startCode = -1L; try { - startCode = Long.valueOf(new String(bytes, UTF8_ENCODING)); - + startCode = Long.valueOf(new String(bytes, UTF8_ENCODING)). + longValue(); } catch(UnsupportedEncodingException e) { LOG.error(e); break; @@ -1558,7 +1487,7 @@ public class HMaster implements HConstants, HMasterInterface, } } - private synchronized void waitForRootRegion() { + synchronized void waitForRootRegion() { while (rootRegionLocation == null) { try { if (LOG.isDebugEnabled()) { @@ -1625,8 +1554,8 @@ public class HMaster implements HConstants, HMasterInterface, // 2. Create the HRegion - HRegion r = HRegion.createNewHRegion(fs, dir, conf, desc, - newRegion.regionId, null, null); + HRegion r = HRegion.createHRegion(newRegion.regionId, desc, this.dir, + this.conf); // 3. 
Insert into meta @@ -1874,7 +1803,6 @@ public class HMaster implements HConstants, HMasterInterface, protected boolean isBeingServed(String serverName, long startCode) { boolean result = false; - if(serverName != null && startCode != -1L) { HServerInfo s = serversToServerInfo.get(serverName); result = s != null && s.getStartCode() == startCode; @@ -1889,29 +1817,30 @@ public class HMaster implements HConstants, HMasterInterface, protected abstract void processScanItem(String serverName, long startCode, HRegionInfo info) throws IOException; - protected abstract void postProcessMeta(MetaRegion m, - HRegionInterface server) throws IOException; + protected abstract void postProcessMeta(MetaRegion m, + HRegionInterface server) + throws IOException; } private class ChangeTableState extends TableOperation { private boolean online; - protected TreeMap> servedRegions; + protected TreeMap> servedRegions = + new TreeMap>(); protected long lockid; protected long clientId; public ChangeTableState(Text tableName, boolean onLine) throws IOException { super(tableName); this.online = onLine; - this.servedRegions = new TreeMap>(); } protected void processScanItem(String serverName, long startCode, - HRegionInfo info) throws IOException { - - if(isBeingServed(serverName, startCode)) { + HRegionInfo info) + throws IOException { + if (isBeingServed(serverName, startCode)) { TreeSet regions = servedRegions.get(serverName); - if(regions == null) { + if (regions == null) { regions = new TreeSet(); } regions.add(info); @@ -1921,16 +1850,12 @@ public class HMaster implements HConstants, HMasterInterface, protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { - // Process regions not being served - if(LOG.isDebugEnabled()) { LOG.debug("processing unserved regions"); } for(HRegionInfo i: unservedRegions) { - // Update meta table - if(LOG.isDebugEnabled()) { LOG.debug("updating columns in row: " + i.regionName); } @@ -1986,13 +1911,12 @@ public class HMaster implements HConstants, HMasterInterface, } for(Map.Entry> e: servedRegions.entrySet()) { String serverName = e.getKey(); - - if(online) { + if (online) { + LOG.debug("Already online"); continue; // Already being served } - // Cause regions being served to be take off-line and disabled - + // Cause regions being served to be taken off-line and disabled TreeMap localKillList = killList.get(serverName); if(localKillList == null) { localKillList = new TreeMap(); @@ -2003,9 +1927,10 @@ public class HMaster implements HConstants, HMasterInterface, } localKillList.put(i.regionName, i); } - if(localKillList != null && localKillList.size() > 0) { + if(localKillList.size() > 0) { if(LOG.isDebugEnabled()) { - LOG.debug("inserted local kill list into kill list for server " + serverName); + LOG.debug("inserted local kill list into kill list for server " + + serverName); } killList.put(serverName, localKillList); } @@ -2036,23 +1961,18 @@ public class HMaster implements HConstants, HMasterInterface, protected void postProcessMeta(MetaRegion m, HRegionInterface server) throws IOException { - - // For regions that are being served, mark them for deletion - - for(TreeSet s: servedRegions.values()) { - for(HRegionInfo i: s) { + // For regions that are being served, mark them for deletion + for (TreeSet s: servedRegions.values()) { + for (HRegionInfo i: s) { regionsToDelete.add(i.regionName); } } // Unserved regions we can delete now - - for(HRegionInfo i: unservedRegions) { + for (HRegionInfo i: unservedRegions) { // Delete the region - try { 
HRegion.deleteRegion(fs, dir, i.regionName); - } catch(IOException e) { LOG.error("failed to delete region " + i.regionName); LOG.error(e); @@ -2062,22 +1982,23 @@ public class HMaster implements HConstants, HMasterInterface, } @Override - protected void updateRegionInfo(HRegionInterface server, Text regionName, - HRegionInfo i) throws IOException { - + protected void updateRegionInfo( + @SuppressWarnings("hiding") HRegionInterface server, Text regionName, + @SuppressWarnings("unused") HRegionInfo i) + throws IOException { server.delete(regionName, clientId, lockid, COL_REGIONINFO); } } private abstract class ColumnOperation extends TableOperation { - protected ColumnOperation(Text tableName) throws IOException { super(tableName); } - protected void processScanItem(String serverName, long startCode, - HRegionInfo info) throws IOException { - + protected void processScanItem( + @SuppressWarnings("unused") String serverName, + @SuppressWarnings("unused") long startCode, final HRegionInfo info) + throws IOException { if(isEnabled(info)) { throw new TableNotDisabledException(tableName.toString()); } @@ -2196,10 +2117,7 @@ public class HMaster implements HConstants, HMasterInterface, } public void leaseExpired() { - if(LOG.isDebugEnabled()) { - LOG.debug(server + " lease expired"); - } - + LOG.info(server + " lease expired"); HServerInfo storedInfo = serversToServerInfo.remove(server); synchronized(msgQueue) { msgQueue.add(new PendingServerShutdown(storedInfo)); @@ -2218,7 +2136,7 @@ public class HMaster implements HConstants, HMasterInterface, System.exit(0); } - public static void main(String [] args) throws IOException { + public static void main(String [] args) { if (args.length < 1) { printUsageAndExit(); } @@ -2261,4 +2179,4 @@ public class HMaster implements HConstants, HMasterInterface, printUsageAndExit(); } } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/HMemcache.java b/src/java/org/apache/hadoop/hbase/HMemcache.java index 740caf1d323..9b231c73114 100644 --- a/src/java/org/apache/hadoop/hbase/HMemcache.java +++ b/src/java/org/apache/hadoop/hbase/HMemcache.java @@ -27,12 +27,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; -/******************************************************************************* +/** * The HMemcache holds in-memory modifications to the HRegion. This is really a * wrapper around a TreeMap that helps us when staging the Memcache out to disk. - ******************************************************************************/ + */ public class HMemcache { - private static final Log LOG = LogFactory.getLog(HMemcache.class); + private final Log LOG = LogFactory.getLog(this.getClass().getName()); TreeMap memcache = new TreeMap(); @@ -42,7 +42,7 @@ public class HMemcache { TreeMap snapshot = null; - private final HLocking lock = new HLocking(); + final HLocking lock = new HLocking(); public HMemcache() { super(); @@ -147,7 +147,8 @@ public class HMemcache { * * Operation uses a write lock. 
*/ - public void add(Text row, TreeMap columns, long timestamp) { + public void add(final Text row, final TreeMap columns, + final long timestamp) { this.lock.obtainWriteLock(); try { for (Map.Entry es: columns.entrySet()) { @@ -239,7 +240,6 @@ public class HMemcache { Vector result = new Vector(); HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp()); SortedMap tailMap = map.tailMap(curKey); - for (Map.Entry es: tailMap.entrySet()) { HStoreKey itKey = es.getKey(); if (itKey.matchesRowCol(curKey)) { @@ -257,9 +257,9 @@ public class HMemcache { /** * Return a scanner over the keys in the HMemcache */ - public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow) - throws IOException { - + public HInternalScannerInterface getScanner(long timestamp, + Text targetCols[], Text firstRow) + throws IOException { return new HMemcacheScanner(timestamp, targetCols, firstRow); } @@ -295,16 +295,11 @@ public class HMemcache { this.vals = new BytesWritable[backingMaps.length]; // Generate list of iterators - HStoreKey firstKey = new HStoreKey(firstRow); for(int i = 0; i < backingMaps.length; i++) { - if(firstRow.getLength() != 0) { - keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator(); - - } else { - keyIterators[i] = backingMaps[i].keySet().iterator(); - } - + keyIterators[i] = (firstRow.getLength() != 0)? + backingMaps[i].tailMap(firstKey).keySet().iterator(): + backingMaps[i].keySet().iterator(); while(getNext(i)) { if(! findFirstRow(i, firstRow)) { continue; @@ -314,7 +309,6 @@ public class HMemcache { } } } - } catch(IOException ex) { LOG.error(ex); close(); @@ -326,9 +320,9 @@ public class HMemcache { * The user didn't want to start scanning at the first row. This method * seeks to the requested row. * - * @param i - which iterator to advance - * @param firstRow - seek to this row - * @return - true if this is the first row + * @param i which iterator to advance + * @param firstRow seek to this row + * @return true if this is the first row */ boolean findFirstRow(int i, Text firstRow) { return ((firstRow.getLength() == 0) @@ -338,11 +332,11 @@ public class HMemcache { /** * Get the next value from the specified iterater. * - * @param i - which iterator to fetch next value from - * @return - true if there is more data available + * @param i Which iterator to fetch next value from + * @return true if there is more data available */ boolean getNext(int i) { - if(! 
keyIterators[i].hasNext()) { + if (!keyIterators[i].hasNext()) { closeSubScanner(i); return false; } diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java index 420413f1416..5390055cffe 100644 --- a/src/java/org/apache/hadoop/hbase/HMerge.java +++ b/src/java/org/apache/hadoop/hbase/HMerge.java @@ -146,7 +146,9 @@ public class HMerge implements HConstants { nextSize = nextRegion.largestHStore(); - if((currentSize + nextSize) <= (DESIRED_MAX_FILE_SIZE / 2)) { + long maxFilesize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); + if((currentSize + nextSize) <= (maxFilesize / 2)) { // We merge two adjacent regions if their total size is less than // one half of the desired maximum size diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index ca918a3737b..cc77c45b354 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -62,7 +62,7 @@ public class HRegion implements HConstants { static int MIN_COMMITS_FOR_COMPACTION = 10; static Random rand = new Random(); - private static final Log LOG = LogFactory.getLog(HRegion.class); + static final Log LOG = LogFactory.getLog(HRegion.class); /** * Deletes all the files for a HRegion @@ -74,6 +74,7 @@ public class HRegion implements HConstants { */ public static void deleteRegion(FileSystem fs, Path baseDirectory, Text regionName) throws IOException { + LOG.debug("Deleting region " + regionName); fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName)); } @@ -134,14 +135,10 @@ public class HRegion implements HConstants { + (endKey == null ? "" : endKey) + "'"); // Flush each of the sources, and merge their files into a single - // target for each column family. - - if(LOG.isDebugEnabled()) { - LOG.debug("flushing and getting file names for region " + srcA.getRegionName()); - } - + // target for each column family. TreeSet alreadyMerged = new TreeSet(); - TreeMap> filesToMerge = new TreeMap>(); + TreeMap> filesToMerge = + new TreeMap>(); for(HStoreFile src: srcA.flushcache(true)) { Vector v = filesToMerge.get(src.getColFamily()); if(v == null) { @@ -151,10 +148,6 @@ public class HRegion implements HConstants { v.add(src); } - if(LOG.isDebugEnabled()) { - LOG.debug("flushing and getting file names for region " + srcB.getRegionName()); - } - for(HStoreFile src: srcB.flushcache(true)) { Vector v = filesToMerge.get(src.getColFamily()); if(v == null) { @@ -187,6 +180,7 @@ public class HRegion implements HConstants { } filesToMerge.clear(); + for(HStoreFile src: srcA.close()) { if(! alreadyMerged.contains(src)) { Vector v = filesToMerge.get(src.getColFamily()); @@ -330,6 +324,7 @@ public class HRegion implements HConstants { int maxUnflushedEntries = 0; int compactionThreshold = 0; private final HLocking lock = new HLocking(); + private long desiredMaxFileSize; ////////////////////////////////////////////////////////////////////////////// // Constructor @@ -382,7 +377,6 @@ public class HRegion implements HConstants { // Load in all the HStores. 
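[Editor's note] The HMerge hunk above replaces the hard-coded DESIRED_MAX_FILE_SIZE with the new hbase.hregion.max.filesize setting. A sketch of the resulting merge test (MergeCheck and shouldMerge are illustrative names, not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;

/** Sketch of the HMerge size test above. */
class MergeCheck {
  static final long DEFAULT_MAX_FILE_SIZE = 128L * 1024 * 1024; // 128M

  /** Two adjacent regions merge only if together they stay under half
   *  the configured maximum region size. */
  static boolean shouldMerge(Configuration conf, long currentSize,
      long nextSize) {
    long maxFilesize =
      conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
    return (currentSize + nextSize) <= (maxFilesize / 2);
  }
}
```

Merging only up to half the maximum presumably leaves the merged region room to grow before it immediately qualifies for a split again.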
for(Map.Entry e : this.regionInfo.tableDesc.families().entrySet()) { - Text colFamily = HStoreKey.extractFamily(e.getKey()); stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, e.getValue(), fs, oldLogFile, conf)); @@ -404,7 +398,12 @@ public class HRegion implements HConstants { // By default, we compact the region if an HStore has more than 10 map files - this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold", 10); + this.compactionThreshold = + conf.getInt("hbase.hregion.compactionThreshold", 10); + + // By default we split region if a file > DEFAULT_MAX_FILE_SIZE. + this.desiredMaxFileSize = + conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE); // HRegion is ready to go! this.writestate.writesOngoing = false; @@ -448,6 +447,7 @@ public class HRegion implements HConstants { try { writestate.wait(); } catch (InterruptedException iex) { + // continue } } writestate.writesOngoing = true; @@ -456,24 +456,21 @@ public class HRegion implements HConstants { if(! shouldClose) { return null; + } + LOG.info("closing region " + this.regionInfo.regionName); + Vector allHStoreFiles = internalFlushcache(); + for (HStore store: stores.values()) { + store.close(); + } + try { + return allHStoreFiles; - } else { - LOG.info("closing region " + this.regionInfo.regionName); - Vector allHStoreFiles = internalFlushcache(); - for(Iterator it = stores.values().iterator(); it.hasNext(); ) { - HStore store = it.next(); - store.close(); - } - try { - return allHStoreFiles; - - } finally { - synchronized(writestate) { - writestate.closed = true; - writestate.writesOngoing = false; - } - LOG.info("region " + this.regionInfo.regionName + " closed"); + } finally { + synchronized (writestate) { + writestate.closed = true; + writestate.writesOngoing = false; } + LOG.info("region " + this.regionInfo.regionName + " closed"); } } finally { lock.releaseWriteLock(); @@ -493,10 +490,11 @@ public class HRegion implements HConstants { && (regionInfo.startKey.compareTo(midKey) > 0)) || ((regionInfo.endKey.getLength() != 0) && (regionInfo.endKey.compareTo(midKey) < 0))) { - throw new IOException("Region splitkey must lie within region boundaries."); + throw new IOException("Region splitkey must lie within region " + + "boundaries."); } - LOG.info("splitting region " + this.regionInfo.regionName); + LOG.info("Splitting region " + this.regionInfo.regionName); Path splits = new Path(regiondir, SPLITDIR); if(! fs.exists(splits)) { @@ -524,39 +522,41 @@ public class HRegion implements HConstants { // Flush this HRegion out to storage, and turn off flushes // or compactions until close() is called. + // TODO: flushcache can come back null if it can't do the flush. FIX. 
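[Editor's note] The constructor change above also reads hbase.hregion.max.filesize into desiredMaxFileSize, which needsSplit() (further down in this file) compares against the region's largest store file under a 1.5x rule. That trigger in isolation (SplitCheck is an illustrative name):

```java
import org.apache.hadoop.conf.Configuration;

/** Sketch of the split trigger wired up in the HRegion constructor. */
class SplitCheck {
  static final long DEFAULT_MAX_FILE_SIZE = 128L * 1024 * 1024; // 128M

  /** A region splits once its largest store file grows past 1.5x the
   *  desired maximum file size (see needsSplit() below). */
  static boolean needsSplit(Configuration conf, long largestStoreSize) {
    long desiredMaxFileSize =
      conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
    return largestStoreSize > (desiredMaxFileSize + (desiredMaxFileSize / 2));
  }
}
```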
Vector hstoreFilesToSplit = flushcache(true); - for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext(); ) { - HStoreFile hsf = it.next(); - - if(LOG.isDebugEnabled()) { - LOG.debug("splitting HStore " + hsf.getRegionName() + "/" - + hsf.getColFamily() + "/" + hsf.fileId()); + for(HStoreFile hsf: hstoreFilesToSplit) { + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" + + hsf.getColFamily() + "/" + hsf.fileId()); } - HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); - + hsf.getColFamily(), Math.abs(rand.nextLong())); HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, - hsf.getColFamily(), Math.abs(rand.nextLong())); - + hsf.getColFamily(), Math.abs(rand.nextLong())); hsf.splitStoreFile(midKey, dstA, dstB, fs, conf); alreadySplit.add(hsf); } // We just copied most of the data. + // Notify the caller that we are about to close the region + listener.closing(this.getRegionName()); - listener.regionIsUnavailable(this.getRegionName()); - - // Now close the HRegion and copy the small remainder + // Wait on the last row updates to come in. + waitOnRowLocks(); + // Now close the HRegion hstoreFilesToSplit = close(); - for(Iterator it = hstoreFilesToSplit.iterator(); it.hasNext(); ) { - HStoreFile hsf = it.next(); - + + // Tell listener that region is now closed and that they can therefore + // clean up any outstanding references. + listener.closed(this.getRegionName()); + + // Copy the small remainder + for(HStoreFile hsf: hstoreFilesToSplit) { if(! alreadySplit.contains(hsf)) { if(LOG.isDebugEnabled()) { - LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily() + "/" + hsf.fileId()); } @@ -585,8 +585,9 @@ public class HRegion implements HConstants { regions[0] = regionA; regions[1] = regionB; - LOG.info("region split complete. new regions are: " + regions[0].getRegionName() - + ", " + regions[1].getRegionName()); + LOG.info("Region split of " + this.regionInfo.regionName + " complete. 
" + + "New regions are: " + regions[0].getRegionName() + ", " + + regions[1].getRegionName()); return regions; } @@ -653,22 +654,19 @@ public class HRegion implements HConstants { */ public boolean needsSplit(Text midKey) { lock.obtainReadLock(); - try { Text key = new Text(); long maxSize = 0; - - for(Iterator i = stores.values().iterator(); i.hasNext(); ) { - long size = i.next().getLargestFileSize(key); - + for(HStore store: stores.values()) { + long size = store.getLargestFileSize(key); if(size > maxSize) { // Largest so far maxSize = size; midKey.set(key); } } - return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2))); - + return (maxSize > + (this.desiredMaxFileSize + (this.desiredMaxFileSize / 2))); } finally { lock.releaseReadLock(); } @@ -701,16 +699,16 @@ public class HRegion implements HConstants { */ public boolean needsCompaction() { boolean needsCompaction = false; - lock.obtainReadLock(); + this.lock.obtainReadLock(); try { - for(Iterator i = stores.values().iterator(); i.hasNext(); ) { - if(i.next().getNMaps() > compactionThreshold) { + for(HStore store: stores.values()) { + if(store.getNMaps() > this.compactionThreshold) { needsCompaction = true; break; } } } finally { - lock.releaseReadLock(); + this.lock.releaseReadLock(); } return needsCompaction; } @@ -732,41 +730,35 @@ public class HRegion implements HConstants { boolean shouldCompact = false; lock.obtainReadLock(); try { - synchronized(writestate) { - if((! writestate.writesOngoing) - && writestate.writesEnabled - && (! writestate.closed) - && recentCommits > MIN_COMMITS_FOR_COMPACTION) { - + synchronized (writestate) { + if ((!writestate.writesOngoing) && + writestate.writesEnabled && + (!writestate.closed) && + recentCommits > MIN_COMMITS_FOR_COMPACTION) { writestate.writesOngoing = true; shouldCompact = true; } } - } finally { - lock.releaseReadLock(); - } - if(! shouldCompact) { - LOG.info("not compacting region " + this.regionInfo.regionName); - return false; - } - lock.obtainWriteLock(); - try { - LOG.info("starting compaction on region " + this.regionInfo.regionName); - for (Iterator it = stores.values().iterator(); it.hasNext();) { - HStore store = it.next(); + if (!shouldCompact) { + LOG.info("not compacting region " + this.regionInfo); + return false; + } + + LOG.info("starting compaction on region " + this.regionInfo); + for (HStore store : stores.values()) { store.compact(); } - LOG.info("compaction completed on region " + this.regionInfo.regionName); + LOG.info("compaction completed on region " + this.regionInfo); return true; - + } finally { + lock.releaseReadLock(); synchronized (writestate) { writestate.writesOngoing = false; recentCommits = 0; writestate.notifyAll(); } - lock.releaseWriteLock(); } } @@ -800,7 +792,8 @@ public class HRegion implements HConstants { * This method may block for some time, so it should not be called from a * time-sensitive thread. */ - public Vector flushcache(boolean disableFutureWrites) throws IOException { + public Vector flushcache(boolean disableFutureWrites) + throws IOException { boolean shouldFlush = false; synchronized(writestate) { if((! writestate.writesOngoing) @@ -818,45 +811,45 @@ public class HRegion implements HConstants { if(! 
shouldFlush) { if(LOG.isDebugEnabled()) { - LOG.debug("not flushing cache for region " + this.regionInfo.regionName); + LOG.debug("not flushing cache for region " + + this.regionInfo.regionName); } - return null; - - } else { - try { - return internalFlushcache(); - - } finally { - synchronized(writestate) { - writestate.writesOngoing = false; - writestate.notifyAll(); - } + return null; + } + + try { + return internalFlushcache(); + + } finally { + synchronized (writestate) { + writestate.writesOngoing = false; + writestate.notifyAll(); } } } /** - * Flushing the cache is a little tricky. We have a lot of updates in the - * HMemcache, all of which have also been written to the log. We need to write - * those updates in the HMemcache out to disk, while being able to process - * reads/writes as much as possible during the flush operation. Also, the log + * Flushing the cache is a little tricky. We have a lot of updates in the + * HMemcache, all of which have also been written to the log. We need to write + * those updates in the HMemcache out to disk, while being able to process + * reads/writes as much as possible during the flush operation. Also, the log * has to state clearly the point in time at which the HMemcache was flushed. * (That way, during recovery, we know when we can rely on the on-disk flushed * structures and when we have to recover the HMemcache from the log.) - * + * * So, we have a three-step process: - * + * * A. Flush the memcache to the on-disk stores, noting the current sequence ID - * for the log. - * - * B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID - * that was current at the time of memcache-flush. - * - * C. Get rid of the memcache structures that are now redundant, as they've - * been flushed to the on-disk HStores. - * + * for the log. + * + * B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID + * that was current at the time of memcache-flush. + * + * C. Get rid of the memcache structures that are now redundant, as they've + * been flushed to the on-disk HStores. + * * This method is protected, but can be accessed via several public routes. - * + * * This method may block for some time. */ Vector internalFlushcache() throws IOException { @@ -884,8 +877,7 @@ public class HRegion implements HConstants { HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log); TreeMap memcacheSnapshot = retval.memcacheSnapshot; if(memcacheSnapshot == null) { - for(Iterator it = stores.values().iterator(); it.hasNext(); ) { - HStore hstore = it.next(); + for(HStore hstore: stores.values()) { Vector hstoreFiles = hstore.getAllMapFiles(); allHStoreFiles.addAll(0, hstoreFiles); } @@ -944,12 +936,7 @@ public class HRegion implements HConstants { /** Fetch a single data item. */ public BytesWritable get(Text row, Text column) throws IOException { BytesWritable[] results = get(row, column, Long.MAX_VALUE, 1); - if(results == null) { - return null; - - } else { - return results[0]; - } + return (results == null)? null: results[0]; } /** Fetch multiple versions of a single data item */ @@ -972,13 +959,13 @@ public class HRegion implements HConstants { // Obtain the row-lock - obtainLock(row); + obtainRowLock(row); try { // Obtain the -col results return get(new HStoreKey(row, column, timestamp), numVersions); } finally { - releaseLock(row); + releaseRowLock(row); } } @@ -1042,7 +1029,8 @@ public class HRegion implements HConstants { * Return an iterator that scans over the HRegion, returning the indicated * columns. 
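[Editor's note] The three-step flush protocol spelled out in the internalFlushcache() javadoc above is worth restating in code. A compressed, hypothetical sketch; the Wal and Store interfaces here are stand-ins, not the real HLog/HStore API:

```java
import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;

class FlushSketch {
  interface Wal   { long currentSequenceId();
                    void flushComplete(long seqId) throws IOException; }
  interface Store { void flush(SortedMap<String, byte[]> snapshot)
                    throws IOException; }

  static void flushcache(TreeMap<String, byte[]> memcache, Wal log,
      Store store) throws IOException {
    // A. Snapshot the memcache, noting the log's sequence id at that instant;
    //    new writes keep flowing to the live memcache and the log meanwhile.
    SortedMap<String, byte[]> snapshot = new TreeMap<String, byte[]>(memcache);
    long sequenceId = log.currentSequenceId();
    store.flush(snapshot);
    // B. Record FLUSHCACHE-COMPLETE under that sequence id, so recovery
    //    knows which log entries are already safe in the on-disk HStores.
    log.flushComplete(sequenceId);
    // C. The flushed entries are now redundant in memory. HMemcache keeps
    //    the snapshot as its own TreeMap, so step C just discards that map.
    snapshot.clear();
  }
}
```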
This Iterator must be closed by the caller. */ - public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException { + public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) + throws IOException { lock.obtainReadLock(); try { TreeSet families = new TreeSet(); @@ -1052,12 +1040,10 @@ public class HRegion implements HConstants { HStore[] storelist = new HStore[families.size()]; int i = 0; - for(Iterator it = families.iterator(); it.hasNext(); ) { - Text family = it.next(); + for (Text family: families) { storelist[i++] = stores.get(family); } return new HScanner(cols, firstRow, memcache, storelist); - } finally { lock.releaseReadLock(); } @@ -1068,26 +1054,26 @@ public class HRegion implements HConstants { ////////////////////////////////////////////////////////////////////////////// /** - * The caller wants to apply a series of writes to a single row in the HRegion. - * The caller will invoke startUpdate(), followed by a series of calls to - * put/delete, then finally either abort() or commit(). + * The caller wants to apply a series of writes to a single row in the + * HRegion. The caller will invoke startUpdate(), followed by a series of + * calls to put/delete, then finally either abort() or commit(). * - * Note that we rely on the external caller to properly abort() or commit() - * every transaction. If the caller is a network client, there should be a - * lease-system in place that automatically aborts() transactions after a - * specified quiet period. + *
<p>
Note that we rely on the external caller to properly abort() or + * commit() every transaction. If the caller is a network client, there + * should be a lease-system in place that automatically aborts() transactions + * after a specified quiet period. + * + * @param row Row to update + * @return lockid + * @see #put(long, Text, BytesWritable) */ public long startUpdate(Text row) throws IOException { - - // We obtain a per-row lock, so other clients will - // block while one client performs an update. - - lock.obtainReadLock(); - try { - return obtainLock(row); - } finally { - lock.releaseReadLock(); - } + // We obtain a per-row lock, so other clients will block while one client + // performs an update. The read lock is released by the client calling + // #commit or #abort or if the HRegionServer lease on the lock expires. + // See HRegionServer#RegionListener for how the expire on HRegionServer + // invokes a HRegion#abort. + return obtainRowLock(row); } /** @@ -1099,10 +1085,11 @@ public class HRegion implements HConstants { * This method really just tests the input, then calls an internal localput() * method. */ - public void put(long lockid, Text targetCol, BytesWritable val) throws IOException { + public void put(long lockid, Text targetCol, BytesWritable val) + throws IOException { if(val.getSize() == DELETE_BYTES.getSize() && val.compareTo(DELETE_BYTES) == 0) { - throw new IOException("Cannot insert value: " + val); + throw new IOException("Cannot insert value: " + val); } localput(lockid, targetCol, val); } @@ -1114,14 +1101,21 @@ public class HRegion implements HConstants { localput(lockid, targetCol, DELETE_BYTES); } - /** + /* * Private implementation. * - * localput() is used for both puts and deletes. We just place the values into - * a per-row pending area, until a commit() or abort() call is received. + * localput() is used for both puts and deletes. We just place the values + * into a per-row pending area, until a commit() or abort() call is received. * (Or until the user's write-lock expires.) + * + * @param lockid + * @param targetCol + * @param val Value to enter into cell + * @throws IOException */ - void localput(long lockid, Text targetCol, BytesWritable val) throws IOException { + void localput(final long lockid, final Text targetCol, + final BytesWritable val) + throws IOException { checkColumn(targetCol); Text row = getRowFromLock(lockid); @@ -1132,15 +1126,12 @@ public class HRegion implements HConstants { // This sync block makes localput() thread-safe when multiple // threads from the same client attempt an insert on the same // locked row (via lockid). - synchronized(row) { - // This check makes sure that another thread from the client // hasn't aborted/committed the write-operation. - if(row != getRowFromLock(lockid)) { - throw new LockException("Locking error: put operation on lock " + lockid - + " unexpected aborted by another thread"); + throw new LockException("Locking error: put operation on lock " + + lockid + " unexpected aborted by another thread"); } TreeMap targets = targetColumns.get(lockid); @@ -1178,22 +1169,22 @@ public class HRegion implements HConstants { } targetColumns.remove(lockid); - releaseLock(row); + releaseRowLock(row); } } /** - * Commit a pending set of writes to the memcache. This also results in writing - * to the change log. + * Commit a pending set of writes to the memcache. This also results in + * writing to the change log. * * Once updates hit the change log, they are safe. 
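[Editor's note] For orientation, here is the per-row update protocol that commit() above completes, seen from the caller's side. A sketch assuming it sits in org.apache.hadoop.hbase next to HRegion; the column names are made up:

```java
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

class RowUpdateExample {
  /** Drive one row transaction against an HRegion. */
  static void updateRow(HRegion region, Text row) throws IOException {
    long lockid = region.startUpdate(row);   // blocks until the row is free
    try {
      region.put(lockid, new Text("info:a"),
        new BytesWritable("value".getBytes()));
      region.delete(lockid, new Text("info:b"));
      region.commit(lockid);                 // log append, then memcache add
    } catch (IOException e) {
      region.abort(lockid);                  // gives the row lock back
      throw e;
    }
  }
}
```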
They will either be moved * into an HStore in the future, or they will be recovered from the log. + * @param lockid Lock for row we're to commit. + * @throws IOException */ - public void commit(long lockid) throws IOException { - + public void commit(final long lockid) throws IOException { // Remove the row from the pendingWrites list so // that repeated executions won't screw this up. - Text row = getRowFromLock(lockid); if(row == null) { throw new LockException("No write lock for lockid " + lockid); @@ -1201,22 +1192,16 @@ public class HRegion implements HConstants { // This check makes sure that another thread from the client // hasn't aborted/committed the write-operation - synchronized(row) { - - // We can now commit the changes. // Add updates to the log and add values to the memcache. - long commitTimestamp = System.currentTimeMillis(); log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row, - targetColumns.get(lockid), commitTimestamp); - - memcache.add(row, targetColumns.get(lockid), commitTimestamp); - + targetColumns.get(Long.valueOf(lockid)), commitTimestamp); + memcache.add(row, targetColumns.get(Long.valueOf(lockid)), + commitTimestamp); // OK, all done! - - targetColumns.remove(lockid); - releaseLock(row); + targetColumns.remove(Long.valueOf(lockid)); + releaseRowLock(row); } recentCommits++; this.commitsSinceFlush++; @@ -1235,9 +1220,10 @@ public class HRegion implements HConstants { // all's well } else { - throw new IOException("Requested row out of range for HRegion " - + regionInfo.regionName + ", startKey='" + regionInfo.startKey - + "', endKey='" + regionInfo.endKey + "', row='" + row + "'"); + throw new WrongRegionException("Requested row out of range for " + + "HRegion " + regionInfo.regionName + ", startKey='" + + regionInfo.startKey + "', endKey='" + regionInfo.endKey + "', row='" + + row + "'"); } } @@ -1255,25 +1241,32 @@ public class HRegion implements HConstants { * Obtain a lock on the given row. Blocks until success. * * I know it's strange to have two mappings: + *
<pre>
    *   ROWS  ==> LOCKS
+   * </pre>
    * as well as
+   * <pre>
    *   LOCKS ==> ROWS
+   * </pre>
* - * But it acts as a guard on the client; a miswritten client just can't submit - * the name of a row and start writing to it; it must know the correct lockid, - * which matches the lock list in memory. + * But it acts as a guard on the client; a miswritten client just can't + * submit the name of a row and start writing to it; it must know the correct + * lockid, which matches the lock list in memory. * - * It would be more memory-efficient to assume a correctly-written client, + *
<p>
It would be more memory-efficient to assume a correctly-written client, * which maybe we'll do in the future. + * + * @param row Name of row to lock. + * @return The id of the held lock. */ - long obtainLock(Text row) throws IOException { + long obtainRowLock(Text row) throws IOException { checkRow(row); - synchronized(rowsToLocks) { while(rowsToLocks.get(row) != null) { try { rowsToLocks.wait(); } catch (InterruptedException ie) { + // Empty } } @@ -1285,7 +1278,7 @@ public class HRegion implements HConstants { } } - Text getRowFromLock(long lockid) throws IOException { + Text getRowFromLock(long lockid) { // Pattern is that all access to rowsToLocks and/or to // locksToRows is via a lock on rowsToLocks. synchronized(rowsToLocks) { @@ -1293,17 +1286,32 @@ public class HRegion implements HConstants { } } - /** Release the row lock! */ - void releaseLock(Text row) throws IOException { + /** Release the row lock! + * @param lock Name of row whose lock we are to release + */ + void releaseRowLock(Text row) { synchronized(rowsToLocks) { long lockid = rowsToLocks.remove(row).longValue(); locksToRows.remove(lockid); rowsToLocks.notifyAll(); } } - /******************************************************************************* + + private void waitOnRowLocks() { + synchronized (this.rowsToLocks) { + while (this.rowsToLocks.size() > 0) { + try { + this.rowsToLocks.wait(); + } catch (InterruptedException e) { + // Catch. Let while test determine loop-end. + } + } + } + } + + /* * HScanner is an iterator through a bunch of rows in an HRegion. - ******************************************************************************/ + */ private static class HScanner implements HInternalScannerInterface { private HInternalScannerInterface[] scanners; private TreeMap[] resultSets; @@ -1314,10 +1322,8 @@ public class HRegion implements HConstants { /** Create an HScanner with a handle on many HStores. */ @SuppressWarnings("unchecked") public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores) - throws IOException { - + throws IOException { long scanTime = System.currentTimeMillis(); - this.scanners = new HInternalScannerInterface[stores.length + 1]; for(int i = 0; i < this.scanners.length; i++) { this.scanners[i] = null; @@ -1332,12 +1338,9 @@ public class HRegion implements HConstants { // All results will match the required column-set and scanTime. // NOTE: the memcache scanner should be the first scanner - try { HInternalScannerInterface scanner = memcache.getScanner(scanTime, cols, firstRow); - - if(scanner.isWildcardScanner()) { this.wildcardMatch = true; } @@ -1368,8 +1371,7 @@ public class HRegion implements HConstants { for(int i = 0; i < scanners.length; i++) { keys[i] = new HStoreKey(); resultSets[i] = new TreeMap(); - - if(scanners[i] != null && ! scanners[i].next(keys[i], resultSets[i])) { + if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) { closeScanner(i); } } @@ -1393,13 +1395,12 @@ public class HRegion implements HConstants { * Grab the next row's worth of values. The HScanner will return the most * recent data value for each row that is not newer than the target time. */ - public boolean next(HStoreKey key, TreeMap results) throws IOException { - + public boolean next(HStoreKey key, TreeMap results) + throws IOException { // Find the lowest-possible key. 
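[Editor's note] A toy version of the two-way lock bookkeeping described in the obtainRowLock() javadoc, including the new waitOnRowLocks() drain used by the split path. Strings stand in for Text, and a counter stands in for the random lockids:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy version of the two-way row-lock bookkeeping. */
class RowLocks {
  private final Map<String, Long> rowsToLocks = new HashMap<String, Long>();
  private final Map<Long, String> locksToRows = new HashMap<Long, String>();
  private long nextLockid = 1;   // the real code uses random ids

  /** Blocks until nobody else holds the row, then records both mappings. */
  synchronized long obtainRowLock(String row) throws InterruptedException {
    while (rowsToLocks.containsKey(row)) {
      wait();                        // another client is updating this row
    }
    long lockid = nextLockid++;
    rowsToLocks.put(row, lockid);
    locksToRows.put(lockid, row);    // lets commit/abort validate a lockid
    return lockid;
  }

  /** Drops both mappings and wakes anyone blocked in obtainRowLock(). */
  synchronized void releaseRowLock(String row) {
    Long lockid = rowsToLocks.remove(row);
    if (lockid != null) {
      locksToRows.remove(lockid);
    }
    notifyAll();
  }

  /** Used by the split path: wait until every row lock is released. */
  synchronized void waitOnRowLocks() throws InterruptedException {
    while (!rowsToLocks.isEmpty()) {
      wait();
    }
  }
}
```

The locksToRows direction is what lets commit() and abort() verify that a caller really holds the lockid it presents.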
- Text chosenRow = null; long chosenTimestamp = -1; - for(int i = 0; i < keys.length; i++) { + for(int i = 0; i < this.keys.length; i++) { if(scanners[i] != null && (chosenRow == null || (keys[i].getRow().compareTo(chosenRow) < 0) @@ -1412,18 +1413,15 @@ public class HRegion implements HConstants { } // Store the key and results for each sub-scanner. Merge them as appropriate. - boolean insertedItem = false; if(chosenTimestamp > 0) { key.setRow(chosenRow); key.setVersion(chosenTimestamp); key.setColumn(new Text("")); - for(int i = 0; i < scanners.length; i++) { + for(int i = 0; i < scanners.length; i++) { while((scanners[i] != null) && (keys[i].getRow().compareTo(chosenRow) == 0)) { - - // If we are doing a wild card match or there are multiple matchers // per column, we need to scan all the older versions of this row // to pick up the rest of the family members @@ -1439,11 +1437,7 @@ public class HRegion implements HConstants { // values with older ones. So now we only insert // a result if the map does not contain the key. - for(Iterator> it - = resultSets[i].entrySet().iterator(); - it.hasNext(); ) { - - Map.Entry e = it.next(); + for(Map.Entry e: resultSets[i].entrySet()) { if(!results.containsKey(e.getKey())) { results.put(e.getKey(), e.getValue()); insertedItem = true; @@ -1476,7 +1470,6 @@ public class HRegion implements HConstants { void closeScanner(int i) { try { scanners[i].close(); - } finally { scanners[i] = null; keys[i] = null; @@ -1493,4 +1486,160 @@ public class HRegion implements HConstants { } } } + + // Utility methods + + /** + * Convenience method creating new HRegions. + * @param regionId ID to use + * @param tableDesc Descriptor + * @param dir Home directory for the new region. + * @param conf + * @return New META region (ROOT or META). + * @throws IOException + */ + public static HRegion createHRegion(final long regionId, + final HTableDescriptor tableDesc, final Path dir, final Configuration conf) + throws IOException { + return createHRegion(new HRegionInfo(regionId, tableDesc, null, null), + dir, conf, null, null); + } + + /** + * Convenience method creating new HRegions. Used by createTable and by the + * bootstrap code in the HMaster constructor + * + * @param info Info for region to create. + * @param dir Home dir for new region + * @param conf + * @param initialFiles InitialFiles to pass new HRegion. Pass null if none. + * @param oldLogFile Old log file to use in region initialization. Pass null + * if none. + * @return new HRegion + * + * @throws IOException + */ + public static HRegion createHRegion(final HRegionInfo info, + final Path dir, final Configuration conf, final Path initialFiles, + final Path oldLogFile) + throws IOException { + Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName); + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(regionDir); + return new HRegion(dir, + new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf), + fs, conf, info, initialFiles, oldLogFile); + } + + /** + * Inserts a new region's meta information into the passed + * meta region. Used by the HMaster bootstrap code adding + * new table to ROOT table. 
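[Editor's note] The HScanner.next() merge above always services the lexically lowest row first, preferring the newest timestamp among sub-scanners positioned on the same row. The selection step, reduced to primitives (illustrative, not the real HStoreKey types):

```java
/** Selection step of HScanner.next(), reduced to primitives. */
class ScannerMergeSketch {
  /** @return index of the sub-scanner to service next, or -1 if all done. */
  static int choose(String[] rows, long[] timestamps, boolean[] live) {
    int chosen = -1;
    for (int i = 0; i < rows.length; i++) {
      if (!live[i]) {
        continue;                                 // sub-scanner exhausted
      }
      if (chosen < 0
          || rows[i].compareTo(rows[chosen]) < 0  // earlier row wins
          || (rows[i].equals(rows[chosen])
              && timestamps[i] > timestamps[chosen])) { // newer version wins
        chosen = i;
      }
    }
    return chosen;
  }
}
```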
+ * + * @param meta META HRegion to be updated + * @param r HRegion to add to meta + * + * @throws IOException + */ + public static void addRegionToMETA(HRegion meta, HRegion r) + throws IOException { + // The row key is the region name + long writeid = meta.startUpdate(r.getRegionName()); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream s = new DataOutputStream(bytes); + r.getRegionInfo().write(s); + meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray())); + meta.commit(writeid); + } + + public static void addRegionToMETA(final HClient client, + final Text table, final HRegion region, + final HServerAddress serverAddress, + final long startCode) + throws IOException { + client.openTable(table); + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bytes); + region.getRegionInfo().write(out); + long lockid = client.startUpdate(region.getRegionName()); + client.put(lockid, COL_REGIONINFO, bytes.toByteArray()); + client.put(lockid, COL_SERVER, + serverAddress.toString().getBytes(UTF8_ENCODING)); + client.put(lockid, COL_STARTCODE, + String.valueOf(startCode).getBytes(UTF8_ENCODING)); + client.commit(lockid); + LOG.info("Added region " + region.getRegionName() + " to table " + table); + } + + /** + * Delete region from META table. + * @param client Client to use running update. + * @param table META table we are to delete region from. + * @param regionName Region to remove. + * @throws IOException + */ + public static void removeRegionFromMETA(final HClient client, + final Text table, final Text regionName) + throws IOException { + client.openTable(table); + long lockid = client.startUpdate(regionName); + client.delete(lockid, COL_REGIONINFO); + client.delete(lockid, COL_SERVER); + client.delete(lockid, COL_STARTCODE); + client.commit(lockid); + LOG.info("Removed " + regionName + " from table " + table); + } + + /** + * @param data Map of META row labelled column data. + * @return Server + */ + static HRegionInfo getRegionInfo(final TreeMap data) + throws IOException { + byte[] bytes = data.get(COL_REGIONINFO); + if (bytes == null || bytes.length == 0) { + throw new IOException("no value for " + COL_REGIONINFO); + } + DataInputBuffer in = new DataInputBuffer(); + in.reset(bytes, bytes.length); + HRegionInfo info = new HRegionInfo(); + info.readFields(in); + return info; + } + + /** + * @param data Map of META row labelled column data. + * @return Server + */ + static String getServerName(final TreeMap data) { + byte [] bytes = data.get(COL_SERVER); + String name = null; + try { + name = (bytes != null && bytes.length != 0) ? + new String(bytes, UTF8_ENCODING): null; + + } catch(UnsupportedEncodingException e) { + assert(false); + } + return (name != null)? name.trim(): name; + } + + /** + * @param data Map of META row labelled column data. + * @return Start code. 
+ */ + static long getStartCode(final TreeMap data) { + long startCode = -1L; + byte [] bytes = data.get(COL_STARTCODE); + if(bytes != null && bytes.length != 0) { + try { + startCode = Long.parseLong(new String(bytes, UTF8_ENCODING).trim()); + } catch(NumberFormatException e) { + LOG.error("Failed getting " + COL_STARTCODE, e); + } catch(UnsupportedEncodingException e) { + LOG.error("Failed getting " + COL_STARTCODE, e); + } + } + return startCode; + } } diff --git a/src/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/java/org/apache/hadoop/hbase/HRegionInfo.java index 1ff869b081b..aa620629ab7 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -20,9 +20,11 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparable; /** * HRegion information. @@ -124,13 +126,27 @@ public class HRegionInfo implements WritableComparable { this.regionName.readFields(in); this.offLine = in.readBoolean(); } - + ////////////////////////////////////////////////////////////////////////////// // Comparable ////////////////////////////////////////////////////////////////////////////// public int compareTo(Object o) { - HRegionInfo other = (HRegionInfo)o; - return regionName.compareTo(other.regionName); + HRegionInfo other = (HRegionInfo) o; + + // Are regions of same table? + int result = this.tableDesc.compareTo(other.tableDesc); + if (result != 0) { + return result; + } + + // Compare start keys. + result = this.startKey.compareTo(other.startKey); + if (result != 0) { + return result; + } + + // Compare end keys. + return this.endKey.compareTo(other.endKey); } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 540bee40a64..74e16b81697 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -15,17 +15,28 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.ipc.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.conf.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; /******************************************************************************* * HRegionServer makes a set of HRegions available to clients. 
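[Editor's note] Before moving on to HRegionServer: the new HRegionInfo.compareTo() above orders regions by table, then start key, then end key. The same three-way comparison over plain strings (RegionOrderSketch is illustrative):

```java
/** Mirror of the new HRegionInfo ordering, over plain strings. */
class RegionOrderSketch {
  /** a and b are {table, startKey, endKey} triples. */
  static int compare(String[] a, String[] b) {
    int result = a[0].compareTo(b[0]);   // are regions of the same table?
    if (result != 0) { return result; }
    result = a[1].compareTo(b[1]);       // compare start keys
    if (result != 0) { return result; }
    return a[2].compareTo(b[2]);         // compare end keys
  }
}
```

This keeps sorted sets of HRegionInfo, like the servedRegions TreeSets earlier in the patch, grouped by table and ordered by key range.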
It checks in with @@ -34,196 +45,194 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class HRegionServer implements HConstants, HRegionInterface, Runnable { - public long getProtocolVersion(String protocol, - long clientVersion) throws IOException { + public long getProtocolVersion(final String protocol, + @SuppressWarnings("unused") final long clientVersion) + throws IOException { if (protocol.equals(HRegionInterface.class.getName())) { return HRegionInterface.versionID; - } else { - throw new IOException("Unknown protocol to name node: " + protocol); } + throw new IOException("Unknown protocol to name node: " + protocol); } - private static final Log LOG = LogFactory.getLog(HRegionServer.class); + static final Log LOG = LogFactory.getLog(HRegionServer.class); - private volatile boolean stopRequested; + volatile boolean stopRequested; private Path regionDir; - private HServerInfo info; - private Configuration conf; + HServerInfo info; + Configuration conf; private Random rand; - private TreeMap regions; // region name -> HRegion - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + // region name -> HRegion + TreeMap onlineRegions = new TreeMap(); + Map retiringRegions = new HashMap(); + + final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private Vector outboundMsgs; - private long threadWakeFrequency; - private int maxLogEntries; + long threadWakeFrequency; private long msgInterval; - private int numRetries; // Check to see if regions should be split - - private long splitOrCompactCheckFrequency; + long splitOrCompactCheckFrequency; private SplitOrCompactChecker splitOrCompactChecker; private Thread splitOrCompactCheckerThread; - private Integer splitOrCompactLock = 0; + Integer splitOrCompactLock = Integer.valueOf(0); - private class SplitOrCompactChecker implements Runnable, RegionUnavailableListener { - private HClient client = new HClient(conf); - - /* (non-Javadoc) - * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text) + /* + * Interface used by the {@link org.apache.hadoop.io.retry} mechanism. + */ + interface UpdateMetaInterface { + /* + * @return True if succeeded. + * @throws IOException */ - public void regionIsUnavailable(Text regionName) { + boolean update() throws IOException; + } + + class SplitOrCompactChecker implements Runnable, RegionUnavailableListener { + HClient client = new HClient(conf); + + public void closing(final Text regionName) { lock.writeLock().lock(); try { - regions.remove(regionName); + // Remove region from regions Map and add it to the Map of retiring + // regions. + retiringRegions.put(regionName, onlineRegions.remove(regionName)); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + regionName + " to retiringRegions"); + } + } finally { + lock.writeLock().unlock(); + } + } + + public void closed(final Text regionName) { + lock.writeLock().lock(); + try { + retiringRegions.remove(regionName); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing " + regionName + " from retiringRegions"); + } } finally { lock.writeLock().unlock(); } } - /* (non-Javadoc) - * @see java.lang.Runnable#run() - */ public void run() { while(! 
stopRequested) { long startTime = System.currentTimeMillis(); - synchronized(splitOrCompactLock) { // Don't interrupt us while we're working - // Grab a list of regions to check - Vector regionsToCheck = new Vector(); lock.readLock().lock(); try { - regionsToCheck.addAll(regions.values()); + regionsToCheck.addAll(onlineRegions.values()); } finally { lock.readLock().unlock(); } try { - for(Iteratorit = regionsToCheck.iterator(); it.hasNext(); ) { - HRegion cur = it.next(); - + for(HRegion cur: regionsToCheck) { if(cur.isClosed()) { continue; // Skip if closed } if(cur.needsCompaction()) { - - // The best time to split a region is right after it has been compacted - + // Best time to split a region is right after compaction if(cur.compactStores()) { Text midKey = new Text(); if(cur.needsSplit(midKey)) { - Text oldRegion = cur.getRegionName(); - - LOG.info("splitting region: " + oldRegion); - - HRegion[] newRegions = cur.closeAndSplit(midKey, this); - - // When a region is split, the META table needs to updated if we're - // splitting a 'normal' region, and the ROOT table needs to be - // updated if we are splitting a META region. - - if(LOG.isDebugEnabled()) { - LOG.debug("region split complete. updating meta"); - } - - Text tableToUpdate = - (oldRegion.find(META_TABLE_NAME.toString()) == 0) ? - ROOT_TABLE_NAME : META_TABLE_NAME; - - for(int tries = 0; tries < numRetries; tries++) { - try { - client.openTable(tableToUpdate); - long lockid = client.startUpdate(oldRegion); - client.delete(lockid, COL_REGIONINFO); - client.delete(lockid, COL_SERVER); - client.delete(lockid, COL_STARTCODE); - client.commit(lockid); - - for(int i = 0; i < newRegions.length; i++) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bytes); - newRegions[i].getRegionInfo().write(out); - - lockid = client.startUpdate(newRegions[i].getRegionName()); - client.put(lockid, COL_REGIONINFO, bytes.toByteArray()); - client.put(lockid, COL_SERVER, - info.getServerAddress().toString().getBytes(UTF8_ENCODING)); - client.put(lockid, COL_STARTCODE, - String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING)); - client.commit(lockid); - } - - // Now tell the master about the new regions - - if(LOG.isDebugEnabled()) { - LOG.debug("reporting region split to master"); - } - - reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); - - LOG.info("region split successful. old region=" + oldRegion - + ", new regions: " + newRegions[0].getRegionName() + ", " - + newRegions[1].getRegionName()); - - // Finally, start serving the new regions - - lock.writeLock().lock(); - try { - regions.put(newRegions[0].getRegionName(), newRegions[0]); - regions.put(newRegions[1].getRegionName(), newRegions[1]); - } finally { - lock.writeLock().unlock(); - } - - } catch(NotServingRegionException e) { - if(tries == numRetries - 1) { - throw e; - } - continue; - } - break; - } + split(cur, midKey); } } } } } catch(IOException e) { //TODO: What happens if this fails? Are we toast? - LOG.error(e); + LOG.error("What happens if this fails? Are we toast?", e); } } + + if (stopRequested) { + continue; + } // Sleep - long waitTime = stopRequested ? 
0 - : splitOrCompactCheckFrequency - (System.currentTimeMillis() - startTime); + long waitTime = splitOrCompactCheckFrequency - + (System.currentTimeMillis() - startTime); if (waitTime > 0) { try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep splitOrCompactChecker"); - } Thread.sleep(waitTime); - if (LOG.isDebugEnabled()) { - LOG.debug("Wake splitOrCompactChecker"); - } } catch(InterruptedException iex) { + // continue } } } + LOG.info("splitOrCompactChecker exiting"); + } + + private void split(final HRegion region, final Text midKey) + throws IOException { + final Text oldRegion = region.getRegionName(); + final HRegion[] newRegions = region.closeAndSplit(midKey, this); + + // When a region is split, the META table needs to updated if we're + // splitting a 'normal' region, and the ROOT table needs to be + // updated if we are splitting a META region. + final Text tableToUpdate = + (oldRegion.find(META_TABLE_NAME.toString()) == 0) ? + ROOT_TABLE_NAME : META_TABLE_NAME; if(LOG.isDebugEnabled()) { - LOG.debug("splitOrCompactChecker exiting"); + LOG.debug("Updating " + tableToUpdate + " with region split info"); } + + // Wrap the update of META region with an org.apache.hadoop.io.retry. + UpdateMetaInterface implementation = new UpdateMetaInterface() { + public boolean update() throws IOException { + HRegion.removeRegionFromMETA(client, tableToUpdate, + region.getRegionName()); + for (int i = 0; i < newRegions.length; i++) { + HRegion.addRegionToMETA(client, tableToUpdate, newRegions[i], + info.getServerAddress(), info.getStartCode()); + } + + // Now tell the master about the new regions + if (LOG.isDebugEnabled()) { + LOG.debug("Reporting region split to master"); + } + reportSplit(newRegions[0].getRegionInfo(), newRegions[1]. + getRegionInfo()); + LOG.info("region split, META update, and report to master all" + + " successful. Old region=" + oldRegion + ", new regions: " + + newRegions[0].getRegionName() + ", " + + newRegions[1].getRegionName()); + + // Finally, start serving the new regions + lock.writeLock().lock(); + try { + onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]); + onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]); + } finally { + lock.writeLock().unlock(); + } + return true; + } + }; + + // Get retry proxy wrapper around 'implementation'. + UpdateMetaInterface retryProxy = (UpdateMetaInterface)RetryProxy. + create(UpdateMetaInterface.class, implementation, + client.getRetryPolicy()); + // Run retry. + retryProxy.update(); } } - - // Cache flushing - + + // Cache flushing private Flusher cacheFlusher; private Thread cacheFlusherThread; - private Integer cacheFlusherLock = 0; - private class Flusher implements Runnable { + Integer cacheFlusherLock = Integer.valueOf(0); + class Flusher implements Runnable { public void run() { while(! 
stopRequested) { long startTime = System.currentTimeMillis(); @@ -235,23 +244,20 @@ public class HRegionServer Vector toFlush = new Vector(); lock.readLock().lock(); try { - toFlush.addAll(regions.values()); + toFlush.addAll(onlineRegions.values()); } finally { lock.readLock().unlock(); } // Flush them, if necessary - for(Iterator it = toFlush.iterator(); it.hasNext(); ) { - HRegion cur = it.next(); - + for(HRegion cur: toFlush) { if(cur.isClosed()) { // Skip if closed continue; } try { cur.optionallyFlush(); - } catch(IOException iex) { LOG.error(iex); } @@ -263,21 +269,14 @@ public class HRegionServer : threadWakeFrequency - (System.currentTimeMillis() - startTime); if(waitTime > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep cacheFlusher"); - } try { Thread.sleep(waitTime); } catch(InterruptedException iex) { - } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake cacheFlusher"); + // continue } } } - if(LOG.isDebugEnabled()) { - LOG.debug("cacheFlusher exiting"); - } + LOG.info("cacheFlusher exiting"); } } @@ -287,45 +286,45 @@ public class HRegionServer private Path oldlogfile; // Logging - - private HLog log; + HLog log; private LogRoller logRoller; private Thread logRollerThread; - private Integer logRollerLock = 0; - private class LogRoller implements Runnable { + Integer logRollerLock = Integer.valueOf(0); + + /** + * Log rolling Runnable. + */ + class LogRoller implements Runnable { + private int maxLogEntries = + conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); + public void run() { while(! stopRequested) { synchronized(logRollerLock) { // If the number of log entries is high enough, roll the log. This is a // very fast operation, but should not be done too frequently. int nEntries = log.getNumEntries(); - if(nEntries > maxLogEntries) { + if(nEntries > this.maxLogEntries) { try { if (LOG.isDebugEnabled()) { LOG.debug("Rolling log. Number of entries is: " + nEntries); } log.rollWriter(); } catch(IOException iex) { + // continue } } } if(!stopRequested) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep logRoller"); - } try { Thread.sleep(threadWakeFrequency); } catch(InterruptedException iex) { - } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake logRoller"); + // continue } } } - if(LOG.isDebugEnabled()) { - LOG.debug("logRoller exiting"); - } + LOG.info("logRoller exiting"); } } @@ -338,7 +337,6 @@ public class HRegionServer private Server server; // Leases - private Leases leases; /** Start a HRegionServer at the default location */ @@ -357,18 +355,17 @@ public class HRegionServer this.regionDir = regionDir; this.conf = conf; this.rand = new Random(); - this.regions = new TreeMap(); this.outboundMsgs = new Vector(); this.scanners = Collections.synchronizedMap(new TreeMap()); // Config'ed params this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); - this.maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000); this.msgInterval = conf.getLong("hbase.regionserver.msginterval", - 15 * 1000); + 15 * 1000); this.splitOrCompactCheckFrequency = - conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 60 * 1000); + conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", + 60 * 1000); // Cache flushing this.cacheFlusher = new Flusher(); @@ -448,7 +445,7 @@ public class HRegionServer * Set a flag that will cause all the HRegionServer threads to shut down * in an orderly fashion. 
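[Editor's note] Stepping back to the split() method above: the META update there is wrapped with Hadoop's org.apache.hadoop.io.retry proxy rather than a hand-rolled retry loop. The pattern in isolation; the fixed-sleep policy is illustrative, where the real code reuses HClient's policy:

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;

/** Pattern used by split(): hide retries behind a dynamic proxy. */
class RetrySketch {
  interface UpdateMetaInterface {
    boolean update() throws IOException;
  }

  static boolean updateMeta() throws IOException {
    UpdateMetaInterface implementation = new UpdateMetaInterface() {
      public boolean update() throws IOException {
        // ... remove the old region row, add the two daughter rows ...
        return true;
      }
    };
    RetryPolicy policy = RetryPolicies
      .retryUpToMaximumCountWithFixedSleep(5, 30, TimeUnit.SECONDS);
    UpdateMetaInterface proxy = (UpdateMetaInterface) RetryProxy
      .create(UpdateMetaInterface.class, implementation, policy);
    // The proxy re-invokes update() per the policy when it throws.
    return proxy.update();
  }
}
```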
*/ - public synchronized void stop() throws IOException { + public synchronized void stop() { stopRequested = true; notifyAll(); // Wakes run() if it is sleeping } @@ -460,24 +457,30 @@ public class HRegionServer try { this.workerThread.join(); } catch(InterruptedException iex) { + // continue } try { this.logRollerThread.join(); } catch(InterruptedException iex) { + // continue } try { this.cacheFlusherThread.join(); } catch(InterruptedException iex) { + // continue } try { this.splitOrCompactCheckerThread.join(); } catch(InterruptedException iex) { + // continue } try { this.server.join(); } catch(InterruptedException iex) { + // continue } - LOG.info("HRegionServer stopped at: " + info.getServerAddress().toString()); + LOG.info("HRegionServer stopped at: " + + info.getServerAddress().toString()); } /** @@ -506,9 +509,6 @@ public class HRegionServer : msgInterval - (System.currentTimeMillis() - lastMsg); if(waitTime > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep"); - } synchronized (this) { try { wait(waitTime); @@ -586,9 +586,6 @@ public class HRegionServer waitTime = stopRequested ? 0 : msgInterval - (System.currentTimeMillis() - lastMsg); if (waitTime > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleep"); - } synchronized (this) { try { wait(waitTime); @@ -596,9 +593,6 @@ public class HRegionServer // On interrupt we go around to the while test of stopRequested } } - if (LOG.isDebugEnabled()) { - LOG.debug("Wake"); - } } } } @@ -666,7 +660,7 @@ public class HRegionServer * updated the meta or root regions, and the master will pick that up on its * next rescan of the root or meta tables. */ - private void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) { + void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) { synchronized(outboundMsgs) { outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionA)); outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionB)); @@ -677,10 +671,10 @@ public class HRegionServer // HMaster-given operations ////////////////////////////////////////////////////////////////////////////// - private Vector toDo; + Vector toDo; private Worker worker; private Thread workerThread; - private class Worker implements Runnable { + class Worker implements Runnable { public void stop() { synchronized(toDo) { toDo.notifyAll(); @@ -700,6 +694,7 @@ public class HRegionServer LOG.debug("Wake on todo"); } } catch(InterruptedException e) { + // continue } } if(stopRequested) { @@ -761,38 +756,34 @@ public class HRegionServer LOG.error(e); } } - if(LOG.isDebugEnabled()) { - LOG.debug("worker thread exiting"); - } + LOG.info("worker thread exiting"); } } - private void openRegion(HRegionInfo regionInfo) throws IOException { + void openRegion(HRegionInfo regionInfo) throws IOException { this.lock.writeLock().lock(); try { HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile); - regions.put(region.getRegionName(), region); + this.onlineRegions.put(region.getRegionName(), region); reportOpen(region); } finally { this.lock.writeLock().unlock(); } } - private void closeRegion(HRegionInfo info, boolean reportWhenCompleted) - throws IOException { - + void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) + throws IOException { this.lock.writeLock().lock(); HRegion region = null; try { - region = regions.remove(info.regionName); + region = onlineRegions.remove(hri.regionName); } finally { this.lock.writeLock().unlock(); } if(region != null) { region.close(); - if(reportWhenCompleted) { 
reportClose(region); } @@ -800,12 +791,12 @@ public class HRegionServer } /** Called either when the master tells us to restart or from stop() */ - private void closeAllRegions() { + void closeAllRegions() { Vector regionsToClose = new Vector(); this.lock.writeLock().lock(); try { - regionsToClose.addAll(regions.values()); - regions.clear(); + regionsToClose.addAll(onlineRegions.values()); + onlineRegions.clear(); } finally { this.lock.writeLock().unlock(); } @@ -817,7 +808,6 @@ public class HRegionServer try { region.close(); LOG.debug("region closed " + region.getRegionName()); - } catch(IOException e) { LOG.error("error closing region " + region.getRegionName(), e); } @@ -829,55 +819,34 @@ public class HRegionServer ////////////////////////////////////////////////////////////////////////////// /** Obtain a table descriptor for the given region */ - public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException { - HRegion region = getRegion(regionName); - return region.getRegionInfo(); + public HRegionInfo getRegionInfo(Text regionName) + throws NotServingRegionException { + return getRegion(regionName).getRegionInfo(); } /** Get the indicated row/column */ - public BytesWritable get(Text regionName, Text row, Text column) throws IOException { - HRegion region = getRegion(regionName); - - if (LOG.isDebugEnabled()) { - LOG.debug("get " + row.toString() + ", " + column.toString()); - } - BytesWritable results = region.get(row, column); - if(results != null) { - return results; - } - return null; + public BytesWritable get(Text regionName, Text row, Text column) + throws IOException { + return getRegion(regionName).get(row, column); } /** Get multiple versions of the indicated row/col */ public BytesWritable[] get(Text regionName, Text row, Text column, - int numVersions) throws IOException { - - HRegion region = getRegion(regionName); - - BytesWritable[] results = region.get(row, column, numVersions); - if(results != null) { - return results; - } - return null; + int numVersions) + throws IOException { + return getRegion(regionName).get(row, column, numVersions); } /** Get multiple timestamped versions of the indicated row/col */ public BytesWritable[] get(Text regionName, Text row, Text column, - long timestamp, int numVersions) throws IOException { - - HRegion region = getRegion(regionName); - - BytesWritable[] results = region.get(row, column, timestamp, numVersions); - if(results != null) { - return results; - } - return null; + long timestamp, int numVersions) + throws IOException { + return getRegion(regionName).get(row, column, timestamp, numVersions); } /** Get all the columns (along with their names) for a given row. */ public LabelledData[] getRow(Text regionName, Text row) throws IOException { HRegion region = getRegion(regionName); - TreeMap map = region.getFull(row); LabelledData result[] = new LabelledData[map.size()]; int counter = 0; @@ -910,13 +879,44 @@ public class HRegionServer } } + public LabelledData[] next(final long scannerId, final HStoreKey key) + throws IOException { + Text scannerName = new Text(String.valueOf(scannerId)); + HInternalScannerInterface s = scanners.get(scannerName); + if (s == null) { + throw new UnknownScannerException("Name: " + scannerName + ", key " + + key); + } + leases.renewLease(scannerName, scannerName); + TreeMap results = new TreeMap(); + ArrayList values = new ArrayList(); + // Keep getting rows till we find one that has at least one non-deleted + // column value. 
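[Editor's note] The comment above is the heart of the rewritten next(): cells whose value is the delete sentinel are filtered server-side, and a row yielding nothing sends the loop back for the next row. The filtering in isolation (the sentinel bytes here are a stand-in for HConstants.DELETE_BYTES):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DeleteFilterSketch {
  // Stand-in for HConstants.DELETE_BYTES; the actual sentinel differs.
  static final byte[] DELETE_BYTES = "DELETED".getBytes();

  /** Drop deleted cells; an empty result tells the caller to advance
   *  to the next row, exactly as the loop above keeps calling next(). */
  static List<String> filter(TreeMap<String, byte[]> row) {
    List<String> values = new ArrayList<String>();
    for (Map.Entry<String, byte[]> e : row.entrySet()) {
      if (Arrays.equals(e.getValue(), DELETE_BYTES)) {
        continue;            // cell was deleted; don't return it
      }
      values.add(e.getKey());
    }
    return values;
  }
}
```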
+    while (s.next(key, results)) {
+      for(Map.Entry e: results.entrySet()) {
+        BytesWritable val = e.getValue();
+        if(val.getSize() == DELETE_BYTES.getSize()
+            && val.compareTo(DELETE_BYTES) == 0) {
+          // Column value is deleted. Don't return it.
+          continue;
+        }
+        values.add(new LabelledData(e.getKey(), val));
+      }
+      if (values.size() > 0) {
+        // Row has something in it. Let it out. Else go get another row.
+        break;
+      }
+      // Need to clear results before we go back up and call 'next' again.
+      results.clear();
+    }
+    return values.toArray(new LabelledData[values.size()]);
+  }
+
   public long startUpdate(Text regionName, long clientid, Text row) throws IOException {
     HRegion region = getRegion(regionName);
     long lockid = region.startUpdate(row);
-    leases.createLease(new Text(String.valueOf(clientid)),
+    this.leases.createLease(new Text(String.valueOf(clientid)),
       new Text(String.valueOf(lockid)),
       new RegionListener(region, lockid));
@@ -926,48 +926,36 @@ public class HRegionServer
   /** Add something to the HBase. */
   public void put(Text regionName, long clientid, long lockid, Text column,
       BytesWritable val) throws IOException {
-    HRegion region = getRegion(regionName);
+    HRegion region = getRegion(regionName, true);
     leases.renewLease(new Text(String.valueOf(clientid)),
       new Text(String.valueOf(lockid)));
     region.put(lockid, column, val);
   }

   /** Remove a cell from the HBase. */
   public void delete(Text regionName, long clientid, long lockid, Text column)
   throws IOException {
     HRegion region = getRegion(regionName);
     leases.renewLease(new Text(String.valueOf(clientid)),
       new Text(String.valueOf(lockid)));
     region.delete(lockid, column);
   }

   /** Abandon the transaction */
   public void abort(Text regionName, long clientid, long lockid)
   throws IOException {
-    HRegion region = getRegion(regionName);
+    HRegion region = getRegion(regionName, true);
     leases.cancelLease(new Text(String.valueOf(clientid)),
       new Text(String.valueOf(lockid)));
     region.abort(lockid);
   }

   /** Confirm the transaction */
   public void commit(Text regionName, long clientid, long lockid)
   throws IOException {
-    HRegion region = getRegion(regionName);
+    HRegion region = getRegion(regionName, true);
     leases.cancelLease(new Text(String.valueOf(clientid)),
       new Text(String.valueOf(lockid)));
     region.commit(lockid);
   }

@@ -977,27 +965,55 @@ public class HRegionServer
       new Text(String.valueOf(lockid)));
   }

-  /** Private utility method for safely obtaining an HRegion handle. */
-  private HRegion getRegion(Text regionName) throws NotServingRegionException {
-    this.lock.readLock().lock();
+  /** Private utility method for safely obtaining an HRegion handle.
+   * @param regionName Name of online {@link HRegion} to return
+   * @return {@link HRegion} for regionName
+   * @throws NotServingRegionException
+   */
+  private HRegion getRegion(final Text regionName)
+  throws NotServingRegionException {
+    return getRegion(regionName, false);
+  }
+
+  /** Private utility method for safely obtaining an HRegion handle.
+   * @param regionName Name of online {@link HRegion} to return
+   * @param checkRetiringRegions Set true if we're to check retiring regions
+   * as well as online regions.
+ * @return {@link HRegion} for regionName + * @throws NotServingRegionException + */ + private HRegion getRegion(final Text regionName, + final boolean checkRetiringRegions) + throws NotServingRegionException { HRegion region = null; + this.lock.readLock().lock(); try { - region = regions.get(regionName); + region = onlineRegions.get(regionName); + if (region == null && checkRetiringRegions) { + region = this.retiringRegions.get(regionName); + if (LOG.isDebugEnabled()) { + if (region != null) { + LOG.debug("Found region " + regionName + " in retiringRegions"); + } + } + } + + if (region == null) { + throw new NotServingRegionException(regionName.toString()); + } + + return region; } finally { this.lock.readLock().unlock(); } - - if(region == null) { - throw new NotServingRegionException(regionName.toString()); - } - return region; } ////////////////////////////////////////////////////////////////////////////// // remote scanner interface ////////////////////////////////////////////////////////////////////////////// - private Map scanners; + Map scanners; + private class ScannerListener extends LeaseListener { private Text scannerName; @@ -1006,6 +1022,7 @@ public class HRegionServer } public void leaseExpired() { + LOG.info("Scanner " + scannerName + " lease expired"); HInternalScannerInterface s = null; synchronized(scanners) { s = scanners.remove(scannerName); @@ -1018,8 +1035,7 @@ public class HRegionServer /** Start a scanner for a given HRegion. */ public long openScanner(Text regionName, Text[] cols, Text firstRow) - throws IOException { - + throws IOException { HRegion r = getRegion(regionName); long scannerId = -1L; try { @@ -1029,8 +1045,8 @@ public class HRegionServer synchronized(scanners) { scanners.put(scannerName, s); } - leases.createLease(scannerName, scannerName, new ScannerListener(scannerName)); - + leases.createLease(scannerName, scannerName, + new ScannerListener(scannerName)); } catch(IOException e) { LOG.error(e); throw e; @@ -1038,38 +1054,6 @@ public class HRegionServer return scannerId; } - public LabelledData[] next(long scannerId, HStoreKey key) throws IOException { - - Text scannerName = new Text(String.valueOf(scannerId)); - HInternalScannerInterface s = scanners.get(scannerName); - if(s == null) { - throw new IOException("unknown scanner"); - } - leases.renewLease(scannerName, scannerName); - TreeMap results = new TreeMap(); - ArrayList values = new ArrayList(); - if(s.next(key, results)) { - for(Iterator> it - = results.entrySet().iterator(); - it.hasNext(); ) { - - Map.Entry e = it.next(); - BytesWritable val = e.getValue(); - if(val.getSize() == DELETE_BYTES.getSize() - && val.compareTo(DELETE_BYTES) == 0) { - - // Value is deleted. 
Don't return a value - - continue; - - } else { - values.add(new LabelledData(e.getKey(), val)); - } - } - } - return values.toArray(new LabelledData[values.size()]); - } - public void close(long scannerId) throws IOException { Text scannerName = new Text(String.valueOf(scannerId)); HInternalScannerInterface s = null; @@ -1077,7 +1061,7 @@ public class HRegionServer s = scanners.remove(scannerName); } if(s == null) { - throw new IOException("unknown scanner"); + throw new UnknownScannerException(scannerName.toString()); } s.close(); leases.cancelLease(scannerName, scannerName); @@ -1096,7 +1080,7 @@ public class HRegionServer System.exit(0); } - public static void main(String [] args) throws IOException { + public static void main(String [] args) { if (args.length < 1) { printUsageAndExit(); } diff --git a/src/java/org/apache/hadoop/hbase/HStore.java b/src/java/org/apache/hadoop/hbase/HStore.java index 4025e7f77f2..452da97372c 100644 --- a/src/java/org/apache/hadoop/hbase/HStore.java +++ b/src/java/org/apache/hadoop/hbase/HStore.java @@ -369,8 +369,8 @@ public class HStore { * Compact the back-HStores. This method may take some time, so the calling * thread must be able to block for long periods. * - * During this time, the HStore can work as usual, getting values from MapFiles - * and writing new MapFiles from given memcaches. + * During this time, the HStore can work as usual, getting values from + * MapFiles and writing new MapFiles from given memcaches. * * Existing MapFiles are not destroyed until the new compacted TreeMap is * completely written-out to disk. @@ -410,8 +410,7 @@ public class HStore { // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps long maxSeenSeqID = -1; - for(Iterator it = toCompactFiles.iterator(); it.hasNext(); ) { - HStoreFile hsf = it.next(); + for (HStoreFile hsf: toCompactFiles) { long seqid = hsf.loadInfo(fs); if(seqid > 0) { if(seqid > maxSeenSeqID) { @@ -587,7 +586,6 @@ public class HStore { HStoreFile hsf = it.next(); hsf.write(out); } - } finally { out.close(); } @@ -595,12 +593,7 @@ public class HStore { // Indicate that we're done. Path doneFile = new Path(curCompactStore, COMPACTION_DONE); - out = new DataOutputStream(fs.create(doneFile)); - - try { - } finally { - out.close(); - } + (new DataOutputStream(fs.create(doneFile))).close(); // Move the compaction into place. diff --git a/src/java/org/apache/hadoop/hbase/Leases.java b/src/java/org/apache/hadoop/hbase/Leases.java index 7a6ebc4a56c..2a5b222c0d1 100644 --- a/src/java/org/apache/hadoop/hbase/Leases.java +++ b/src/java/org/apache/hadoop/hbase/Leases.java @@ -91,7 +91,9 @@ public class Leases { Lease lease = new Lease(holderId, resourceId, listener); Text leaseId = lease.getLeaseId(); if(leases.get(leaseId) != null) { - throw new IOException("Impossible state for createLease(): Lease for holderId " + holderId + " and resourceId " + resourceId + " is still held."); + throw new IOException("Impossible state for createLease(): Lease " + + "for holderId " + holderId + " and resourceId " + resourceId + + " is still held."); } leases.put(leaseId, lease); sortedLeases.add(lease); @@ -106,11 +108,10 @@ public class Leases { Text leaseId = createLeaseId(holderId, resourceId); Lease lease = leases.get(leaseId); if(lease == null) { - // It's possible that someone tries to renew the lease, but // it just expired a moment ago. So fail. 
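// As the comment above notes, a renew can race with expiry: once the lease
// is gone, renewLease must fail rather than silently recreate it. A compact
// sketch of that rule with hypothetical types; the real class also keeps a
// set sorted by expiry so a monitor thread can poll the oldest lease
// cheaply:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

class LeaseTable {
  static class Lease implements Comparable<Lease> {
    final String id;
    long expiry;
    Lease(final String id, final long expiry) {
      this.id = id;
      this.expiry = expiry;
    }
    public int compareTo(final Lease other) {
      return this.expiry < other.expiry ? -1
        : this.expiry > other.expiry ? 1
        : this.id.compareTo(other.id);
    }
  }

  private final long period;
  private final Map<String, Lease> leases = new HashMap<String, Lease>();
  private final SortedSet<Lease> sortedLeases = new TreeSet<Lease>();

  LeaseTable(final long period) {
    this.period = period;
  }

  synchronized void create(final String holderId, final String resourceId) {
    Lease lease = new Lease(holderId + "/" + resourceId,
      System.currentTimeMillis() + period);
    leases.put(lease.id, lease);
    sortedLeases.add(lease);
  }

  synchronized void renew(final String holderId, final String resourceId)
  throws IOException {
    Lease lease = leases.get(holderId + "/" + resourceId);
    if (lease == null) {
      // Expired a moment ago, or never held: the caller must fail.
      throw new IOException("Cannot renew lease; not held (holderId="
        + holderId + ", resourceId=" + resourceId + ")");
    }
    // Remove, bump the expiry, re-add, so the sorted set stays correct.
    sortedLeases.remove(lease);
    lease.expiry = System.currentTimeMillis() + period;
    sortedLeases.add(lease);
  }
}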
- - throw new IOException("Cannot renew lease is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")"); + throw new IOException("Cannot renew lease; not held (holderId=" + + holderId + ", resourceId=" + resourceId + ")"); } sortedLeases.remove(lease); diff --git a/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java b/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java new file mode 100644 index 00000000000..b2b240fc0b7 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/RegionNotFoundException.java @@ -0,0 +1,30 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class RegionNotFoundException extends IOException { + private static final long serialVersionUID = 993179627856392526L; + + public RegionNotFoundException() { + super(); + } + + public RegionNotFoundException(String s) { + super(s); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java b/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java index e34abe9eae9..3d2b1582c53 100644 --- a/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java +++ b/src/java/org/apache/hadoop/hbase/RegionUnavailableListener.java @@ -19,9 +19,22 @@ import org.apache.hadoop.io.Text; /** * Used as a callback mechanism so that an HRegion can notify the HRegionServer - * when a region is about to be closed during a split operation. This is done - * to minimize the amount of time the region is off-line. + * of the different stages making an HRegion unavailable. Regions are made + * unavailable during region split operations. */ public interface RegionUnavailableListener { - public void regionIsUnavailable(Text regionName); + /** + * regionName is closing. + * Listener should stop accepting new writes but can continue to service + * outstanding transactions. + * @param regionName + */ + public void closing(final Text regionName); + + /** + * regionName is closed and no longer available. + * Listener should clean up any references to regionName + * @param regionName + */ + public void closed(final Text regionName); } diff --git a/src/java/org/apache/hadoop/hbase/UnknownScannerException.java b/src/java/org/apache/hadoop/hbase/UnknownScannerException.java new file mode 100644 index 00000000000..ab898406af7 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/UnknownScannerException.java @@ -0,0 +1,30 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class UnknownScannerException extends IOException { + private static final long serialVersionUID = 993179627856392526L; + + public UnknownScannerException() { + super(); + } + + public UnknownScannerException(String s) { + super(s); + } +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/WrongRegionException.java b/src/java/org/apache/hadoop/hbase/WrongRegionException.java new file mode 100644 index 00000000000..4b23bfdcc86 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/WrongRegionException.java @@ -0,0 +1,30 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class WrongRegionException extends IOException { + private static final long serialVersionUID = 993179627856392526L; + + public WrongRegionException() { + super(); + } + + public WrongRegionException(String s) { + super(s); + } +} \ No newline at end of file diff --git a/src/test/org/apache/hadoop/hbase/EvaluationClient.java b/src/test/org/apache/hadoop/hbase/EvaluationClient.java index 61e94679efa..37277d35e9c 100644 --- a/src/test/org/apache/hadoop/hbase/EvaluationClient.java +++ b/src/test/org/apache/hadoop/hbase/EvaluationClient.java @@ -42,7 +42,7 @@ public class EvaluationClient implements HConstants { private static final int ROW_LENGTH = 1024; - private static final int ONE_HUNDRED_MB = 1024 * 1024 * 1 /*100 RESTORE*/; + private static final int ONE_HUNDRED_MB = 1024 * 1024 * 100; private static final int ROWS_PER_100_MB = ONE_HUNDRED_MB / ROW_LENGTH; private static final int ONE_GB = ONE_HUNDRED_MB * 10; @@ -62,7 +62,7 @@ public class EvaluationClient implements HConstants { RANDOM_WRITE, SEQUENTIAL_READ, SEQUENTIAL_WRITE, - SCAN}; + SCAN} private Random rand; private Configuration conf; @@ -177,8 +177,8 @@ public class EvaluationClient implements HConstants { test == Test.SCAN || test == Test.SEQUENTIAL_READ || test == Test.SEQUENTIAL_WRITE) { - for(int range = 0; range < 10; range++) { - long elapsedTime = sequentialWrite(range * nRows, nRows); + for(int i = 0; i < 10; i++) { + long elapsedTime = sequentialWrite(i * nRows, nRows); if (test == Test.SEQUENTIAL_WRITE) { totalElapsedTime += elapsedTime; } @@ -188,8 +188,8 @@ public class EvaluationClient implements HConstants { switch(test) { case RANDOM_READ: - for(int range = 0 ; range < 10; range++) { - long elapsedTime = randomRead(range * nRows, nRows); + for(int i = 0 ; i < 10; i++) { + long elapsedTime = randomRead(i * nRows, nRows); totalElapsedTime += elapsedTime; } System.out.print("Random read of " + R + " rows completed in: "); @@ -199,15 +199,15 @@ public class EvaluationClient implements HConstants { throw new UnsupportedOperationException("Not yet implemented"); case RANDOM_WRITE: - for(int range = 0 ; range 
< 10; range++) {
-          long elapsedTime = randomWrite(range * nRows, nRows);
+        for(int i = 0 ; i < 10; i++) {
+          long elapsedTime = randomWrite(i * nRows, nRows);
           totalElapsedTime += elapsedTime;
         }
         System.out.print("Random write of " + R + " rows completed in: ");
         break;

       case SCAN:
-        for(int range = 0 ; range < 10; range++) {
-          long elapsedTime = scan(range * nRows, nRows);
+        for(int i = 0 ; i < 10; i++) {
+          long elapsedTime = scan(i * nRows, nRows);
           totalElapsedTime += elapsedTime;
         }
@@ -215,8 +215,8 @@
         break;

       case SEQUENTIAL_READ:
-        for(int range = 0 ; range < 10; range++) {
-          long elapsedTime = sequentialRead(range * nRows, nRows);
+        for(int i = 0 ; i < 10; i++) {
+          long elapsedTime = sequentialRead(i * nRows, nRows);
           totalElapsedTime += elapsedTime;
         }
         System.out.print("Sequential read of " + R + " rows completed in: ");
@@ -230,16 +230,16 @@
         throw new IllegalArgumentException("Invalid command value: " + test);
       }
       System.out.println((totalElapsedTime / 1000.0));
     } catch(Exception e) {
-      e.printStackTrace();
+      LOG.error("Failed", e);
     } finally {
+      LOG.info("Deleting table " + tableDescriptor.getName());
       this.client.deleteTable(tableDescriptor.getName());
     }
   }

-  private void runOneTest(Test cmd) {
+  private void runOneTest(@SuppressWarnings("unused") Test cmd) {
+    // TODO
   }

   private void runTest(Test test) throws IOException {
@@ -302,6 +302,10 @@
     System.err.println(" running: 1 <= value <= 500");
     System.err.println(" range Integer. Required. 0 <= value <= "
       + "(nclients * 10) - 1");
+    System.err.println("Examples:");
+    System.err.println(" To run a single evaluation client:");
+    System.err.println(" $ bin/hbase " +
+      "org.apache.hadoop.hbase.EvaluationClient sequentialWrite 1 1");
   }

   private void getArgs(final int start, final String[] args) {
diff --git a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
index c3c28b23ab4..ef5c5445e8a 100644
--- a/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
+++ b/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
@@ -157,25 +157,20 @@ public class MiniHBaseCluster implements HConstants {
   public void shutdown() {
     LOG.info("Shutting down the HBase Cluster");
     for(int i = 0; i < regionServers.length; i++) {
-      try {
-        regionServers[i].stop();
-      } catch(IOException e) {
-        e.printStackTrace();
-      }
+      regionServers[i].stop();
     }
     master.shutdown();
     for(int i = 0; i < regionServers.length; i++) {
       try {
         regionThreads[i].join();
       } catch(InterruptedException e) {
+        // continue
       }
     }
     try {
       masterThread.join();
     } catch(InterruptedException e) {
+      // continue
     }
     LOG.info("HBase Cluster shutdown complete");
diff --git a/src/test/org/apache/hadoop/hbase/TestCompare.java b/src/test/org/apache/hadoop/hbase/TestCompare.java
new file mode 100644
index 00000000000..9357e8e0fcb
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/TestCompare.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+/**
+ * Test comparing HBase objects.
+ */ +public class TestCompare extends TestCase { + public void testHRegionInfo() { + HRegionInfo a = new HRegionInfo(1, new HTableDescriptor("a"), null, null); + HRegionInfo b = new HRegionInfo(2, new HTableDescriptor("b"), null, null); + assertTrue(a.compareTo(b) != 0); + HTableDescriptor t = new HTableDescriptor("t"); + Text midway = new Text("midway"); + a = new HRegionInfo(1, t, null, midway); + b = new HRegionInfo(2, t, midway, null); + assertTrue(a.compareTo(b) < 0); + assertTrue(b.compareTo(a) > 0); + assertEquals(a, a); + assertTrue(a.compareTo(a) == 0); + a = new HRegionInfo(1, t, new Text("a"), new Text("d")); + b = new HRegionInfo(2, t, new Text("e"), new Text("g")); + assertTrue(a.compareTo(b) < 0); + a = new HRegionInfo(1, t, new Text("aaaa"), new Text("dddd")); + b = new HRegionInfo(2, t, new Text("e"), new Text("g")); + assertTrue(a.compareTo(b) < 0); + a = new HRegionInfo(1, t, new Text("aaaa"), new Text("dddd")); + b = new HRegionInfo(2, t, new Text("aaaa"), new Text("eeee")); + assertTrue(a.compareTo(b) < 0); + } +} diff --git a/src/test/org/apache/hadoop/hbase/TestHRegion.java b/src/test/org/apache/hadoop/hbase/TestHRegion.java index ee077e0ff9a..b037b3f2e44 100644 --- a/src/test/org/apache/hadoop/hbase/TestHRegion.java +++ b/src/test/org/apache/hadoop/hbase/TestHRegion.java @@ -214,7 +214,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe for (int i = 0; i < lockCount; i++) { try { Text rowid = new Text(Integer.toString(i)); - lockids[i] = region.obtainLock(rowid); + lockids[i] = region.obtainRowLock(rowid); rowid.equals(region.getRowFromLock(lockids[i])); LOG.debug(getName() + " locked " + rowid.toString()); } catch (IOException e) { @@ -590,8 +590,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe } // NOTE: This test depends on testBatchWrite succeeding - - private void splitAndMerge() throws IOException { + void splitAndMerge() throws IOException { Text midKey = new Text(); if(region.needsSplit(midKey)) { @@ -634,7 +633,11 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe /* (non-Javadoc) * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text) */ - public void regionIsUnavailable(Text regionName) { + public void closing(@SuppressWarnings("unused") final Text regionName) { + // We don't use this here. It is only for the HRegionServer + } + + public void closed(@SuppressWarnings("unused") final Text regionName) { // We don't use this here. It is only for the HRegionServer } diff --git a/src/test/org/apache/hadoop/hbase/TestScanner.java b/src/test/org/apache/hadoop/hbase/TestScanner.java index 09b492d54bc..06baa1324ae 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner.java @@ -28,6 +28,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; +/** + * Test of a long-lived scanner validating as we go. 
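// For context on what a long-lived scan exercises: the client opens a
// scanner, calls next() until it returns no values (each next() renews the
// scanner's server-side lease), and closes it in a finally block, which
// cancels the lease. A client stalled past the lease period finds its
// scanner expired, and later next() calls get UnknownScannerException.
// A sketch against the interfaces shown in this patch:

import org.apache.hadoop.io.Text;

class ScanAll {
  static void scanRegion(final HRegionInterface server,
      final Text regionName, final Text[] cols)
  throws java.io.IOException {
    long scannerId = server.openScanner(regionName, cols, new Text());
    try {
      while (true) {
        HStoreKey key = new HStoreKey();
        LabelledData[] values = server.next(scannerId, key);
        if (values.length == 0) {
          break;                        // scanner exhausted
        }
        // ... consume key and values here ...
      }
    } finally {
      server.close(scannerId);          // cancels the lease server-side
    }
  }
}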
+ */
 public class TestScanner extends HBaseTestCase {
   private static final Text FIRST_ROW = new Text();
   private static final Text[] COLS = {
diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java
new file mode 100644
index 00000000000..87a84d6a109
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Additional scanner tests.
+ * {@link TestScanner} does a custom setup/takedown not conducive
+ * to addition of extra scanning tests.
+ * @see TestScanner
+ */
+public class TestScanner2 extends HBaseClusterTestCase {
+  final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+  /**
+   * Test scanning of META table around split.
+   * There was a problem where only one of the splits showed in a scan.
+   * Split deletes a row and then adds two new ones.
+   * @throws IOException
+   */
+  public void testSplitDeleteOneAddTwoRegions() throws IOException {
+    // First add a new table. Its initial region will be added to META region.
+    HClient client = new HClient(this.conf);
+    client.createTable(new HTableDescriptor(getName()));
+    List regions = scan(client, HConstants.META_TABLE_NAME);
+    assertEquals("Expected one region", regions.size(), 1);
+    HRegionInfo region = regions.get(0);
+    assertTrue("Expected region named for test",
+      region.regionName.toString().startsWith(getName()));
+    // Now do what happens at split time; remove old region and then add two
+    // new ones in its place.
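// The three steps that follow mirror what a split does to META: the
// parent's row is deleted, then one row per daughter is inserted, so a
// scan must come back with both halves. A toy model of that bookkeeping,
// with row naming and types invented for illustration:

import java.util.TreeMap;

class MetaModel {
  // META rows keyed by region name; the value is the region's key range.
  private final TreeMap<String, String> rows = new TreeMap<String, String>();

  void add(final String regionName, final String start, final String end) {
    rows.put(regionName, start + "," + end);
  }

  /** Replace parent with two daughters split at midKey. */
  void split(final String parent, final String daughterA,
      final String daughterB, final String start, final String midKey,
      final String end) {
    rows.remove(parent);              // one row out ...
    add(daughterA, start, midKey);    // ... two rows in,
    add(daughterB, midKey, end);      // covering the same key range
  }

  int regionCount() {
    return rows.size();               // a scan should see this many regions
  }
}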
+ HRegion.removeRegionFromMETA(client, HConstants.META_TABLE_NAME, + region.regionName); + HTableDescriptor desc = region.tableDesc; + Path homedir = new Path(getName()); + List newRegions = new ArrayList(2); + newRegions.add(HRegion.createHRegion( + new HRegionInfo(2L, desc, null, new Text("midway")), + homedir, this.conf, null, null)); + newRegions.add(HRegion.createHRegion( + new HRegionInfo(3L, desc, new Text("midway"), null), + homedir, this.conf, null, null)); + for (HRegion r: newRegions) { + HRegion.addRegionToMETA(client, HConstants.META_TABLE_NAME, r, + this.cluster.getHMasterAddress(), -1L); + } + regions = scan(client, HConstants.META_TABLE_NAME); + assertEquals("Should be two regions only", regions.size(), 2); + } + + private List scan(final HClient client, final Text table) + throws IOException { + List regions = new ArrayList(); + HRegionInterface regionServer = null; + long scannerId = -1L; + try { + client.openTable(table); + HClient.RegionLocation rl = client.getRegionLocation(table); + regionServer = client.getHRegionConnection(rl.serverAddress); + scannerId = regionServer.openScanner(rl.regionInfo.regionName, + HMaster.METACOLUMNS, new Text()); + while (true) { + TreeMap results = new TreeMap(); + HStoreKey key = new HStoreKey(); + LabelledData[] values = regionServer.next(scannerId, key); + if (values.length == 0) { + break; + } + + for (int i = 0; i < values.length; i++) { + byte[] bytes = new byte[values[i].getData().getSize()]; + System.arraycopy(values[i].getData().get(), 0, bytes, 0, + bytes.length); + results.put(values[i].getLabel(), bytes); + } + + HRegionInfo info = HRegion.getRegionInfo(results); + String serverName = HRegion.getServerName(results); + long startCode = HRegion.getStartCode(results); + LOG.info(Thread.currentThread().getName() + " scanner: " + + Long.valueOf(scannerId) + " row: " + key + + ": regioninfo: {" + info.toString() + "}, server: " + serverName + + ", startCode: " + startCode); + regions.add(info); + } + } finally { + try { + if (scannerId != -1L) { + if (regionServer != null) { + regionServer.close(scannerId); + } + } + } catch (IOException e) { + LOG.error(e); + } + } + return regions; + } +} \ No newline at end of file
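A closing note on how these pieces compose: once a split or compaction retires a region, in-flight client calls can land on a region the server no longer serves and fail with NotServingRegionException, and a lapsed scanner lease surfaces as UnknownScannerException. Below is a hedged sketch of the client-side retry loop such failures invite; the Call shape and constructor parameters are invented for illustration and this is not the patch's HClient code. It assumes the same org.apache.hadoop.hbase package so the two exception types are in scope, and maxRetries of at least one.

import java.io.IOException;

class RetryingCaller {
  /** One attempt of an operation; the operation should re-locate the
   * target region (e.g. by re-reading META) on each attempt. */
  interface Call {
    void run() throws IOException;
  }

  private final int maxRetries;     // assumed >= 1
  private final long pauseMillis;

  RetryingCaller(final int maxRetries, final long pauseMillis) {
    this.maxRetries = maxRetries;
    this.pauseMillis = pauseMillis;
  }

  void call(final Call op) throws IOException, InterruptedException {
    IOException last = null;
    for (int tries = 0; tries < maxRetries; tries++) {
      try {
        op.run();
        return;
      } catch (NotServingRegionException e) {
        last = e;                    // region split or moved; try again
      } catch (UnknownScannerException e) {
        last = e;                    // scanner lease expired; reopen and retry
      }
      Thread.sleep(pauseMillis);     // pause before the next attempt
    }
    throw last;                      // retries exhausted
  }
}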