HADOOP-1445 Support updates across region splits and compactions

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@544188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-06-04 17:14:10 +00:00
parent 7c3d11974b
commit ac718209e5
24 changed files with 1437 additions and 984 deletions

View File

@ -22,3 +22,4 @@ Trunk (unreleased changes)
add/remove column.
12. HADOOP-1392. Part2: includes table compaction by merging adjacent regions
that have shrunk in size.
13. HADOOP-1445 Support updates across region splits and compactions

View File

@ -21,21 +21,18 @@
</description>
</property>
<property>
<name>hbase.client.timeout.length</name>
<value>10000</value>
<description>Client timeout in milliseconds</description>
</property>
<property>
<name>hbase.client.timeout.number</name>
<value>5</value>
<description>Try this many timeouts before giving up.
</description>
<name>hbase.client.pause</name>
<value>30000</value>
<description>General client pause value. Used mostly as value to wait
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>2</value>
<description>Count of maximum retries fetching the root region from root
region server.
<value>5</value>
<description>Maximum retries. Used as maximum for all retryable
operations such as fetching of the root region from root region
server, getting a cell's value, starting a row update, etc.
Default: 5.
</description>
</property>
<property>
@ -51,6 +48,12 @@
<description>HMaster server lease period in milliseconds. Default is
30 seconds.</description>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>180000</value>
<description>HRegion server lease period in milliseconds. Default is
180 seconds.</description>
</property>
<property>
<name>hbase.server.thread.wakefrequency</name>
<value>10000</value>
@ -58,12 +61,6 @@
Used as sleep interval by service threads such as META scanner and log roller.
</description>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>30000</value>
<description>HRegion server lease period in milliseconds. Default is
30 seconds.</description>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>10</value>
@ -80,5 +77,27 @@
tests to be responsive.
</description>
</property>
<property>
<name>hbase.regionserver.maxlogentries</name>
<value>30000</value>
<description>Rotate the logs when count of entries exceeds this value.
Default: 30,000
</description>
</property>
<property>
<name>hbase.hregion.maxunflushed</name>
<value>10000</value>
<description>
Memcache will be flushed to disk if number of Memcache writes
are in excess of this number.
</description>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>134217728</value>
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 128M.
</description>
</property>
</configuration>

View File

@ -1,4 +1,54 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.regiondir</name>
<value>hbase</value>
<description>The directory shared by region servers.
</description>
</property>
<property>
<name>hbase.regionserver.msginterval</name>
<value>1000</value>
<description>Interval between messages from the RegionServer to HMaster
in milliseconds. Default is 15. Set this value low if you want unit
tests to be responsive.
</description>
</property>
<!--
<property>
<name>hbase.master.meta.thread.rescanfrequency</name>
<value>600000</value>
<description>How long the HMaster sleeps (in milliseconds) between scans of
the root and meta tables.
</description>
</property>
<property>
<name>hbase.master.lease.period</name>
<value>360000</value>
<description>HMaster server lease period in milliseconds. Default is
180 seconds.</description>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>360000</value>
<description>HMaster server lease period in milliseconds. Default is
180 seconds.</description>
</property>
-->
<!--
<property>
<name>hbase.hregion.max.filesize</name>
<value>3421223</value>
<description>
Maximum desired file size for an HRegion. If filesize exceeds
value + (value / 2), the HRegion is split in two. Default: 128M.
</description>
</property>
<property>
<name>hbase.client.timeout.length</name>
<value>10000</value>
<description>Client timeout in milliseconds</description>
</property>
-->
</configuration>

View File

@ -21,6 +21,8 @@ import java.util.Vector;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.BytesWritable;
@ -31,10 +33,11 @@ import org.apache.hadoop.io.Text;
* Used by the concrete HMemcacheScanner and HStoreScanners
******************************************************************************/
public abstract class HAbstractScanner implements HInternalScannerInterface {
final Log LOG = LogFactory.getLog(this.getClass().getName());
// Pattern to determine if a column key is a regex
private static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
static Pattern isRegexPattern = Pattern.compile("^.*[\\\\+|^&*$\\[\\]\\}{)(]+.*$");
// The kind of match we are doing on a column:
@ -42,7 +45,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
FAMILY_ONLY, // Just check the column family name
REGEX, // Column family + matches regex
SIMPLE // Literal matching
};
}
// This class provides column matching functions that are more sophisticated
// than a simple string compare. There are three types of matching:
@ -89,15 +92,15 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
// Matching method
boolean matches(Text col) throws IOException {
boolean matches(Text c) throws IOException {
if(this.matchType == MATCH_TYPE.SIMPLE) {
return col.equals(this.col);
return c.equals(this.col);
} else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) {
return col.toString().startsWith(this.family);
return c.toString().startsWith(this.family);
} else if(this.matchType == MATCH_TYPE.REGEX) {
return this.columnMatcher.matcher(col.toString()).matches();
return this.columnMatcher.matcher(c.toString()).matches();
} else {
throw new IOException("Invalid match type: " + this.matchType);
@ -201,20 +204,19 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
public boolean isMultipleMatchScanner() {
return this.multipleMatchers;
}
/**
* Get the next set of values for this scanner.
*
* @param key - The key that matched
* @param results - all the results for that key.
* @return - true if a match was found
* @param key The key that matched
* @param results All the results for <code>key</code>
* @return true if a match was found
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
throws IOException {
throws IOException {
// Find the next row label (and timestamp)
Text chosenRow = null;
long chosenTimestamp = -1;
for(int i = 0; i < keys.length; i++) {
@ -232,7 +234,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
}
// Grab all the values that match this row/timestamp
boolean insertedItem = false;
if(chosenRow != null) {
key.setRow(chosenRow);
@ -241,7 +242,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
for(int i = 0; i < keys.length; i++) {
// Fetch the data
while((keys[i] != null)
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
@ -255,10 +255,8 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
break;
}
if(columnMatch(i)) {
if(columnMatch(i)) {
// We only want the first result for any specific family member
if(!results.containsKey(keys[i].getColumn())) {
results.put(new Text(keys[i].getColumn()), vals[i]);
insertedItem = true;
@ -277,7 +275,6 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
&& ((keys[i].getRow().compareTo(chosenRow) <= 0)
|| (keys[i].getTimestamp() > this.timestamp)
|| (! columnMatch(i)))) {
getNext(i);
}
}

View File

@ -18,11 +18,14 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -30,6 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -37,7 +43,7 @@ import org.apache.hadoop.ipc.RemoteException;
* HClient manages a connection to a single HRegionServer.
*/
public class HClient implements HConstants {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
final Log LOG = LogFactory.getLog(this.getClass().getName());
private static final Text[] META_COLUMNS = {
COLUMN_FAMILY
@ -49,51 +55,55 @@ public class HClient implements HConstants {
private static final Text EMPTY_START_ROW = new Text();
private long clientTimeout;
private int numTimeouts;
private int numRetries;
long pause;
int numRetries;
private HMasterInterface master;
private final Configuration conf;
private static class TableInfo {
/*
* Data structure that holds current location for a region and its info.
*/
static class RegionLocation {
public HRegionInfo regionInfo;
public HServerAddress serverAddress;
TableInfo(HRegionInfo regionInfo, HServerAddress serverAddress) {
RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
this.regionInfo = regionInfo;
this.serverAddress = serverAddress;
}
@Override
public String toString() {
return "address: " + this.serverAddress.toString() + ", regioninfo: " +
this.regionInfo;
}
}
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
private TreeMap<Text, SortedMap<Text, TableInfo>> tablesToServers;
private TreeMap<Text, SortedMap<Text, RegionLocation>> tablesToServers;
// For the "current" table: Map startRow -> (HRegionInfo, HServerAddress)
private SortedMap<Text, RegionLocation> tableServers;
private SortedMap<Text, TableInfo> tableServers;
// Known region HServerAddress.toString() -> HRegionInterface
// Known region HServerAddress.toString() -> HRegionInterface
private TreeMap<String, HRegionInterface> servers;
// For row mutation operations
private Text currentRegion;
private HRegionInterface currentServer;
private Random rand;
private long clientid;
Text currentRegion;
HRegionInterface currentServer;
Random rand;
long clientid;
/** Creates a new HClient */
public HClient(Configuration conf) {
this.conf = conf;
this.clientTimeout = conf.getLong("hbase.client.timeout.length", 30 * 1000);
this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
this.master = null;
this.tablesToServers = new TreeMap<Text, SortedMap<Text, TableInfo>>();
this.tablesToServers = new TreeMap<Text, SortedMap<Text, RegionLocation>>();
this.tableServers = null;
this.servers = new TreeMap<String, HRegionInterface>();
@ -129,11 +139,13 @@ public class HClient implements HConstants {
}
}
/* Find the address of the master and connect to it */
/* Find the address of the master and connect to it
*/
private void checkMaster() throws MasterNotRunningException {
if (this.master != null) {
return;
}
for(int tries = 0; this.master == null && tries < numRetries; tries++) {
HServerAddress masterLocation =
new HServerAddress(this.conf.get(MASTER_ADDRESS,
@ -142,9 +154,8 @@ public class HClient implements HConstants {
try {
HMasterInterface tryMaster =
(HMasterInterface)RPC.getProxy(HMasterInterface.class,
HMasterInterface.versionID, masterLocation.getInetSocketAddress(),
this.conf);
HMasterInterface.versionID, masterLocation.getInetSocketAddress(),
this.conf);
if(tryMaster.isMasterRunning()) {
this.master = tryMaster;
break;
@ -154,16 +165,18 @@ public class HClient implements HConstants {
// This was our last chance - don't bother sleeping
break;
}
LOG.info("Attempt " + tries + " of " + this.numRetries +
" failed with <" + e + ">. Retrying after sleep of " + this.pause);
}
// We either cannot connect to the master or it is not running.
// Sleep and retry
// We either cannot connect to master or it is not running. Sleep & retry
try {
Thread.sleep(this.clientTimeout);
Thread.sleep(this.pause);
} catch(InterruptedException e) {
// continue
}
}
if(this.master == null) {
throw new MasterNotRunningException();
}
@ -210,7 +223,7 @@ public class HClient implements HConstants {
// Save the current table
SortedMap<Text, TableInfo> oldServers = this.tableServers;
SortedMap<Text, RegionLocation> oldServers = this.tableServers;
try {
// Wait for new table to come on-line
@ -229,23 +242,21 @@ public class HClient implements HConstants {
public synchronized void deleteTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.deleteTable(tableName);
} catch(RemoteException e) {
handleRemoteException(e);
}
// Wait until first region is deleted
HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress);
HRegionInterface server =
getHRegionConnection(firstMetaServer.serverAddress);
DataInputBuffer inbuf = new DataInputBuffer();
HStoreKey key = new HStoreKey();
HRegionInfo info = new HRegionInfo();
for(int tries = 0; tries < numRetries; tries++) {
for (int tries = 0; tries < numRetries; tries++) {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
@ -258,7 +269,8 @@ public class HClient implements HConstants {
for(int j = 0; j < values.length; j++) {
if(values[j].getLabel().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
System.arraycopy(values[j].getData().get(), 0, bytes, 0,
bytes.length);
inbuf.reset(bytes, bytes.length);
info.readFields(inbuf);
if(info.tableDesc.getName().equals(tableName)) {
@ -274,27 +286,19 @@ public class HClient implements HConstants {
if(scannerId != -1L) {
try {
server.close(scannerId);
} catch(Exception e) {
LOG.warn(e);
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("Sleep. Waiting for first region to be deleted from " + tableName);
}
try {
Thread.sleep(clientTimeout);
Thread.sleep(pause);
} catch(InterruptedException e) {
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for first region to be deleted from " + tableName);
// continue
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("table deleted " + tableName);
}
LOG.info("table " + tableName + " deleted");
}
public synchronized void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
@ -322,7 +326,7 @@ public class HClient implements HConstants {
public synchronized void enableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.enableTable(tableName);
@ -379,20 +383,22 @@ public class HClient implements HConstants {
LOG.debug("Sleep. Waiting for first region to be enabled from " + tableName);
}
try {
Thread.sleep(clientTimeout);
Thread.sleep(pause);
} catch(InterruptedException e) {
// continue
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for first region to be enabled from " + tableName);
}
}
LOG.info("Enabled table " + tableName);
}
public synchronized void disableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
TableInfo firstMetaServer = getFirstMetaServerForTable(tableName);
RegionLocation firstMetaServer = getFirstMetaServerForTable(tableName);
try {
this.master.disableTable(tableName);
@ -449,14 +455,15 @@ public class HClient implements HConstants {
LOG.debug("Sleep. Waiting for first region to be disabled from " + tableName);
}
try {
Thread.sleep(clientTimeout);
Thread.sleep(pause);
} catch(InterruptedException e) {
// continue
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Waiting for first region to be disabled from " + tableName);
}
}
LOG.info("Disabled table " + tableName);
}
public synchronized void shutdown() throws IOException {
@ -477,8 +484,8 @@ public class HClient implements HConstants {
}
}
private TableInfo getFirstMetaServerForTable(Text tableName) throws IOException {
SortedMap<Text, TableInfo> metaservers = findMetaServersForTable(tableName);
private RegionLocation getFirstMetaServerForTable(Text tableName) throws IOException {
SortedMap<Text, RegionLocation> metaservers = findMetaServersForTable(tableName);
return metaservers.get(metaservers.firstKey());
}
@ -497,7 +504,10 @@ public class HClient implements HConstants {
throw new IllegalArgumentException("table name cannot be null or zero length");
}
this.tableServers = tablesToServers.get(tableName);
if(this.tableServers == null ) {
if (this.tableServers == null ) {
if (LOG.isDebugEnabled()) {
LOG.debug("No servers for " + tableName + ". Doing a find...");
}
// We don't know where the table is.
// Load the information from meta.
this.tableServers = findServersForTable(tableName);
@ -511,23 +521,25 @@ public class HClient implements HConstants {
* @return - map of first row to table info for all regions in the table
* @throws IOException
*/
private SortedMap<Text, TableInfo> findServersForTable(Text tableName)
private SortedMap<Text, RegionLocation> findServersForTable(Text tableName)
throws IOException {
SortedMap<Text, TableInfo> servers = null;
SortedMap<Text, RegionLocation> servers = null;
if(tableName.equals(ROOT_TABLE_NAME)) {
servers = locateRootRegion();
} else if(tableName.equals(META_TABLE_NAME)) {
servers = loadMetaFromRoot();
} else {
servers = new TreeMap<Text, TableInfo>();
for(TableInfo t: findMetaServersForTable(tableName).values()) {
servers = new TreeMap<Text, RegionLocation>();
for(RegionLocation t: findMetaServersForTable(tableName).values()) {
servers.putAll(scanOneMetaRegion(t, tableName));
}
this.tablesToServers.put(tableName, servers);
}
if (LOG.isDebugEnabled()) {
for (Map.Entry<Text, RegionLocation> e: servers.entrySet()) {
LOG.debug("Server " + e.getKey() + " is serving: " + e.getValue());
}
}
return servers;
}
@ -537,18 +549,15 @@ public class HClient implements HConstants {
* @return - returns a SortedMap of the meta servers
* @throws IOException
*/
private SortedMap<Text, TableInfo> findMetaServersForTable(Text tableName)
throws IOException {
SortedMap<Text, TableInfo> metaServers =
private SortedMap<Text, RegionLocation> findMetaServersForTable(final Text tableName)
throws IOException {
SortedMap<Text, RegionLocation> metaServers =
this.tablesToServers.get(META_TABLE_NAME);
if(metaServers == null) { // Don't know where the meta is
metaServers = loadMetaFromRoot();
}
Text firstMetaRegion = (metaServers.containsKey(tableName)) ?
tableName : metaServers.headMap(tableName).lastKey();
tableName : metaServers.headMap(tableName).lastKey();
return metaServers.tailMap(firstMetaRegion);
}
@ -558,10 +567,9 @@ public class HClient implements HConstants {
* @return map of first row to TableInfo for all meta regions
* @throws IOException
*/
private TreeMap<Text, TableInfo> loadMetaFromRoot() throws IOException {
SortedMap<Text, TableInfo> rootRegion =
private TreeMap<Text, RegionLocation> loadMetaFromRoot() throws IOException {
SortedMap<Text, RegionLocation> rootRegion =
this.tablesToServers.get(ROOT_TABLE_NAME);
if(rootRegion == null) {
rootRegion = locateRootRegion();
}
@ -570,34 +578,34 @@ public class HClient implements HConstants {
/*
* Repeatedly try to find the root region by asking the master for where it is
*
* @return TreeMap<Text, TableInfo> for root regin if found
* @throws NoServerForRegionException - if the root region can not be located after retrying
* @throws IOException
*/
private TreeMap<Text, TableInfo> locateRootRegion() throws IOException {
private TreeMap<Text, RegionLocation> locateRootRegion() throws IOException {
checkMaster();
HServerAddress rootRegionLocation = null;
for(int tries = 0; rootRegionLocation == null && tries < numRetries; tries++){
for(int tries = 0; tries < numRetries; tries++) {
int localTimeouts = 0;
while(rootRegionLocation == null && localTimeouts < numTimeouts) {
while(rootRegionLocation == null && localTimeouts < numRetries) {
rootRegionLocation = master.findRootRegion();
if(rootRegionLocation == null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping. Waiting for root region.");
}
Thread.sleep(this.clientTimeout);
Thread.sleep(this.pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch(InterruptedException iex) {
// continue
}
localTimeouts++;
}
}
if(rootRegionLocation == null) {
throw new NoServerForRegionException(
"Timed out trying to locate root region");
@ -608,7 +616,6 @@ public class HClient implements HConstants {
try {
rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName);
break;
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// Don't bother sleeping. We've run out of retries.
@ -616,16 +623,16 @@ public class HClient implements HConstants {
}
// Sleep and retry finding root region.
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Root region location changed. Sleeping.");
}
Thread.sleep(this.clientTimeout);
Thread.sleep(this.pause);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding root region.");
}
} catch(InterruptedException iex) {
// continue
}
}
rootRegionLocation = null;
@ -633,12 +640,12 @@ public class HClient implements HConstants {
if (rootRegionLocation == null) {
throw new NoServerForRegionException(
"unable to locate root region server");
"unable to locate root region server");
}
TreeMap<Text, TableInfo> rootServer = new TreeMap<Text, TableInfo>();
TreeMap<Text, RegionLocation> rootServer = new TreeMap<Text, RegionLocation>();
rootServer.put(EMPTY_START_ROW,
new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
new RegionLocation(HGlobals.rootRegionInfo, rootRegionLocation));
this.tablesToServers.put(ROOT_TABLE_NAME, rootServer);
return rootServer;
@ -649,10 +656,9 @@ public class HClient implements HConstants {
* @return - TreeMap of meta region servers
* @throws IOException
*/
private TreeMap<Text, TableInfo> scanRoot(TableInfo rootRegion)
private TreeMap<Text, RegionLocation> scanRoot(RegionLocation rootRegion)
throws IOException {
TreeMap<Text, TableInfo> metaservers =
TreeMap<Text, RegionLocation> metaservers =
scanOneMetaRegion(rootRegion, META_TABLE_NAME);
this.tablesToServers.put(META_TABLE_NAME, metaservers);
return metaservers;
@ -663,16 +669,16 @@ public class HClient implements HConstants {
* @param t the meta region we're going to scan
* @param tableName the name of the table we're looking for
* @return returns a map of startingRow to TableInfo
* @throws NoSuchElementException - if table does not exist
* @throws RegionNotFoundException - if table does not exist
* @throws IllegalStateException - if table is offline
* @throws NoServerForRegionException - if table can not be found after retrying
* @throws IOException
*/
private TreeMap<Text, TableInfo> scanOneMetaRegion(TableInfo t, Text tableName)
throws IOException {
private TreeMap<Text, RegionLocation> scanOneMetaRegion(final RegionLocation t,
final Text tableName)
throws IOException {
HRegionInterface server = getHRegionConnection(t.serverAddress);
TreeMap<Text, TableInfo> servers = new TreeMap<Text, TableInfo>();
TreeMap<Text, RegionLocation> servers = new TreeMap<Text, RegionLocation>();
for(int tries = 0; servers.size() == 0 && tries < this.numRetries;
tries++) {
@ -690,13 +696,15 @@ public class HClient implements HConstants {
if(values.length == 0) {
if(servers.size() == 0) {
// If we didn't find any servers then the table does not exist
throw new NoSuchElementException("table '" + tableName
+ "' does not exist");
throw new RegionNotFoundException("table '" + tableName +
"' does not exist in " + t);
}
// We found at least one server for the table and now we're done.
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + servers.size() + " server(s) for " +
"location: " + t + " for tablename " + tableName);
}
break;
}
@ -714,6 +722,9 @@ public class HClient implements HConstants {
if(!regionInfo.tableDesc.getName().equals(tableName)) {
// We're done
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + tableName);
}
break;
}
@ -724,7 +735,6 @@ public class HClient implements HConstants {
bytes = results.get(COL_SERVER);
if(bytes == null || bytes.length == 0) {
// We need to rescan because the table we want is unassigned.
if(LOG.isDebugEnabled()) {
LOG.debug("no server address for " + regionInfo.toString());
}
@ -732,15 +742,13 @@ public class HClient implements HConstants {
break;
}
serverAddress = new String(bytes, UTF8_ENCODING);
servers.put(regionInfo.startKey,
new TableInfo(regionInfo, new HServerAddress(serverAddress)));
new RegionLocation(regionInfo, new HServerAddress(serverAddress)));
}
} finally {
if(scannerId != -1L) {
try {
server.close(scannerId);
} catch(Exception e) {
LOG.warn(e);
}
@ -752,19 +760,20 @@ public class HClient implements HConstants {
+ tableName + " after " + this.numRetries + " retries");
}
// The table is not yet being served. Sleep and retry.
if(LOG.isDebugEnabled()) {
LOG.debug("Sleeping. Table " + tableName
+ " not currently being served.");
}
try {
Thread.sleep(this.clientTimeout);
} catch(InterruptedException e) {
}
if(LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding table " + tableName);
if (servers.size() <= 0) {
// The table is not yet being served. Sleep and retry.
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping. Table " + tableName +
" not currently being served.");
}
try {
Thread.sleep(this.pause);
} catch (InterruptedException e) {
// continue
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake. Retry finding table " + tableName);
}
}
}
return servers;
@ -804,7 +813,7 @@ public class HClient implements HConstants {
throws IOException {
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
SortedMap<Text, TableInfo> metaTables =
SortedMap<Text, RegionLocation> metaTables =
this.tablesToServers.get(META_TABLE_NAME);
if(metaTables == null) {
@ -812,7 +821,7 @@ public class HClient implements HConstants {
metaTables = loadMetaFromRoot();
}
for (TableInfo t: metaTables.values()) {
for (RegionLocation t: metaTables.values()) {
HRegionInterface server = getHRegionConnection(t.serverAddress);
long scannerId = -1L;
try {
@ -846,11 +855,15 @@ public class HClient implements HConstants {
}
}
}
return (HTableDescriptor[])uniqueTables.
toArray(new HTableDescriptor[uniqueTables.size()]);
return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
}
private synchronized TableInfo getTableInfo(Text row) {
/*
* Find region location hosting passed row using cached info
* @param row Row to find.
* @return Location of row.
*/
synchronized RegionLocation getRegionLocation(Text row) {
if(row == null || row.getLength() == 0) {
throw new IllegalArgumentException("row key cannot be null or zero length");
}
@ -859,41 +872,42 @@ public class HClient implements HConstants {
}
// Only one server will have the row we are looking for
Text serverKey = null;
if(this.tableServers.containsKey(row)) {
serverKey = row;
} else {
serverKey = this.tableServers.headMap(row).lastKey();
}
Text serverKey = (this.tableServers.containsKey(row))? row:
this.tableServers.headMap(row).lastKey();
return this.tableServers.get(serverKey);
}
private synchronized void findRegion(TableInfo info) throws IOException {
/*
* Clear caches of passed region location, reload servers for the passed
* region's table and then ensure region location can be found.
* @param info Region location to find.
* @throws IOException
*/
synchronized void findRegion(final RegionLocation info) throws IOException {
// Wipe out everything we know about this table
if (LOG.isDebugEnabled()) {
LOG.debug("Wiping out all we know of " + info);
}
this.tablesToServers.remove(info.regionInfo.tableDesc.getName());
this.tableServers.clear();
// Reload information for the whole table
this.tableServers = findServersForTable(info.regionInfo.tableDesc.getName());
if(this.tableServers.get(info.regionInfo.startKey) == null ) {
throw new IOException("region " + info.regionInfo.regionName
+ " does not exist");
if (LOG.isDebugEnabled()) {
LOG.debug("Result of findRegion: " + this.tableServers.toString());
}
if (this.tableServers.get(info.regionInfo.startKey) == null) {
throw new RegionNotFoundException(info.regionInfo.regionName.toString());
}
}
/** Get a single value for the specified row and column */
public byte[] get(Text row, Text column) throws IOException {
TableInfo info = null;
RegionLocation info = null;
BytesWritable value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
info = getRegionLocation(row);
try {
value = getHRegionConnection(info.serverAddress).get(
@ -919,11 +933,11 @@ public class HClient implements HConstants {
/** Get the specified number of versions of the specified row and column */
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
TableInfo info = null;
RegionLocation info = null;
BytesWritable[] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
@ -956,11 +970,11 @@ public class HClient implements HConstants {
* the specified timestamp.
*/
public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException {
TableInfo info = null;
RegionLocation info = null;
BytesWritable[] values = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
info = getRegionLocation(row);
try {
values = getHRegionConnection(info.serverAddress).get(
@ -990,11 +1004,11 @@ public class HClient implements HConstants {
/** Get all the data for the specified row */
public LabelledData[] getRow(Text row) throws IOException {
TableInfo info = null;
RegionLocation info = null;
LabelledData[] value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
info = getRegionLocation(row);
try {
value = getHRegionConnection(info.serverAddress).getRow(
@ -1023,38 +1037,81 @@ public class HClient implements HConstants {
}
return new ClientScanner(columns, startRow);
}
/** Start an atomic row insertion or update */
public long startUpdate(Text row) throws IOException {
TableInfo info = null;
long lockid = -1L;
/*
* @return General HClient RetryPolicy instance.
*/
RetryPolicy getRetryPolicy() {
Map<Class <? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class <? extends Exception>, RetryPolicy>();
// Pass numRetries - 1 because it does less-than-equal internally rather
// than the less-than we do elsewhere where we use numRetries.
RetryPolicy rp =
RetryPolicies.retryUpToMaximumCountWithProportionalSleep(numRetries,
this.pause, TimeUnit.MILLISECONDS);
exceptionToPolicyMap.put(NotServingRegionException.class, rp);
exceptionToPolicyMap.put(WrongRegionException.class, rp);
exceptionToPolicyMap.put(RegionNotFoundException.class, rp);
return RetryPolicies.retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
exceptionToPolicyMap);
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getTableInfo(row);
try {
this.currentServer = getHRegionConnection(info.serverAddress);
this.currentRegion = info.regionInfo.regionName;
this.clientid = rand.nextLong();
lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
}
/*
* Interface for {@link #startUpate()} used by the
* {@link org.apache.hadoop.io.retry} mechanism.
*/
private interface StartUpdateInterface {
/**
* @return row lockid for the update
* @throws IOException
*/
long startUpdate() throws IOException;
}
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
// No more tries
throw e;
/* Start an atomic row insertion or update
* @param row Name of row to start update against.
* @return Row lockid.
*/
public long startUpdate(final Text row) throws IOException {
// Implemention of the StartUpdate interface.
StartUpdateInterface implementation = new StartUpdateInterface() {
private RegionLocation info = null;
private int attempts = 0;
/*
* Wrapped method. Proxy wrapper is configured to judge whether
* exception merits retry.
* @return lockid
* @throws IOException
*/
public long startUpdate() throws IOException {
this.attempts++;
if (this.info != null) {
LOG.info("Retry of startUpdate. Attempt " + this.attempts +
" for row " + row);
// If a retry. Something wrong w/ region we have. Refind.
try {
findRegion(info);
} catch (RegionNotFoundException e) {
// continue. If no longer exists, perhaps we just came through
// a split and region is now gone. Below getRegionLocation should
// recalibrate client.
}
}
findRegion(info);
info = null;
} catch(IOException e) {
this.currentServer = null;
this.currentRegion = null;
throw e;
this.info = getRegionLocation(row);
currentServer = getHRegionConnection(info.serverAddress);
currentRegion = info.regionInfo.regionName;
clientid = rand.nextLong();
return currentServer.startUpdate(currentRegion, clientid, row);
}
}
};
return lockid;
// Get retry proxy wrapper around 'implementation'.
StartUpdateInterface retryProxy = (StartUpdateInterface)RetryProxy.
create(StartUpdateInterface.class, implementation, getRetryPolicy());
// Run retry.
return retryProxy.startUpdate();
}
/** Change a value for the specified column */
@ -1062,12 +1119,11 @@ public class HClient implements HConstants {
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
new BytesWritable(val));
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
@ -1078,13 +1134,13 @@ public class HClient implements HConstants {
/** Delete the value for a column */
public void delete(long lockid, Text column) throws IOException {
try {
this.currentServer.delete(this.currentRegion, this.clientid, lockid, column);
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
column);
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
@ -1107,7 +1163,6 @@ public class HClient implements HConstants {
public void commit(long lockid) throws IOException {
try {
this.currentServer.commit(this.currentRegion, this.clientid, lockid);
} finally {
this.currentServer = null;
this.currentRegion = null;
@ -1123,7 +1178,7 @@ public class HClient implements HConstants {
private Text[] columns;
private Text startRow;
private boolean closed;
private TableInfo[] regions;
private RegionLocation[] regions;
private int currentRegion;
private HRegionInterface server;
private long scannerId;
@ -1139,8 +1194,8 @@ public class HClient implements HConstants {
} else {
firstServer = tableServers.headMap(startRow).lastKey();
}
Collection<TableInfo> info = tableServers.tailMap(firstServer).values();
this.regions = info.toArray(new TableInfo[info.size()]);
Collection<RegionLocation> info = tableServers.tailMap(firstServer).values();
this.regions = info.toArray(new RegionLocation[info.size()]);
}
public ClientScanner(Text[] columns, Text startRow) throws IOException {
@ -1173,7 +1228,7 @@ public class HClient implements HConstants {
this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
for(int tries = 0; tries < numRetries; tries++) {
TableInfo info = this.regions[currentRegion];
RegionLocation info = this.regions[currentRegion];
try {
this.scannerId = this.server.openScanner(info.regionInfo.regionName,
@ -1247,15 +1302,37 @@ public class HClient implements HConstants {
System.err.println(" address is read from configuration.");
System.err.println("Commands:");
System.err.println(" shutdown Shutdown the HBase cluster.");
System.err.println(" createTable Takes table name, column families,... ");
System.err.println(" deleteTable Takes a table name.");
System.err.println(" iistTables List all tables.");
System.err.println(" createTable Create named table.");
System.err.println(" deleteTable Delete named table.");
System.err.println(" listTables List all tables.");
System.err.println("Example Usage:");
System.err.println(" % java " + this.getClass().getName() + " shutdown");
System.err.println(" % java " + this.getClass().getName() +
" createTable webcrawl contents: anchors: 10");
}
private void printCreateTableUsage(final String message) {
if (message != null && message.length() > 0) {
System.err.println(message);
}
System.err.println("Usage: java " + this.getClass().getName() +
" [options] createTable <name> <colfamily1> ... <max_versions>");
System.err.println("Example Usage:");
System.err.println(" % java " + this.getClass().getName() +
" createTable testtable column_x column_y column_z 3");
}
private void printDeleteTableUsage(final String message) {
if (message != null && message.length() > 0) {
System.err.println(message);
}
System.err.println("Usage: java " + this.getClass().getName() +
" [options] deleteTable <name>");
System.err.println("Example Usage:");
System.err.println(" % java " + this.getClass().getName() +
" deleteTable testtable");
}
public int doCommandLine(final String args[]) {
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
@ -1296,8 +1373,10 @@ public class HClient implements HConstants {
if (cmd.equals("createTable")) {
if (i + 2 > args.length) {
throw new IllegalArgumentException("Must supply a table name " +
"and at least one column family");
printCreateTableUsage("Error: Supply a table name," +
" at least one column family, and maximum versions");
errCode = 1;
break;
}
HTableDescriptor desc = new HTableDescriptor(args[i + 1]);
boolean addedFamily = false;
@ -1316,7 +1395,9 @@ public class HClient implements HConstants {
if (cmd.equals("deleteTable")) {
if (i + 1 > args.length) {
throw new IllegalArgumentException("Must supply a table name");
printDeleteTableUsage("Error: Must supply a table name");
errCode = 1;
break;
}
deleteTable(new Text(args[i + 1]));
errCode = 0;

View File

@ -49,6 +49,8 @@ public interface HConstants {
// TODO: Someone may try to name a column family 'log'. If they
// do, it will clash with the HREGION log dir subdirectory. FIX.
static final String HREGION_LOGDIR_NAME = "log";
static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
// Always store the location of the root table's HRegion.
// This HRegion is never split.
@ -72,7 +74,6 @@ public interface HConstants {
// Other constants
static final long DESIRED_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
static final String UTF8_ENCODING = "UTF-8";
static final BytesWritable DELETE_BYTES =

View File

@ -50,7 +50,8 @@ import org.apache.hadoop.util.StringUtils;
public class HMaster implements HConstants, HMasterInterface,
HMasterRegionInterface, Runnable {
public long getProtocolVersion(String protocol, long clientVersion)
public long getProtocolVersion(String protocol,
@SuppressWarnings("unused") long clientVersion)
throws IOException {
if (protocol.equals(HMasterInterface.class.getName())) {
return HMasterInterface.versionID;
@ -61,43 +62,41 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
private static final Log LOG =
static final Log LOG =
LogFactory.getLog(org.apache.hadoop.hbase.HMaster.class.getName());
private volatile boolean closed;
private Path dir;
volatile boolean closed;
Path dir;
private Configuration conf;
private FileSystem fs;
private Random rand;
FileSystem fs;
Random rand;
private long threadWakeFrequency;
private int numRetries;
private long maxRegionOpenTime;
// The 'msgQueue' is used to assign work to the client processor thread
private Vector<PendingOperation> msgQueue;
Vector<PendingOperation> msgQueue;
private Leases serverLeases;
private Server server;
private HServerAddress address;
private HClient client;
HClient client;
private long metaRescanInterval;
long metaRescanInterval;
private HServerAddress rootRegionLocation;
/**
* Columns in the 'meta' ROOT and META tables.
*/
private static final Text METACOLUMNS[] = {
static final Text METACOLUMNS[] = {
COLUMN_FAMILY
};
static final String MASTER_NOT_RUNNING = "Master not running";
private boolean rootScanned;
private int numMetaRegions;
boolean rootScanned;
int numMetaRegions;
/**
* Base HRegion scanner class. Holds utilty common to <code>ROOT</code> and
@ -146,116 +145,80 @@ public class HMaster implements HConstants, HMasterInterface,
* <p>A <code>META</code> region is not 'known' until it has been scanned
* once.
*/
private abstract class BaseScanner implements Runnable {
abstract class BaseScanner implements Runnable {
private final Text FIRST_ROW = new Text();
/**
* @param region Region to scan
* @return True if scan completed.
* @throws IOException
*/
protected boolean scanRegion(final MetaRegion region)
throws IOException {
boolean scannedRegion = false;
HRegionInterface server = null;
HRegionInterface regionServer = null;
long scannerId = -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("scanning meta region " + region.regionName);
LOG.debug(Thread.currentThread().getName() + " scanning meta region " +
region.regionName);
}
try {
server = client.getHRegionConnection(region.server);
scannerId = server.openScanner(region.regionName, METACOLUMNS, FIRST_ROW);
DataInputBuffer inbuf = new DataInputBuffer();
regionServer = client.getHRegionConnection(region.server);
scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
FIRST_ROW);
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
HStoreKey key = new HStoreKey();
LabelledData[] values = server.next(scannerId, key);
LabelledData[] values = regionServer.next(scannerId, key);
if (values.length == 0) {
break;
}
for (int i = 0; i < values.length; i++) {
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
System.arraycopy(values[i].getData().get(), 0, bytes, 0,
bytes.length);
results.put(values[i].getLabel(), bytes);
}
HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf);
String serverName = getServerName(COL_SERVER, results);
long startCode = getStartCode(COL_STARTCODE, results);
HRegionInfo info = HRegion.getRegionInfo(results);
String serverName = HRegion.getServerName(results);
long startCode = HRegion.getStartCode(results);
if(LOG.isDebugEnabled()) {
LOG.debug("row: " + info.toString() + ", server: " + serverName
+ ", startCode: " + startCode);
LOG.debug(Thread.currentThread().getName() + " scanner: " +
Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
"}, server: " + serverName + ", startCode: " + startCode);
}
// Note Region has been assigned.
checkAssigned(info, serverName, startCode);
scannedRegion = true;
}
} catch (UnknownScannerException e) {
// Reset scannerId so we do not try closing a scanner the other side
// has lost account of: prevents duplicated stack trace out of the
// below close in the finally.
scannerId = -1L;
} finally {
try {
if (scannerId != -1L) {
server.close(scannerId);
if (regionServer != null) {
regionServer.close(scannerId);
}
}
} catch (IOException e) {
LOG.error(e);
}
scannerId = -1L;
}
if (LOG.isDebugEnabled()) {
LOG.debug("scan of meta region " + region.regionName + " complete");
LOG.debug(Thread.currentThread().getName() + " scan of meta region " +
region.regionName + " complete");
}
return scannedRegion;
}
protected HRegionInfo getRegionInfo(final Text key,
final TreeMap<Text, byte[]> data, final DataInputBuffer in)
throws IOException {
byte[] bytes = data.get(key);
if (bytes == null || bytes.length == 0) {
throw new IOException("no value for " + key);
}
in.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(in);
return info;
}
protected String getServerName(final Text key,
final TreeMap<Text, byte[]> data) {
byte [] bytes = data.get(key);
String name = null;
try {
name = (bytes != null && bytes.length != 0) ?
new String(bytes, UTF8_ENCODING): null;
} catch(UnsupportedEncodingException e) {
assert(false);
}
return (name != null)? name.trim(): name;
}
protected long getStartCode(final Text key,
final TreeMap<Text, byte[]> data) {
long startCode = -1L;
byte [] bytes = data.get(key);
if(bytes != null && bytes.length != 0) {
try {
startCode = Long.valueOf(new String(bytes, UTF8_ENCODING).trim());
} catch(NumberFormatException e) {
assert(false);
} catch(UnsupportedEncodingException e) {
assert(false);
}
}
return startCode;
}
protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) {
@ -327,23 +290,17 @@ public class HMaster implements HConstants, HMasterInterface,
rootScanned = true;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("RootScanner going to sleep");
}
Thread.sleep(metaRescanInterval);
if (LOG.isDebugEnabled()) {
LOG.debug("RootScanner woke up");
}
} catch(InterruptedException e) {
// Catch and go around again. If interrupt, its spurious or we're
// being shutdown. Go back up to the while test.
}
}
} catch(IOException e) {
LOG.error(e);
LOG.error("ROOT scanner", e);
closed = true;
}
LOG.debug("ROOT scanner exiting");
LOG.info("ROOT scanner exiting");
}
}
@ -369,7 +326,6 @@ public class HMaster implements HConstants, HMasterInterface,
}
// Comparable
public int compareTo(Object o) {
MetaRegion other = (MetaRegion)o;
@ -383,11 +339,11 @@ public class HMaster implements HConstants, HMasterInterface,
}
/** Work for the meta scanner is queued up here */
private Vector<MetaRegion> metaRegionsToScan;
Vector<MetaRegion> metaRegionsToScan;
private SortedMap<Text, MetaRegion> knownMetaRegions;
SortedMap<Text, MetaRegion> knownMetaRegions;
private boolean allMetaRegionsScanned;
boolean allMetaRegionsScanned;
/**
* MetaScanner <code>META</code> table.
@ -399,6 +355,7 @@ public class HMaster implements HConstants, HMasterInterface,
* action would prevent other work from getting done.
*/
private class MetaScanner extends BaseScanner {
@SuppressWarnings("null")
public void run() {
while (!closed) {
if (LOG.isDebugEnabled()) {
@ -412,13 +369,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
if (region == null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("MetaScanner going into wait");
}
metaRegionsToScan.wait();
if (LOG.isDebugEnabled()) {
LOG.debug("MetaScanner woke up");
}
} catch (InterruptedException e) {
// Catch and go around again. We've been woken because there
// are new meta regions available or because we are being
@ -445,13 +396,7 @@ public class HMaster implements HConstants, HMasterInterface,
do {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep for meta rescan interval");
}
Thread.sleep(metaRescanInterval);
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep for meta rescan interval");
}
} catch(InterruptedException ex) {
// Catch and go around again.
}
@ -472,13 +417,11 @@ public class HMaster implements HConstants, HMasterInterface,
} while(true);
} catch(IOException e) {
LOG.error(e);
LOG.error("META scanner", e);
closed = true;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("META scanner exiting");
}
LOG.info("META scanner exiting");
}
private synchronized void metaRegionsScanned() {
@ -488,25 +431,17 @@ public class HMaster implements HConstants, HMasterInterface,
public synchronized void waitForMetaScan() {
while(!closed && !allMetaRegionsScanned) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Wait for all meta regions scanned");
}
wait();
if (LOG.isDebugEnabled()) {
LOG.debug("Wake from wait for all meta regions scanned");
}
} catch(InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Wake from wait for all meta regions scanned (IE)");
}
// continue
}
}
}
}
private MetaScanner metaScanner;
MetaScanner metaScanner;
private Thread metaScannerThread;
private Integer metaScannerLock = 0;
Integer metaScannerLock = 0;
// The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
// which includes the region's table, its id, and its start/end keys.
@ -514,31 +449,28 @@ public class HMaster implements HConstants, HMasterInterface,
// We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
// set of all known valid regions.
private SortedMap<Text, HRegionInfo> unassignedRegions;
SortedMap<Text, HRegionInfo> unassignedRegions;
// The 'assignAttempts' table maps from regions to a timestamp that indicates
// the last time we *tried* to assign the region to a RegionServer. If the
// timestamp is out of date, then we can try to reassign it.
private SortedMap<Text, Long> assignAttempts;
SortedMap<Text, Long> assignAttempts;
// 'killList' indicates regions that we hope to close and not reopen
// (because we're merging them, or taking the table offline, for example).
private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
// 'killedRegions' contains regions that are in the process of being closed
private SortedSet<Text> killedRegions;
SortedSet<Text> killedRegions;
// 'regionsToDelete' contains regions that need to be deleted, but cannot be
// until the region server closes it
private SortedSet<Text> regionsToDelete;
SortedSet<Text> regionsToDelete;
// A map of known server names to server info
private SortedMap<String, HServerInfo> serversToServerInfo =
SortedMap<String, HServerInfo> serversToServerInfo =
Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
/** Build the HMaster out of a raw configuration item. */
@ -576,18 +508,16 @@ public class HMaster implements HConstants, HMasterInterface,
if(! fs.exists(rootRegionDir)) {
LOG.info("bootstrap: creating ROOT and first META regions");
try {
HRegion root = HRegion.createNewHRegion(fs, dir, conf,
HGlobals.rootTableDesc, 0L, null, null);
HRegion meta = HRegion.createNewHRegion(fs, dir, conf,
HGlobals.metaTableDesc, 1L, null, null);
HRegion.addRegionToMeta(root, meta);
HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc,
this.dir, this.conf);
HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc,
this.dir, this.conf);
// Add first region from the META table to the ROOT region.
HRegion.addRegionToMETA(root, meta);
root.close();
root.getLog().close();
meta.close();
meta.getLog().close();
} catch(IOException e) {
LOG.error(e);
}
@ -690,6 +620,7 @@ public class HMaster implements HConstants, HMasterInterface,
try {
msgQueue.wait(threadWakeFrequency);
} catch(InterruptedException iex) {
// continue
}
}
if(closed) {
@ -751,9 +682,7 @@ public class HMaster implements HConstants, HMasterInterface,
LOG.warn(iex);
}
if(LOG.isDebugEnabled()) {
LOG.debug("HMaster main thread exiting");
}
LOG.info("HMaster main thread exiting");
}
/**
@ -1085,7 +1014,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
}
return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]);
return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
}
private synchronized void rootRegionIsAvailable() {
@ -1195,8 +1124,8 @@ public class HMaster implements HConstants, HMasterInterface,
long startCode = -1L;
try {
startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
startCode = Long.valueOf(new String(bytes, UTF8_ENCODING)).
longValue();
} catch(UnsupportedEncodingException e) {
LOG.error(e);
break;
@ -1558,7 +1487,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
}
private synchronized void waitForRootRegion() {
synchronized void waitForRootRegion() {
while (rootRegionLocation == null) {
try {
if (LOG.isDebugEnabled()) {
@ -1625,8 +1554,8 @@ public class HMaster implements HConstants, HMasterInterface,
// 2. Create the HRegion
HRegion r = HRegion.createNewHRegion(fs, dir, conf, desc,
newRegion.regionId, null, null);
HRegion r = HRegion.createHRegion(newRegion.regionId, desc, this.dir,
this.conf);
// 3. Insert into meta
@ -1874,7 +1803,6 @@ public class HMaster implements HConstants, HMasterInterface,
protected boolean isBeingServed(String serverName, long startCode) {
boolean result = false;
if(serverName != null && startCode != -1L) {
HServerInfo s = serversToServerInfo.get(serverName);
result = s != null && s.getStartCode() == startCode;
@ -1889,29 +1817,30 @@ public class HMaster implements HConstants, HMasterInterface,
protected abstract void processScanItem(String serverName, long startCode,
HRegionInfo info) throws IOException;
protected abstract void postProcessMeta(MetaRegion m,
HRegionInterface server) throws IOException;
protected abstract void postProcessMeta(MetaRegion m,
HRegionInterface server)
throws IOException;
}
private class ChangeTableState extends TableOperation {
private boolean online;
protected TreeMap<String, TreeSet<HRegionInfo>> servedRegions;
protected TreeMap<String, TreeSet<HRegionInfo>> servedRegions =
new TreeMap<String, TreeSet<HRegionInfo>>();
protected long lockid;
protected long clientId;
public ChangeTableState(Text tableName, boolean onLine) throws IOException {
super(tableName);
this.online = onLine;
this.servedRegions = new TreeMap<String, TreeSet<HRegionInfo>>();
}
protected void processScanItem(String serverName, long startCode,
HRegionInfo info) throws IOException {
if(isBeingServed(serverName, startCode)) {
HRegionInfo info)
throws IOException {
if (isBeingServed(serverName, startCode)) {
TreeSet<HRegionInfo> regions = servedRegions.get(serverName);
if(regions == null) {
if (regions == null) {
regions = new TreeSet<HRegionInfo>();
}
regions.add(info);
@ -1921,16 +1850,12 @@ public class HMaster implements HConstants, HMasterInterface,
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
// Process regions not being served
if(LOG.isDebugEnabled()) {
LOG.debug("processing unserved regions");
}
for(HRegionInfo i: unservedRegions) {
// Update meta table
if(LOG.isDebugEnabled()) {
LOG.debug("updating columns in row: " + i.regionName);
}
@ -1986,13 +1911,12 @@ public class HMaster implements HConstants, HMasterInterface,
}
for(Map.Entry<String, TreeSet<HRegionInfo>> e: servedRegions.entrySet()) {
String serverName = e.getKey();
if(online) {
if (online) {
LOG.debug("Already online");
continue; // Already being served
}
// Cause regions being served to be take off-line and disabled
// Cause regions being served to be taken off-line and disabled
TreeMap<Text, HRegionInfo> localKillList = killList.get(serverName);
if(localKillList == null) {
localKillList = new TreeMap<Text, HRegionInfo>();
@ -2003,9 +1927,10 @@ public class HMaster implements HConstants, HMasterInterface,
}
localKillList.put(i.regionName, i);
}
if(localKillList != null && localKillList.size() > 0) {
if(localKillList.size() > 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("inserted local kill list into kill list for server " + serverName);
LOG.debug("inserted local kill list into kill list for server " +
serverName);
}
killList.put(serverName, localKillList);
}
@ -2036,23 +1961,18 @@ public class HMaster implements HConstants, HMasterInterface,
protected void postProcessMeta(MetaRegion m, HRegionInterface server)
throws IOException {
// For regions that are being served, mark them for deletion
for(TreeSet<HRegionInfo> s: servedRegions.values()) {
for(HRegionInfo i: s) {
// For regions that are being served, mark them for deletion
for (TreeSet<HRegionInfo> s: servedRegions.values()) {
for (HRegionInfo i: s) {
regionsToDelete.add(i.regionName);
}
}
// Unserved regions we can delete now
for(HRegionInfo i: unservedRegions) {
for (HRegionInfo i: unservedRegions) {
// Delete the region
try {
HRegion.deleteRegion(fs, dir, i.regionName);
} catch(IOException e) {
LOG.error("failed to delete region " + i.regionName);
LOG.error(e);
@ -2062,22 +1982,23 @@ public class HMaster implements HConstants, HMasterInterface,
}
@Override
protected void updateRegionInfo(HRegionInterface server, Text regionName,
HRegionInfo i) throws IOException {
protected void updateRegionInfo(
@SuppressWarnings("hiding") HRegionInterface server, Text regionName,
@SuppressWarnings("unused") HRegionInfo i)
throws IOException {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
}
}
private abstract class ColumnOperation extends TableOperation {
protected ColumnOperation(Text tableName) throws IOException {
super(tableName);
}
protected void processScanItem(String serverName, long startCode,
HRegionInfo info) throws IOException {
protected void processScanItem(
@SuppressWarnings("unused") String serverName,
@SuppressWarnings("unused") long startCode, final HRegionInfo info)
throws IOException {
if(isEnabled(info)) {
throw new TableNotDisabledException(tableName.toString());
}
@ -2196,10 +2117,7 @@ public class HMaster implements HConstants, HMasterInterface,
}
public void leaseExpired() {
if(LOG.isDebugEnabled()) {
LOG.debug(server + " lease expired");
}
LOG.info(server + " lease expired");
HServerInfo storedInfo = serversToServerInfo.remove(server);
synchronized(msgQueue) {
msgQueue.add(new PendingServerShutdown(storedInfo));
@ -2218,7 +2136,7 @@ public class HMaster implements HConstants, HMasterInterface,
System.exit(0);
}
public static void main(String [] args) throws IOException {
public static void main(String [] args) {
if (args.length < 1) {
printUsageAndExit();
}
@ -2261,4 +2179,4 @@ public class HMaster implements HConstants, HMasterInterface,
printUsageAndExit();
}
}
}
}

View File

@ -27,12 +27,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/*******************************************************************************
/**
* The HMemcache holds in-memory modifications to the HRegion. This is really a
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
******************************************************************************/
*/
public class HMemcache {
private static final Log LOG = LogFactory.getLog(HMemcache.class);
private final Log LOG = LogFactory.getLog(this.getClass().getName());
TreeMap<HStoreKey, BytesWritable> memcache
= new TreeMap<HStoreKey, BytesWritable>();
@ -42,7 +42,7 @@ public class HMemcache {
TreeMap<HStoreKey, BytesWritable> snapshot = null;
private final HLocking lock = new HLocking();
final HLocking lock = new HLocking();
public HMemcache() {
super();
@ -147,7 +147,8 @@ public class HMemcache {
*
* Operation uses a write lock.
*/
public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
public void add(final Text row, final TreeMap<Text, BytesWritable> columns,
final long timestamp) {
this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, BytesWritable> es: columns.entrySet()) {
@ -239,7 +240,6 @@ public class HMemcache {
Vector<BytesWritable> result = new Vector<BytesWritable>();
HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
for (Map.Entry<HStoreKey, BytesWritable> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
@ -257,9 +257,9 @@ public class HMemcache {
/**
* Return a scanner over the keys in the HMemcache
*/
public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
throws IOException {
public HInternalScannerInterface getScanner(long timestamp,
Text targetCols[], Text firstRow)
throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
}
@ -295,16 +295,11 @@ public class HMemcache {
this.vals = new BytesWritable[backingMaps.length];
// Generate list of iterators
HStoreKey firstKey = new HStoreKey(firstRow);
for(int i = 0; i < backingMaps.length; i++) {
if(firstRow.getLength() != 0) {
keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
} else {
keyIterators[i] = backingMaps[i].keySet().iterator();
}
keyIterators[i] = (firstRow.getLength() != 0)?
backingMaps[i].tailMap(firstKey).keySet().iterator():
backingMaps[i].keySet().iterator();
while(getNext(i)) {
if(! findFirstRow(i, firstRow)) {
continue;
@ -314,7 +309,6 @@ public class HMemcache {
}
}
}
} catch(IOException ex) {
LOG.error(ex);
close();
@ -326,9 +320,9 @@ public class HMemcache {
* The user didn't want to start scanning at the first row. This method
* seeks to the requested row.
*
* @param i - which iterator to advance
* @param firstRow - seek to this row
* @return - true if this is the first row
* @param i which iterator to advance
* @param firstRow seek to this row
* @return true if this is the first row
*/
boolean findFirstRow(int i, Text firstRow) {
return ((firstRow.getLength() == 0)
@ -338,11 +332,11 @@ public class HMemcache {
/**
* Get the next value from the specified iterater.
*
* @param i - which iterator to fetch next value from
* @return - true if there is more data available
* @param i Which iterator to fetch next value from
* @return true if there is more data available
*/
boolean getNext(int i) {
if(! keyIterators[i].hasNext()) {
if (!keyIterators[i].hasNext()) {
closeSubScanner(i);
return false;
}

View File

@ -146,7 +146,9 @@ public class HMerge implements HConstants {
nextSize = nextRegion.largestHStore();
if((currentSize + nextSize) <= (DESIRED_MAX_FILE_SIZE / 2)) {
long maxFilesize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
if((currentSize + nextSize) <= (maxFilesize / 2)) {
// We merge two adjacent regions if their total size is less than
// one half of the desired maximum size

View File

@ -62,7 +62,7 @@ public class HRegion implements HConstants {
static int MIN_COMMITS_FOR_COMPACTION = 10;
static Random rand = new Random();
private static final Log LOG = LogFactory.getLog(HRegion.class);
static final Log LOG = LogFactory.getLog(HRegion.class);
/**
* Deletes all the files for a HRegion
@ -74,6 +74,7 @@ public class HRegion implements HConstants {
*/
public static void deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
LOG.debug("Deleting region " + regionName);
fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
}
@ -134,14 +135,10 @@ public class HRegion implements HConstants {
+ (endKey == null ? "" : endKey) + "'");
// Flush each of the sources, and merge their files into a single
// target for each column family.
if(LOG.isDebugEnabled()) {
LOG.debug("flushing and getting file names for region " + srcA.getRegionName());
}
// target for each column family.
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge =
new TreeMap<Text, Vector<HStoreFile>>();
for(HStoreFile src: srcA.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@ -151,10 +148,6 @@ public class HRegion implements HConstants {
v.add(src);
}
if(LOG.isDebugEnabled()) {
LOG.debug("flushing and getting file names for region " + srcB.getRegionName());
}
for(HStoreFile src: srcB.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@ -187,6 +180,7 @@ public class HRegion implements HConstants {
}
filesToMerge.clear();
for(HStoreFile src: srcA.close()) {
if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
@ -330,6 +324,7 @@ public class HRegion implements HConstants {
int maxUnflushedEntries = 0;
int compactionThreshold = 0;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
//////////////////////////////////////////////////////////////////////////////
// Constructor
@ -382,7 +377,6 @@ public class HRegion implements HConstants {
// Load in all the HStores.
for(Map.Entry<Text, HColumnDescriptor> e :
this.regionInfo.tableDesc.families().entrySet()) {
Text colFamily = HStoreKey.extractFamily(e.getKey());
stores.put(colFamily, new HStore(dir, this.regionInfo.regionName,
e.getValue(), fs, oldLogFile, conf));
@ -404,7 +398,12 @@ public class HRegion implements HConstants {
// By default, we compact the region if an HStore has more than 10 map files
this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold", 10);
this.compactionThreshold =
conf.getInt("hbase.hregion.compactionThreshold", 10);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
// HRegion is ready to go!
this.writestate.writesOngoing = false;
@ -448,6 +447,7 @@ public class HRegion implements HConstants {
try {
writestate.wait();
} catch (InterruptedException iex) {
// continue
}
}
writestate.writesOngoing = true;
@ -456,24 +456,21 @@ public class HRegion implements HConstants {
if(! shouldClose) {
return null;
}
LOG.info("closing region " + this.regionInfo.regionName);
Vector<HStoreFile> allHStoreFiles = internalFlushcache();
for (HStore store: stores.values()) {
store.close();
}
try {
return allHStoreFiles;
} else {
LOG.info("closing region " + this.regionInfo.regionName);
Vector<HStoreFile> allHStoreFiles = internalFlushcache();
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.close();
}
try {
return allHStoreFiles;
} finally {
synchronized(writestate) {
writestate.closed = true;
writestate.writesOngoing = false;
}
LOG.info("region " + this.regionInfo.regionName + " closed");
} finally {
synchronized (writestate) {
writestate.closed = true;
writestate.writesOngoing = false;
}
LOG.info("region " + this.regionInfo.regionName + " closed");
}
} finally {
lock.releaseWriteLock();
@ -493,10 +490,11 @@ public class HRegion implements HConstants {
&& (regionInfo.startKey.compareTo(midKey) > 0))
|| ((regionInfo.endKey.getLength() != 0)
&& (regionInfo.endKey.compareTo(midKey) < 0))) {
throw new IOException("Region splitkey must lie within region boundaries.");
throw new IOException("Region splitkey must lie within region " +
"boundaries.");
}
LOG.info("splitting region " + this.regionInfo.regionName);
LOG.info("Splitting region " + this.regionInfo.regionName);
Path splits = new Path(regiondir, SPLITDIR);
if(! fs.exists(splits)) {
@ -524,39 +522,41 @@ public class HRegion implements HConstants {
// Flush this HRegion out to storage, and turn off flushes
// or compactions until close() is called.
// TODO: flushcache can come back null if it can't do the flush. FIX.
Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
if(LOG.isDebugEnabled()) {
LOG.debug("splitting HStore " + hsf.getRegionName() + "/"
+ hsf.getColFamily() + "/" + hsf.fileId());
for(HStoreFile hsf: hstoreFilesToSplit) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
}
HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
alreadySplit.add(hsf);
}
// We just copied most of the data.
// Notify the caller that we are about to close the region
listener.closing(this.getRegionName());
listener.regionIsUnavailable(this.getRegionName());
// Now close the HRegion and copy the small remainder
// Wait on the last row updates to come in.
waitOnRowLocks();
// Now close the HRegion
hstoreFilesToSplit = close();
for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
// Tell listener that region is now closed and that they can therefore
// clean up any outstanding references.
listener.closed(this.getRegionName());
// Copy the small remainder
for(HStoreFile hsf: hstoreFilesToSplit) {
if(! alreadySplit.contains(hsf)) {
if(LOG.isDebugEnabled()) {
LOG.debug("splitting HStore " + hsf.getRegionName() + "/"
LOG.debug("Splitting HStore " + hsf.getRegionName() + "/"
+ hsf.getColFamily() + "/" + hsf.fileId());
}
@ -585,8 +585,9 @@ public class HRegion implements HConstants {
regions[0] = regionA;
regions[1] = regionB;
LOG.info("region split complete. new regions are: " + regions[0].getRegionName()
+ ", " + regions[1].getRegionName());
LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
"New regions are: " + regions[0].getRegionName() + ", " +
regions[1].getRegionName());
return regions;
}
@ -653,22 +654,19 @@ public class HRegion implements HConstants {
*/
public boolean needsSplit(Text midKey) {
lock.obtainReadLock();
try {
Text key = new Text();
long maxSize = 0;
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
long size = i.next().getLargestFileSize(key);
for(HStore store: stores.values()) {
long size = store.getLargestFileSize(key);
if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
}
}
return (maxSize > (DESIRED_MAX_FILE_SIZE + (DESIRED_MAX_FILE_SIZE / 2)));
return (maxSize >
(this.desiredMaxFileSize + (this.desiredMaxFileSize / 2)));
} finally {
lock.releaseReadLock();
}
@ -701,16 +699,16 @@ public class HRegion implements HConstants {
*/
public boolean needsCompaction() {
boolean needsCompaction = false;
lock.obtainReadLock();
this.lock.obtainReadLock();
try {
for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
if(i.next().getNMaps() > compactionThreshold) {
for(HStore store: stores.values()) {
if(store.getNMaps() > this.compactionThreshold) {
needsCompaction = true;
break;
}
}
} finally {
lock.releaseReadLock();
this.lock.releaseReadLock();
}
return needsCompaction;
}
@ -732,41 +730,35 @@ public class HRegion implements HConstants {
boolean shouldCompact = false;
lock.obtainReadLock();
try {
synchronized(writestate) {
if((! writestate.writesOngoing)
&& writestate.writesEnabled
&& (! writestate.closed)
&& recentCommits > MIN_COMMITS_FOR_COMPACTION) {
synchronized (writestate) {
if ((!writestate.writesOngoing) &&
writestate.writesEnabled &&
(!writestate.closed) &&
recentCommits > MIN_COMMITS_FOR_COMPACTION) {
writestate.writesOngoing = true;
shouldCompact = true;
}
}
} finally {
lock.releaseReadLock();
}
if(! shouldCompact) {
LOG.info("not compacting region " + this.regionInfo.regionName);
return false;
}
lock.obtainWriteLock();
try {
LOG.info("starting compaction on region " + this.regionInfo.regionName);
for (Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
HStore store = it.next();
if (!shouldCompact) {
LOG.info("not compacting region " + this.regionInfo);
return false;
}
LOG.info("starting compaction on region " + this.regionInfo);
for (HStore store : stores.values()) {
store.compact();
}
LOG.info("compaction completed on region " + this.regionInfo.regionName);
LOG.info("compaction completed on region " + this.regionInfo);
return true;
} finally {
lock.releaseReadLock();
synchronized (writestate) {
writestate.writesOngoing = false;
recentCommits = 0;
writestate.notifyAll();
}
lock.releaseWriteLock();
}
}
@ -800,7 +792,8 @@ public class HRegion implements HConstants {
* This method may block for some time, so it should not be called from a
* time-sensitive thread.
*/
public Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
public Vector<HStoreFile> flushcache(boolean disableFutureWrites)
throws IOException {
boolean shouldFlush = false;
synchronized(writestate) {
if((! writestate.writesOngoing)
@ -818,45 +811,45 @@ public class HRegion implements HConstants {
if(! shouldFlush) {
if(LOG.isDebugEnabled()) {
LOG.debug("not flushing cache for region " + this.regionInfo.regionName);
LOG.debug("not flushing cache for region " +
this.regionInfo.regionName);
}
return null;
} else {
try {
return internalFlushcache();
} finally {
synchronized(writestate) {
writestate.writesOngoing = false;
writestate.notifyAll();
}
return null;
}
try {
return internalFlushcache();
} finally {
synchronized (writestate) {
writestate.writesOngoing = false;
writestate.notifyAll();
}
}
}
/**
* Flushing the cache is a little tricky. We have a lot of updates in the
* HMemcache, all of which have also been written to the log. We need to write
* those updates in the HMemcache out to disk, while being able to process
* reads/writes as much as possible during the flush operation. Also, the log
* Flushing the cache is a little tricky. We have a lot of updates in the
* HMemcache, all of which have also been written to the log. We need to write
* those updates in the HMemcache out to disk, while being able to process
* reads/writes as much as possible during the flush operation. Also, the log
* has to state clearly the point in time at which the HMemcache was flushed.
* (That way, during recovery, we know when we can rely on the on-disk flushed
* structures and when we have to recover the HMemcache from the log.)
*
*
* So, we have a three-step process:
*
*
* A. Flush the memcache to the on-disk stores, noting the current sequence ID
* for the log.
*
* B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID
* that was current at the time of memcache-flush.
*
* C. Get rid of the memcache structures that are now redundant, as they've
* been flushed to the on-disk HStores.
*
* for the log.
*
* B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID
* that was current at the time of memcache-flush.
*
* C. Get rid of the memcache structures that are now redundant, as they've
* been flushed to the on-disk HStores.
*
* This method is protected, but can be accessed via several public routes.
*
*
* This method may block for some time.
*/
Vector<HStoreFile> internalFlushcache() throws IOException {
@ -884,8 +877,7 @@ public class HRegion implements HConstants {
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = retval.memcacheSnapshot;
if(memcacheSnapshot == null) {
for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore hstore = it.next();
for(HStore hstore: stores.values()) {
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
allHStoreFiles.addAll(0, hstoreFiles);
}
@ -944,12 +936,7 @@ public class HRegion implements HConstants {
/** Fetch a single data item. */
public BytesWritable get(Text row, Text column) throws IOException {
BytesWritable[] results = get(row, column, Long.MAX_VALUE, 1);
if(results == null) {
return null;
} else {
return results[0];
}
return (results == null)? null: results[0];
}
/** Fetch multiple versions of a single data item */
@ -972,13 +959,13 @@ public class HRegion implements HConstants {
// Obtain the row-lock
obtainLock(row);
obtainRowLock(row);
try {
// Obtain the -col results
return get(new HStoreKey(row, column, timestamp), numVersions);
} finally {
releaseLock(row);
releaseRowLock(row);
}
}
@ -1042,7 +1029,8 @@ public class HRegion implements HConstants {
* Return an iterator that scans over the HRegion, returning the indicated
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow) throws IOException {
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
throws IOException {
lock.obtainReadLock();
try {
TreeSet<Text> families = new TreeSet<Text>();
@ -1052,12 +1040,10 @@ public class HRegion implements HConstants {
HStore[] storelist = new HStore[families.size()];
int i = 0;
for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
Text family = it.next();
for (Text family: families) {
storelist[i++] = stores.get(family);
}
return new HScanner(cols, firstRow, memcache, storelist);
} finally {
lock.releaseReadLock();
}
@ -1068,26 +1054,26 @@ public class HRegion implements HConstants {
//////////////////////////////////////////////////////////////////////////////
/**
* The caller wants to apply a series of writes to a single row in the HRegion.
* The caller will invoke startUpdate(), followed by a series of calls to
* put/delete, then finally either abort() or commit().
* The caller wants to apply a series of writes to a single row in the
* HRegion. The caller will invoke startUpdate(), followed by a series of
* calls to put/delete, then finally either abort() or commit().
*
* Note that we rely on the external caller to properly abort() or commit()
* every transaction. If the caller is a network client, there should be a
* lease-system in place that automatically aborts() transactions after a
* specified quiet period.
* <p>Note that we rely on the external caller to properly abort() or
* commit() every transaction. If the caller is a network client, there
* should be a lease-system in place that automatically aborts() transactions
* after a specified quiet period.
*
* @param row Row to update
* @return lockid
* @see #put(long, Text, BytesWritable)
*/
public long startUpdate(Text row) throws IOException {
// We obtain a per-row lock, so other clients will
// block while one client performs an update.
lock.obtainReadLock();
try {
return obtainLock(row);
} finally {
lock.releaseReadLock();
}
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
// #commit or #abort or if the HRegionServer lease on the lock expires.
// See HRegionServer#RegionListener for how the expire on HRegionServer
// invokes a HRegion#abort.
return obtainRowLock(row);
}
/**
@ -1099,10 +1085,11 @@ public class HRegion implements HConstants {
* This method really just tests the input, then calls an internal localput()
* method.
*/
public void put(long lockid, Text targetCol, BytesWritable val) throws IOException {
public void put(long lockid, Text targetCol, BytesWritable val)
throws IOException {
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
throw new IOException("Cannot insert value: " + val);
throw new IOException("Cannot insert value: " + val);
}
localput(lockid, targetCol, val);
}
@ -1114,14 +1101,21 @@ public class HRegion implements HConstants {
localput(lockid, targetCol, DELETE_BYTES);
}
/**
/*
* Private implementation.
*
* localput() is used for both puts and deletes. We just place the values into
* a per-row pending area, until a commit() or abort() call is received.
* localput() is used for both puts and deletes. We just place the values
* into a per-row pending area, until a commit() or abort() call is received.
* (Or until the user's write-lock expires.)
*
* @param lockid
* @param targetCol
* @param val Value to enter into cell
* @throws IOException
*/
void localput(long lockid, Text targetCol, BytesWritable val) throws IOException {
void localput(final long lockid, final Text targetCol,
final BytesWritable val)
throws IOException {
checkColumn(targetCol);
Text row = getRowFromLock(lockid);
@ -1132,15 +1126,12 @@ public class HRegion implements HConstants {
// This sync block makes localput() thread-safe when multiple
// threads from the same client attempt an insert on the same
// locked row (via lockid).
synchronized(row) {
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation.
if(row != getRowFromLock(lockid)) {
throw new LockException("Locking error: put operation on lock " + lockid
+ " unexpected aborted by another thread");
throw new LockException("Locking error: put operation on lock " +
lockid + " unexpected aborted by another thread");
}
TreeMap<Text, BytesWritable> targets = targetColumns.get(lockid);
@ -1178,22 +1169,22 @@ public class HRegion implements HConstants {
}
targetColumns.remove(lockid);
releaseLock(row);
releaseRowLock(row);
}
}
/**
* Commit a pending set of writes to the memcache. This also results in writing
* to the change log.
* Commit a pending set of writes to the memcache. This also results in
* writing to the change log.
*
* Once updates hit the change log, they are safe. They will either be moved
* into an HStore in the future, or they will be recovered from the log.
* @param lockid Lock for row we're to commit.
* @throws IOException
*/
public void commit(long lockid) throws IOException {
public void commit(final long lockid) throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
if(row == null) {
throw new LockException("No write lock for lockid " + lockid);
@ -1201,22 +1192,16 @@ public class HRegion implements HConstants {
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation
synchronized(row) {
// We can now commit the changes.
// Add updates to the log and add values to the memcache.
long commitTimestamp = System.currentTimeMillis();
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row,
targetColumns.get(lockid), commitTimestamp);
memcache.add(row, targetColumns.get(lockid), commitTimestamp);
targetColumns.get(Long.valueOf(lockid)), commitTimestamp);
memcache.add(row, targetColumns.get(Long.valueOf(lockid)),
commitTimestamp);
// OK, all done!
targetColumns.remove(lockid);
releaseLock(row);
targetColumns.remove(Long.valueOf(lockid));
releaseRowLock(row);
}
recentCommits++;
this.commitsSinceFlush++;
@ -1235,9 +1220,10 @@ public class HRegion implements HConstants {
// all's well
} else {
throw new IOException("Requested row out of range for HRegion "
+ regionInfo.regionName + ", startKey='" + regionInfo.startKey
+ "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
throw new WrongRegionException("Requested row out of range for " +
"HRegion " + regionInfo.regionName + ", startKey='" +
regionInfo.startKey + "', endKey='" + regionInfo.endKey + "', row='" +
row + "'");
}
}
@ -1255,25 +1241,32 @@ public class HRegion implements HConstants {
* Obtain a lock on the given row. Blocks until success.
*
* I know it's strange to have two mappings:
* <pre>
* ROWS ==> LOCKS
* </pre>
* as well as
* <pre>
* LOCKS ==> ROWS
* </pre>
*
* But it acts as a guard on the client; a miswritten client just can't submit
* the name of a row and start writing to it; it must know the correct lockid,
* which matches the lock list in memory.
* But it acts as a guard on the client; a miswritten client just can't
* submit the name of a row and start writing to it; it must know the correct
* lockid, which matches the lock list in memory.
*
* It would be more memory-efficient to assume a correctly-written client,
* <p>It would be more memory-efficient to assume a correctly-written client,
* which maybe we'll do in the future.
*
* @param row Name of row to lock.
* @return The id of the held lock.
*/
long obtainLock(Text row) throws IOException {
long obtainRowLock(Text row) throws IOException {
checkRow(row);
synchronized(rowsToLocks) {
while(rowsToLocks.get(row) != null) {
try {
rowsToLocks.wait();
} catch (InterruptedException ie) {
// Empty
}
}
@ -1285,7 +1278,7 @@ public class HRegion implements HConstants {
}
}
Text getRowFromLock(long lockid) throws IOException {
Text getRowFromLock(long lockid) {
// Pattern is that all access to rowsToLocks and/or to
// locksToRows is via a lock on rowsToLocks.
synchronized(rowsToLocks) {
@ -1293,17 +1286,32 @@ public class HRegion implements HConstants {
}
}
/** Release the row lock! */
void releaseLock(Text row) throws IOException {
/** Release the row lock!
* @param lock Name of row whose lock we are to release
*/
void releaseRowLock(Text row) {
synchronized(rowsToLocks) {
long lockid = rowsToLocks.remove(row).longValue();
locksToRows.remove(lockid);
rowsToLocks.notifyAll();
}
}
/*******************************************************************************
private void waitOnRowLocks() {
synchronized (this.rowsToLocks) {
while (this.rowsToLocks.size() > 0) {
try {
this.rowsToLocks.wait();
} catch (InterruptedException e) {
// Catch. Let while test determine loop-end.
}
}
}
}
/*
* HScanner is an iterator through a bunch of rows in an HRegion.
******************************************************************************/
*/
private static class HScanner implements HInternalScannerInterface {
private HInternalScannerInterface[] scanners;
private TreeMap<Text, BytesWritable>[] resultSets;
@ -1314,10 +1322,8 @@ public class HRegion implements HConstants {
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
throws IOException {
throws IOException {
long scanTime = System.currentTimeMillis();
this.scanners = new HInternalScannerInterface[stores.length + 1];
for(int i = 0; i < this.scanners.length; i++) {
this.scanners[i] = null;
@ -1332,12 +1338,9 @@ public class HRegion implements HConstants {
// All results will match the required column-set and scanTime.
// NOTE: the memcache scanner should be the first scanner
try {
HInternalScannerInterface scanner =
memcache.getScanner(scanTime, cols, firstRow);
if(scanner.isWildcardScanner()) {
this.wildcardMatch = true;
}
@ -1368,8 +1371,7 @@ public class HRegion implements HConstants {
for(int i = 0; i < scanners.length; i++) {
keys[i] = new HStoreKey();
resultSets[i] = new TreeMap<Text, BytesWritable>();
if(scanners[i] != null && ! scanners[i].next(keys[i], resultSets[i])) {
if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
}
}
@ -1393,13 +1395,12 @@ public class HRegion implements HConstants {
* Grab the next row's worth of values. The HScanner will return the most
* recent data value for each row that is not newer than the target time.
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
throws IOException {
// Find the lowest-possible key.
Text chosenRow = null;
long chosenTimestamp = -1;
for(int i = 0; i < keys.length; i++) {
for(int i = 0; i < this.keys.length; i++) {
if(scanners[i] != null
&& (chosenRow == null
|| (keys[i].getRow().compareTo(chosenRow) < 0)
@ -1412,18 +1413,15 @@ public class HRegion implements HConstants {
}
// Store the key and results for each sub-scanner. Merge them as appropriate.
boolean insertedItem = false;
if(chosenTimestamp > 0) {
key.setRow(chosenRow);
key.setVersion(chosenTimestamp);
key.setColumn(new Text(""));
for(int i = 0; i < scanners.length; i++) {
for(int i = 0; i < scanners.length; i++) {
while((scanners[i] != null)
&& (keys[i].getRow().compareTo(chosenRow) == 0)) {
// If we are doing a wild card match or there are multiple matchers
// per column, we need to scan all the older versions of this row
// to pick up the rest of the family members
@ -1439,11 +1437,7 @@ public class HRegion implements HConstants {
// values with older ones. So now we only insert
// a result if the map does not contain the key.
for(Iterator<Map.Entry<Text, BytesWritable>> it
= resultSets[i].entrySet().iterator();
it.hasNext(); ) {
Map.Entry<Text, BytesWritable> e = it.next();
for(Map.Entry<Text, BytesWritable> e: resultSets[i].entrySet()) {
if(!results.containsKey(e.getKey())) {
results.put(e.getKey(), e.getValue());
insertedItem = true;
@ -1476,7 +1470,6 @@ public class HRegion implements HConstants {
void closeScanner(int i) {
try {
scanners[i].close();
} finally {
scanners[i] = null;
keys[i] = null;
@ -1493,4 +1486,160 @@ public class HRegion implements HConstants {
}
}
}
// Utility methods
/**
* Convenience method creating new HRegions.
* @param regionId ID to use
* @param tableDesc Descriptor
* @param dir Home directory for the new region.
* @param conf
* @return New META region (ROOT or META).
* @throws IOException
*/
public static HRegion createHRegion(final long regionId,
final HTableDescriptor tableDesc, final Path dir, final Configuration conf)
throws IOException {
return createHRegion(new HRegionInfo(regionId, tableDesc, null, null),
dir, conf, null, null);
}
/**
* Convenience method creating new HRegions. Used by createTable and by the
* bootstrap code in the HMaster constructor
*
* @param info Info for region to create.
* @param dir Home dir for new region
* @param conf
* @param initialFiles InitialFiles to pass new HRegion. Pass null if none.
* @param oldLogFile Old log file to use in region initialization. Pass null
* if none.
* @return new HRegion
*
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info,
final Path dir, final Configuration conf, final Path initialFiles,
final Path oldLogFile)
throws IOException {
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(dir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
fs, conf, info, initialFiles, oldLogFile);
}
/**
* Inserts a new region's meta information into the passed
* <code>meta</code> region. Used by the HMaster bootstrap code adding
* new table to ROOT table.
*
* @param meta META HRegion to be updated
* @param r HRegion to add to <code>meta</code>
*
* @throws IOException
*/
public static void addRegionToMETA(HRegion meta, HRegion r)
throws IOException {
// The row key is the region name
long writeid = meta.startUpdate(r.getRegionName());
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(bytes);
r.getRegionInfo().write(s);
meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
meta.commit(writeid);
}
public static void addRegionToMETA(final HClient client,
final Text table, final HRegion region,
final HServerAddress serverAddress,
final long startCode)
throws IOException {
client.openTable(table);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
region.getRegionInfo().write(out);
long lockid = client.startUpdate(region.getRegionName());
client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
client.put(lockid, COL_SERVER,
serverAddress.toString().getBytes(UTF8_ENCODING));
client.put(lockid, COL_STARTCODE,
String.valueOf(startCode).getBytes(UTF8_ENCODING));
client.commit(lockid);
LOG.info("Added region " + region.getRegionName() + " to table " + table);
}
/**
* Delete <code>region</code> from META <code>table</code>.
* @param client Client to use running update.
* @param table META table we are to delete region from.
* @param regionName Region to remove.
* @throws IOException
*/
public static void removeRegionFromMETA(final HClient client,
final Text table, final Text regionName)
throws IOException {
client.openTable(table);
long lockid = client.startUpdate(regionName);
client.delete(lockid, COL_REGIONINFO);
client.delete(lockid, COL_SERVER);
client.delete(lockid, COL_STARTCODE);
client.commit(lockid);
LOG.info("Removed " + regionName + " from table " + table);
}
/**
* @param data Map of META row labelled column data.
* @return Server
*/
static HRegionInfo getRegionInfo(final TreeMap<Text, byte[]> data)
throws IOException {
byte[] bytes = data.get(COL_REGIONINFO);
if (bytes == null || bytes.length == 0) {
throw new IOException("no value for " + COL_REGIONINFO);
}
DataInputBuffer in = new DataInputBuffer();
in.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(in);
return info;
}
/**
* @param data Map of META row labelled column data.
* @return Server
*/
static String getServerName(final TreeMap<Text, byte[]> data) {
byte [] bytes = data.get(COL_SERVER);
String name = null;
try {
name = (bytes != null && bytes.length != 0) ?
new String(bytes, UTF8_ENCODING): null;
} catch(UnsupportedEncodingException e) {
assert(false);
}
return (name != null)? name.trim(): name;
}
/**
* @param data Map of META row labelled column data.
* @return Start code.
*/
static long getStartCode(final TreeMap<Text, byte[]> data) {
long startCode = -1L;
byte [] bytes = data.get(COL_STARTCODE);
if(bytes != null && bytes.length != 0) {
try {
startCode = Long.parseLong(new String(bytes, UTF8_ENCODING).trim());
} catch(NumberFormatException e) {
LOG.error("Failed getting " + COL_STARTCODE, e);
} catch(UnsupportedEncodingException e) {
LOG.error("Failed getting " + COL_STARTCODE, e);
}
}
return startCode;
}
}

View File

@ -20,9 +20,11 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparable;
/**
* HRegion information.
@ -124,13 +126,27 @@ public class HRegionInfo implements WritableComparable {
this.regionName.readFields(in);
this.offLine = in.readBoolean();
}
//////////////////////////////////////////////////////////////////////////////
// Comparable
//////////////////////////////////////////////////////////////////////////////
public int compareTo(Object o) {
HRegionInfo other = (HRegionInfo)o;
return regionName.compareTo(other.regionName);
HRegionInfo other = (HRegionInfo) o;
// Are regions of same table?
int result = this.tableDesc.compareTo(other.tableDesc);
if (result != 0) {
return result;
}
// Compare start keys.
result = this.startKey.compareTo(other.startKey);
if (result != 0) {
return result;
}
// Compare end keys.
return this.endKey.compareTo(other.endKey);
}
}
}

View File

@ -15,17 +15,28 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*******************************************************************************
* HRegionServer makes a set of HRegions available to clients. It checks in with
@ -34,196 +45,194 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class HRegionServer
implements HConstants, HRegionInterface, Runnable {
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
throws IOException {
if (protocol.equals(HRegionInterface.class.getName())) {
return HRegionInterface.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
private static final Log LOG = LogFactory.getLog(HRegionServer.class);
static final Log LOG = LogFactory.getLog(HRegionServer.class);
private volatile boolean stopRequested;
volatile boolean stopRequested;
private Path regionDir;
private HServerInfo info;
private Configuration conf;
HServerInfo info;
Configuration conf;
private Random rand;
private TreeMap<Text, HRegion> regions; // region name -> HRegion
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// region name -> HRegion
TreeMap<Text, HRegion> onlineRegions = new TreeMap<Text, HRegion>();
Map<Text, HRegion> retiringRegions = new HashMap<Text, HRegion>();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Vector<HMsg> outboundMsgs;
private long threadWakeFrequency;
private int maxLogEntries;
long threadWakeFrequency;
private long msgInterval;
private int numRetries;
// Check to see if regions should be split
private long splitOrCompactCheckFrequency;
long splitOrCompactCheckFrequency;
private SplitOrCompactChecker splitOrCompactChecker;
private Thread splitOrCompactCheckerThread;
private Integer splitOrCompactLock = 0;
Integer splitOrCompactLock = Integer.valueOf(0);
private class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
private HClient client = new HClient(conf);
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
/*
* Interface used by the {@link org.apache.hadoop.io.retry} mechanism.
*/
interface UpdateMetaInterface {
/*
* @return True if succeeded.
* @throws IOException
*/
public void regionIsUnavailable(Text regionName) {
boolean update() throws IOException;
}
class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
HClient client = new HClient(conf);
public void closing(final Text regionName) {
lock.writeLock().lock();
try {
regions.remove(regionName);
// Remove region from regions Map and add it to the Map of retiring
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + regionName + " to retiringRegions");
}
} finally {
lock.writeLock().unlock();
}
}
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
retiringRegions.remove(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing " + regionName + " from retiringRegions");
}
} finally {
lock.writeLock().unlock();
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(! stopRequested) {
long startTime = System.currentTimeMillis();
synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
// Grab a list of regions to check
Vector<HRegion> regionsToCheck = new Vector<HRegion>();
lock.readLock().lock();
try {
regionsToCheck.addAll(regions.values());
regionsToCheck.addAll(onlineRegions.values());
} finally {
lock.readLock().unlock();
}
try {
for(Iterator<HRegion>it = regionsToCheck.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
for(HRegion cur: regionsToCheck) {
if(cur.isClosed()) {
continue; // Skip if closed
}
if(cur.needsCompaction()) {
// The best time to split a region is right after it has been compacted
// Best time to split a region is right after compaction
if(cur.compactStores()) {
Text midKey = new Text();
if(cur.needsSplit(midKey)) {
Text oldRegion = cur.getRegionName();
LOG.info("splitting region: " + oldRegion);
HRegion[] newRegions = cur.closeAndSplit(midKey, this);
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
if(LOG.isDebugEnabled()) {
LOG.debug("region split complete. updating meta");
}
Text tableToUpdate =
(oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
for(int tries = 0; tries < numRetries; tries++) {
try {
client.openTable(tableToUpdate);
long lockid = client.startUpdate(oldRegion);
client.delete(lockid, COL_REGIONINFO);
client.delete(lockid, COL_SERVER);
client.delete(lockid, COL_STARTCODE);
client.commit(lockid);
for(int i = 0; i < newRegions.length; i++) {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
newRegions[i].getRegionInfo().write(out);
lockid = client.startUpdate(newRegions[i].getRegionName());
client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
client.put(lockid, COL_SERVER,
info.getServerAddress().toString().getBytes(UTF8_ENCODING));
client.put(lockid, COL_STARTCODE,
String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
client.commit(lockid);
}
// Now tell the master about the new regions
if(LOG.isDebugEnabled()) {
LOG.debug("reporting region split to master");
}
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
LOG.info("region split successful. old region=" + oldRegion
+ ", new regions: " + newRegions[0].getRegionName() + ", "
+ newRegions[1].getRegionName());
// Finally, start serving the new regions
lock.writeLock().lock();
try {
regions.put(newRegions[0].getRegionName(), newRegions[0]);
regions.put(newRegions[1].getRegionName(), newRegions[1]);
} finally {
lock.writeLock().unlock();
}
} catch(NotServingRegionException e) {
if(tries == numRetries - 1) {
throw e;
}
continue;
}
break;
}
split(cur, midKey);
}
}
}
}
} catch(IOException e) {
//TODO: What happens if this fails? Are we toast?
LOG.error(e);
LOG.error("What happens if this fails? Are we toast?", e);
}
}
if (stopRequested) {
continue;
}
// Sleep
long waitTime = stopRequested ? 0
: splitOrCompactCheckFrequency - (System.currentTimeMillis() - startTime);
long waitTime = splitOrCompactCheckFrequency -
(System.currentTimeMillis() - startTime);
if (waitTime > 0) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep splitOrCompactChecker");
}
Thread.sleep(waitTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake splitOrCompactChecker");
}
} catch(InterruptedException iex) {
// continue
}
}
}
LOG.info("splitOrCompactChecker exiting");
}
private void split(final HRegion region, final Text midKey)
throws IOException {
final Text oldRegion = region.getRegionName();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
final Text tableToUpdate =
(oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
if(LOG.isDebugEnabled()) {
LOG.debug("splitOrCompactChecker exiting");
LOG.debug("Updating " + tableToUpdate + " with region split info");
}
// Wrap the update of META region with an org.apache.hadoop.io.retry.
UpdateMetaInterface implementation = new UpdateMetaInterface() {
public boolean update() throws IOException {
HRegion.removeRegionFromMETA(client, tableToUpdate,
region.getRegionName());
for (int i = 0; i < newRegions.length; i++) {
HRegion.addRegionToMETA(client, tableToUpdate, newRegions[i],
info.getServerAddress(), info.getStartCode());
}
// Now tell the master about the new regions
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting region split to master");
}
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].
getRegionInfo());
LOG.info("region split, META update, and report to master all" +
" successful. Old region=" + oldRegion + ", new regions: " +
newRegions[0].getRegionName() + ", " +
newRegions[1].getRegionName());
// Finally, start serving the new regions
lock.writeLock().lock();
try {
onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]);
} finally {
lock.writeLock().unlock();
}
return true;
}
};
// Get retry proxy wrapper around 'implementation'.
UpdateMetaInterface retryProxy = (UpdateMetaInterface)RetryProxy.
create(UpdateMetaInterface.class, implementation,
client.getRetryPolicy());
// Run retry.
retryProxy.update();
}
}
// Cache flushing
// Cache flushing
private Flusher cacheFlusher;
private Thread cacheFlusherThread;
private Integer cacheFlusherLock = 0;
private class Flusher implements Runnable {
Integer cacheFlusherLock = Integer.valueOf(0);
class Flusher implements Runnable {
public void run() {
while(! stopRequested) {
long startTime = System.currentTimeMillis();
@ -235,23 +244,20 @@ public class HRegionServer
Vector<HRegion> toFlush = new Vector<HRegion>();
lock.readLock().lock();
try {
toFlush.addAll(regions.values());
toFlush.addAll(onlineRegions.values());
} finally {
lock.readLock().unlock();
}
// Flush them, if necessary
for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
for(HRegion cur: toFlush) {
if(cur.isClosed()) { // Skip if closed
continue;
}
try {
cur.optionallyFlush();
} catch(IOException iex) {
LOG.error(iex);
}
@ -263,21 +269,14 @@ public class HRegionServer
: threadWakeFrequency - (System.currentTimeMillis() - startTime);
if(waitTime > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep cacheFlusher");
}
try {
Thread.sleep(waitTime);
} catch(InterruptedException iex) {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake cacheFlusher");
// continue
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("cacheFlusher exiting");
}
LOG.info("cacheFlusher exiting");
}
}
@ -287,45 +286,45 @@ public class HRegionServer
private Path oldlogfile;
// Logging
private HLog log;
HLog log;
private LogRoller logRoller;
private Thread logRollerThread;
private Integer logRollerLock = 0;
private class LogRoller implements Runnable {
Integer logRollerLock = Integer.valueOf(0);
/**
* Log rolling Runnable.
*/
class LogRoller implements Runnable {
private int maxLogEntries =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
public void run() {
while(! stopRequested) {
synchronized(logRollerLock) {
// If the number of log entries is high enough, roll the log. This is a
// very fast operation, but should not be done too frequently.
int nEntries = log.getNumEntries();
if(nEntries > maxLogEntries) {
if(nEntries > this.maxLogEntries) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Rolling log. Number of entries is: " + nEntries);
}
log.rollWriter();
} catch(IOException iex) {
// continue
}
}
}
if(!stopRequested) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep logRoller");
}
try {
Thread.sleep(threadWakeFrequency);
} catch(InterruptedException iex) {
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake logRoller");
// continue
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("logRoller exiting");
}
LOG.info("logRoller exiting");
}
}
@ -338,7 +337,6 @@ public class HRegionServer
private Server server;
// Leases
private Leases leases;
/** Start a HRegionServer at the default location */
@ -357,18 +355,17 @@ public class HRegionServer
this.regionDir = regionDir;
this.conf = conf;
this.rand = new Random();
this.regions = new TreeMap<Text, HRegion>();
this.outboundMsgs = new Vector<HMsg>();
this.scanners =
Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
// Config'ed params
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.maxLogEntries = conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
15 * 1000);
15 * 1000);
this.splitOrCompactCheckFrequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency", 60 * 1000);
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
60 * 1000);
// Cache flushing
this.cacheFlusher = new Flusher();
@ -448,7 +445,7 @@ public class HRegionServer
* Set a flag that will cause all the HRegionServer threads to shut down
* in an orderly fashion.
*/
public synchronized void stop() throws IOException {
public synchronized void stop() {
stopRequested = true;
notifyAll(); // Wakes run() if it is sleeping
}
@ -460,24 +457,30 @@ public class HRegionServer
try {
this.workerThread.join();
} catch(InterruptedException iex) {
// continue
}
try {
this.logRollerThread.join();
} catch(InterruptedException iex) {
// continue
}
try {
this.cacheFlusherThread.join();
} catch(InterruptedException iex) {
// continue
}
try {
this.splitOrCompactCheckerThread.join();
} catch(InterruptedException iex) {
// continue
}
try {
this.server.join();
} catch(InterruptedException iex) {
// continue
}
LOG.info("HRegionServer stopped at: " + info.getServerAddress().toString());
LOG.info("HRegionServer stopped at: " +
info.getServerAddress().toString());
}
/**
@ -506,9 +509,6 @@ public class HRegionServer
: msgInterval - (System.currentTimeMillis() - lastMsg);
if(waitTime > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep");
}
synchronized (this) {
try {
wait(waitTime);
@ -586,9 +586,6 @@ public class HRegionServer
waitTime = stopRequested ? 0
: msgInterval - (System.currentTimeMillis() - lastMsg);
if (waitTime > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleep");
}
synchronized (this) {
try {
wait(waitTime);
@ -596,9 +593,6 @@ public class HRegionServer
// On interrupt we go around to the while test of stopRequested
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Wake");
}
}
}
}
@ -666,7 +660,7 @@ public class HRegionServer
* updated the meta or root regions, and the master will pick that up on its
* next rescan of the root or meta tables.
*/
private void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) {
void reportSplit(HRegionInfo newRegionA, HRegionInfo newRegionB) {
synchronized(outboundMsgs) {
outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionA));
outboundMsgs.add(new HMsg(HMsg.MSG_NEW_REGION, newRegionB));
@ -677,10 +671,10 @@ public class HRegionServer
// HMaster-given operations
//////////////////////////////////////////////////////////////////////////////
private Vector<HMsg> toDo;
Vector<HMsg> toDo;
private Worker worker;
private Thread workerThread;
private class Worker implements Runnable {
class Worker implements Runnable {
public void stop() {
synchronized(toDo) {
toDo.notifyAll();
@ -700,6 +694,7 @@ public class HRegionServer
LOG.debug("Wake on todo");
}
} catch(InterruptedException e) {
// continue
}
}
if(stopRequested) {
@ -761,38 +756,34 @@ public class HRegionServer
LOG.error(e);
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("worker thread exiting");
}
LOG.info("worker thread exiting");
}
}
private void openRegion(HRegionInfo regionInfo) throws IOException {
void openRegion(HRegionInfo regionInfo) throws IOException {
this.lock.writeLock().lock();
try {
HRegion region =
new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
regions.put(region.getRegionName(), region);
this.onlineRegions.put(region.getRegionName(), region);
reportOpen(region);
} finally {
this.lock.writeLock().unlock();
}
}
private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
throws IOException {
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
throws IOException {
this.lock.writeLock().lock();
HRegion region = null;
try {
region = regions.remove(info.regionName);
region = onlineRegions.remove(hri.regionName);
} finally {
this.lock.writeLock().unlock();
}
if(region != null) {
region.close();
if(reportWhenCompleted) {
reportClose(region);
}
@ -800,12 +791,12 @@ public class HRegionServer
}
/** Called either when the master tells us to restart or from stop() */
private void closeAllRegions() {
void closeAllRegions() {
Vector<HRegion> regionsToClose = new Vector<HRegion>();
this.lock.writeLock().lock();
try {
regionsToClose.addAll(regions.values());
regions.clear();
regionsToClose.addAll(onlineRegions.values());
onlineRegions.clear();
} finally {
this.lock.writeLock().unlock();
}
@ -817,7 +808,6 @@ public class HRegionServer
try {
region.close();
LOG.debug("region closed " + region.getRegionName());
} catch(IOException e) {
LOG.error("error closing region " + region.getRegionName(), e);
}
@ -829,55 +819,34 @@ public class HRegionServer
//////////////////////////////////////////////////////////////////////////////
/** Obtain a table descriptor for the given region */
public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException {
HRegion region = getRegion(regionName);
return region.getRegionInfo();
public HRegionInfo getRegionInfo(Text regionName)
throws NotServingRegionException {
return getRegion(regionName).getRegionInfo();
}
/** Get the indicated row/column */
public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
HRegion region = getRegion(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug("get " + row.toString() + ", " + column.toString());
}
BytesWritable results = region.get(row, column);
if(results != null) {
return results;
}
return null;
public BytesWritable get(Text regionName, Text row, Text column)
throws IOException {
return getRegion(regionName).get(row, column);
}
/** Get multiple versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
int numVersions) throws IOException {
HRegion region = getRegion(regionName);
BytesWritable[] results = region.get(row, column, numVersions);
if(results != null) {
return results;
}
return null;
int numVersions)
throws IOException {
return getRegion(regionName).get(row, column, numVersions);
}
/** Get multiple timestamped versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
long timestamp, int numVersions) throws IOException {
HRegion region = getRegion(regionName);
BytesWritable[] results = region.get(row, column, timestamp, numVersions);
if(results != null) {
return results;
}
return null;
long timestamp, int numVersions)
throws IOException {
return getRegion(regionName).get(row, column, timestamp, numVersions);
}
/** Get all the columns (along with their names) for a given row. */
public LabelledData[] getRow(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
TreeMap<Text, BytesWritable> map = region.getFull(row);
LabelledData result[] = new LabelledData[map.size()];
int counter = 0;
@ -910,13 +879,44 @@ public class HRegionServer
}
}
public LabelledData[] next(final long scannerId, final HStoreKey key)
throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName + ", key " +
key);
}
leases.renewLease(scannerName, scannerName);
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
ArrayList<LabelledData> values = new ArrayList<LabelledData>();
// Keep getting rows till we find one that has at least one non-deleted
// column value.
while (s.next(key, results)) {
for(Map.Entry<Text, BytesWritable> e: results.entrySet()) {
BytesWritable val = e.getValue();
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
// Column value is deleted. Don't return it.
continue;
}
values.add(new LabelledData(e.getKey(), val));
}
if (values.size() > 0) {
// Row has something in it. Let it out. Else go get another row.
break;
}
// Need to clear results before we go back up and call 'next' again.
results.clear();
}
return values.toArray(new LabelledData[values.size()]);
}
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
leases.createLease(new Text(String.valueOf(clientid)),
this.leases.createLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)),
new RegionListener(region, lockid));
@ -926,48 +926,36 @@ public class HRegionServer
/** Add something to the HBase. */
public void put(Text regionName, long clientid, long lockid, Text column,
BytesWritable val) throws IOException {
HRegion region = getRegion(regionName);
HRegion region = getRegion(regionName, true);
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
region.put(lockid, column, val);
}
/** Remove a cell from the HBase. */
public void delete(Text regionName, long clientid, long lockid, Text column)
throws IOException {
HRegion region = getRegion(regionName);
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
region.delete(lockid, column);
}
/** Abandon the transaction */
public void abort(Text regionName, long clientid, long lockid)
throws IOException {
HRegion region = getRegion(regionName);
HRegion region = getRegion(regionName, true);
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
region.abort(lockid);
}
/** Confirm the transaction */
public void commit(Text regionName, long clientid, long lockid)
throws IOException {
HRegion region = getRegion(regionName);
throws IOException {
HRegion region = getRegion(regionName, true);
leases.cancelLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
region.commit(lockid);
}
@ -977,27 +965,55 @@ public class HRegionServer
new Text(String.valueOf(lockid)));
}
/** Private utility method for safely obtaining an HRegion handle. */
private HRegion getRegion(Text regionName) throws NotServingRegionException {
this.lock.readLock().lock();
/** Private utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
* @return {@link HRegion} for <code>regionName</code>
* @throws NotServingRegionException
*/
private HRegion getRegion(final Text regionName)
throws NotServingRegionException {
return getRegion(regionName, false);
}
/** Private utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
* @param checkRetiringRegions Set true if we're to check retiring regions
* as well as online regions.
* @return {@link HRegion} for <code>regionName</code>
* @throws NotServingRegionException
*/
private HRegion getRegion(final Text regionName,
final boolean checkRetiringRegions)
throws NotServingRegionException {
HRegion region = null;
this.lock.readLock().lock();
try {
region = regions.get(regionName);
region = onlineRegions.get(regionName);
if (region == null && checkRetiringRegions) {
region = this.retiringRegions.get(regionName);
if (LOG.isDebugEnabled()) {
if (region != null) {
LOG.debug("Found region " + regionName + " in retiringRegions");
}
}
}
if (region == null) {
throw new NotServingRegionException(regionName.toString());
}
return region;
} finally {
this.lock.readLock().unlock();
}
if(region == null) {
throw new NotServingRegionException(regionName.toString());
}
return region;
}
//////////////////////////////////////////////////////////////////////////////
// remote scanner interface
//////////////////////////////////////////////////////////////////////////////
private Map<Text, HInternalScannerInterface> scanners;
Map<Text, HInternalScannerInterface> scanners;
private class ScannerListener extends LeaseListener {
private Text scannerName;
@ -1006,6 +1022,7 @@ public class HRegionServer
}
public void leaseExpired() {
LOG.info("Scanner " + scannerName + " lease expired");
HInternalScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
@ -1018,8 +1035,7 @@ public class HRegionServer
/** Start a scanner for a given HRegion. */
public long openScanner(Text regionName, Text[] cols, Text firstRow)
throws IOException {
throws IOException {
HRegion r = getRegion(regionName);
long scannerId = -1L;
try {
@ -1029,8 +1045,8 @@ public class HRegionServer
synchronized(scanners) {
scanners.put(scannerName, s);
}
leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
leases.createLease(scannerName, scannerName,
new ScannerListener(scannerName));
} catch(IOException e) {
LOG.error(e);
throw e;
@ -1038,38 +1054,6 @@ public class HRegionServer
return scannerId;
}
public LabelledData[] next(long scannerId, HStoreKey key) throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = scanners.get(scannerName);
if(s == null) {
throw new IOException("unknown scanner");
}
leases.renewLease(scannerName, scannerName);
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
ArrayList<LabelledData> values = new ArrayList<LabelledData>();
if(s.next(key, results)) {
for(Iterator<Map.Entry<Text, BytesWritable>> it
= results.entrySet().iterator();
it.hasNext(); ) {
Map.Entry<Text, BytesWritable> e = it.next();
BytesWritable val = e.getValue();
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
// Value is deleted. Don't return a value
continue;
} else {
values.add(new LabelledData(e.getKey(), val));
}
}
}
return values.toArray(new LabelledData[values.size()]);
}
public void close(long scannerId) throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = null;
@ -1077,7 +1061,7 @@ public class HRegionServer
s = scanners.remove(scannerName);
}
if(s == null) {
throw new IOException("unknown scanner");
throw new UnknownScannerException(scannerName.toString());
}
s.close();
leases.cancelLease(scannerName, scannerName);
@ -1096,7 +1080,7 @@ public class HRegionServer
System.exit(0);
}
public static void main(String [] args) throws IOException {
public static void main(String [] args) {
if (args.length < 1) {
printUsageAndExit();
}

View File

@ -369,8 +369,8 @@ public class HStore {
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
*
* During this time, the HStore can work as usual, getting values from MapFiles
* and writing new MapFiles from given memcaches.
* During this time, the HStore can work as usual, getting values from
* MapFiles and writing new MapFiles from given memcaches.
*
* Existing MapFiles are not destroyed until the new compacted TreeMap is
* completely written-out to disk.
@ -410,8 +410,7 @@ public class HStore {
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
long maxSeenSeqID = -1;
for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
for (HStoreFile hsf: toCompactFiles) {
long seqid = hsf.loadInfo(fs);
if(seqid > 0) {
if(seqid > maxSeenSeqID) {
@ -587,7 +586,6 @@ public class HStore {
HStoreFile hsf = it.next();
hsf.write(out);
}
} finally {
out.close();
}
@ -595,12 +593,7 @@ public class HStore {
// Indicate that we're done.
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
out = new DataOutputStream(fs.create(doneFile));
try {
} finally {
out.close();
}
(new DataOutputStream(fs.create(doneFile))).close();
// Move the compaction into place.

View File

@ -91,7 +91,9 @@ public class Leases {
Lease lease = new Lease(holderId, resourceId, listener);
Text leaseId = lease.getLeaseId();
if(leases.get(leaseId) != null) {
throw new IOException("Impossible state for createLease(): Lease for holderId " + holderId + " and resourceId " + resourceId + " is still held.");
throw new IOException("Impossible state for createLease(): Lease " +
"for holderId " + holderId + " and resourceId " + resourceId +
" is still held.");
}
leases.put(leaseId, lease);
sortedLeases.add(lease);
@ -106,11 +108,10 @@ public class Leases {
Text leaseId = createLeaseId(holderId, resourceId);
Lease lease = leases.get(leaseId);
if(lease == null) {
// It's possible that someone tries to renew the lease, but
// it just expired a moment ago. So fail.
throw new IOException("Cannot renew lease is not held (holderId=" + holderId + ", resourceId=" + resourceId + ")");
throw new IOException("Cannot renew lease; not held (holderId=" +
holderId + ", resourceId=" + resourceId + ")");
}
sortedLeases.remove(lease);

View File

@ -0,0 +1,30 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class RegionNotFoundException extends IOException {
private static final long serialVersionUID = 993179627856392526L;
public RegionNotFoundException() {
super();
}
public RegionNotFoundException(String s) {
super(s);
}
}

View File

@ -19,9 +19,22 @@ import org.apache.hadoop.io.Text;
/**
* Used as a callback mechanism so that an HRegion can notify the HRegionServer
* when a region is about to be closed during a split operation. This is done
* to minimize the amount of time the region is off-line.
* of the different stages making an HRegion unavailable. Regions are made
* unavailable during region split operations.
*/
public interface RegionUnavailableListener {
public void regionIsUnavailable(Text regionName);
/**
* <code>regionName</code> is closing.
* Listener should stop accepting new writes but can continue to service
* outstanding transactions.
* @param regionName
*/
public void closing(final Text regionName);
/**
* <code>regionName</code> is closed and no longer available.
* Listener should clean up any references to <code>regionName</code>
* @param regionName
*/
public void closed(final Text regionName);
}

View File

@ -0,0 +1,30 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class UnknownScannerException extends IOException {
private static final long serialVersionUID = 993179627856392526L;
public UnknownScannerException() {
super();
}
public UnknownScannerException(String s) {
super(s);
}
}

View File

@ -0,0 +1,30 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class WrongRegionException extends IOException {
private static final long serialVersionUID = 993179627856392526L;
public WrongRegionException() {
super();
}
public WrongRegionException(String s) {
super(s);
}
}

View File

@ -42,7 +42,7 @@ public class EvaluationClient implements HConstants {
private static final int ROW_LENGTH = 1024;
private static final int ONE_HUNDRED_MB = 1024 * 1024 * 1 /*100 RESTORE*/;
private static final int ONE_HUNDRED_MB = 1024 * 1024 * 100;
private static final int ROWS_PER_100_MB = ONE_HUNDRED_MB / ROW_LENGTH;
private static final int ONE_GB = ONE_HUNDRED_MB * 10;
@ -62,7 +62,7 @@ public class EvaluationClient implements HConstants {
RANDOM_WRITE,
SEQUENTIAL_READ,
SEQUENTIAL_WRITE,
SCAN};
SCAN}
private Random rand;
private Configuration conf;
@ -177,8 +177,8 @@ public class EvaluationClient implements HConstants {
test == Test.SCAN || test == Test.SEQUENTIAL_READ ||
test == Test.SEQUENTIAL_WRITE) {
for(int range = 0; range < 10; range++) {
long elapsedTime = sequentialWrite(range * nRows, nRows);
for(int i = 0; i < 10; i++) {
long elapsedTime = sequentialWrite(i * nRows, nRows);
if (test == Test.SEQUENTIAL_WRITE) {
totalElapsedTime += elapsedTime;
}
@ -188,8 +188,8 @@ public class EvaluationClient implements HConstants {
switch(test) {
case RANDOM_READ:
for(int range = 0 ; range < 10; range++) {
long elapsedTime = randomRead(range * nRows, nRows);
for(int i = 0 ; i < 10; i++) {
long elapsedTime = randomRead(i * nRows, nRows);
totalElapsedTime += elapsedTime;
}
System.out.print("Random read of " + R + " rows completed in: ");
@ -199,15 +199,15 @@ public class EvaluationClient implements HConstants {
throw new UnsupportedOperationException("Not yet implemented");
case RANDOM_WRITE:
for(int range = 0 ; range < 10; range++) {
long elapsedTime = randomWrite(range * nRows, nRows);
for(int i = 0 ; i < 10; i++) {
long elapsedTime = randomWrite(i * nRows, nRows);
totalElapsedTime += elapsedTime;
}
System.out.print("Random write of " + R + " rows completed in: ");
break;
case SCAN:
for(int range = 0 ; range < 10; range++) {
for(int i = 0 ; i < 10; i++) {
long elapsedTime = scan(range * nRows, nRows);
totalElapsedTime += elapsedTime;
}
@ -215,8 +215,8 @@ public class EvaluationClient implements HConstants {
break;
case SEQUENTIAL_READ:
for(int range = 0 ; range < 10; range++) {
long elapsedTime = sequentialRead(range * nRows, nRows);
for(int i = 0 ; i < 10; i++) {
long elapsedTime = sequentialRead(i * nRows, nRows);
totalElapsedTime += elapsedTime;
}
System.out.print("Sequential read of " + R + " rows completed in: ");
@ -230,16 +230,16 @@ public class EvaluationClient implements HConstants {
throw new IllegalArgumentException("Invalid command value: " + test);
}
System.out.println((totalElapsedTime / 1000.0));
} catch(Exception e) {
e.printStackTrace();
LOG.error("Failed", e);
} finally {
LOG.info("Deleting table " + tableDescriptor.getName());
this.client.deleteTable(tableDescriptor.getName());
}
}
private void runOneTest(Test cmd) {
private void runOneTest(@SuppressWarnings("unused") Test cmd) {
// TODO
}
private void runTest(Test test) throws IOException {
@ -302,6 +302,10 @@ public class EvaluationClient implements HConstants {
System.err.println(" running: 1 <= value <= 500");
System.err.println(" range Integer. Required. 0 <= value <= " +
"(nclients * 10) - 1");
System.err.println("Examples:");
System.err.println(" To run a single evaluation client:");
System.err.println(" $ bin/hbase " +
"org.apache.hadoop.hbase.EvaluationClient sequentialWrite 1 1");
}
private void getArgs(final int start, final String[] args) {

View File

@ -157,25 +157,20 @@ public class MiniHBaseCluster implements HConstants {
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");
for(int i = 0; i < regionServers.length; i++) {
try {
regionServers[i].stop();
} catch(IOException e) {
e.printStackTrace();
}
regionServers[i].stop();
}
master.shutdown();
for(int i = 0; i < regionServers.length; i++) {
try {
regionThreads[i].join();
} catch(InterruptedException e) {
// continue
}
}
try {
masterThread.join();
} catch(InterruptedException e) {
// continue
}
LOG.info("HBase Cluster shutdown complete");

View File

@ -0,0 +1,33 @@
package org.apache.hadoop.hbase;
import org.apache.hadoop.io.Text;
import junit.framework.TestCase;
/**
* Test comparing HBase objects.
*/
public class TestCompare extends TestCase {
public void testHRegionInfo() {
HRegionInfo a = new HRegionInfo(1, new HTableDescriptor("a"), null, null);
HRegionInfo b = new HRegionInfo(2, new HTableDescriptor("b"), null, null);
assertTrue(a.compareTo(b) != 0);
HTableDescriptor t = new HTableDescriptor("t");
Text midway = new Text("midway");
a = new HRegionInfo(1, t, null, midway);
b = new HRegionInfo(2, t, midway, null);
assertTrue(a.compareTo(b) < 0);
assertTrue(b.compareTo(a) > 0);
assertEquals(a, a);
assertTrue(a.compareTo(a) == 0);
a = new HRegionInfo(1, t, new Text("a"), new Text("d"));
b = new HRegionInfo(2, t, new Text("e"), new Text("g"));
assertTrue(a.compareTo(b) < 0);
a = new HRegionInfo(1, t, new Text("aaaa"), new Text("dddd"));
b = new HRegionInfo(2, t, new Text("e"), new Text("g"));
assertTrue(a.compareTo(b) < 0);
a = new HRegionInfo(1, t, new Text("aaaa"), new Text("dddd"));
b = new HRegionInfo(2, t, new Text("aaaa"), new Text("eeee"));
assertTrue(a.compareTo(b) < 0);
}
}

View File

@ -214,7 +214,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
for (int i = 0; i < lockCount; i++) {
try {
Text rowid = new Text(Integer.toString(i));
lockids[i] = region.obtainLock(rowid);
lockids[i] = region.obtainRowLock(rowid);
rowid.equals(region.getRowFromLock(lockids[i]));
LOG.debug(getName() + " locked " + rowid.toString());
} catch (IOException e) {
@ -590,8 +590,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
}
// NOTE: This test depends on testBatchWrite succeeding
private void splitAndMerge() throws IOException {
void splitAndMerge() throws IOException {
Text midKey = new Text();
if(region.needsSplit(midKey)) {
@ -634,7 +633,11 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
*/
public void regionIsUnavailable(Text regionName) {
public void closing(@SuppressWarnings("unused") final Text regionName) {
// We don't use this here. It is only for the HRegionServer
}
public void closed(@SuppressWarnings("unused") final Text regionName) {
// We don't use this here. It is only for the HRegionServer
}

View File

@ -28,6 +28,9 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
/**
* Test of a long-lived scanner validating as we go.
*/
public class TestScanner extends HBaseTestCase {
private static final Text FIRST_ROW = new Text();
private static final Text[] COLS = {

View File

@ -0,0 +1,106 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
/**
* Additional scanner tests.
* {@link TestScanner} does a custom setup/takedown not conducive
* to addition of extra scanning tests.
* @see TestScanner
*/
public class TestScanner2 extends HBaseClusterTestCase {
final Log LOG = LogFactory.getLog(this.getClass().getName());
/**
* Test scanning of META table around split.
* There was a problem where only one of the splits showed in a scan.
* Split deletes a row and then adds two new ones.
* @throws IOException
*/
public void testSplitDeleteOneAddTwoRegions() throws IOException {
// First add a new table. Its intial region will be added to META region.
HClient client = new HClient(this.conf);
client.createTable(new HTableDescriptor(getName()));
List<HRegionInfo> regions = scan(client, HConstants.META_TABLE_NAME);
assertEquals("Expected one region", regions.size(), 1);
HRegionInfo region = regions.get(0);
assertTrue("Expected region named for test",
region.regionName.toString().startsWith(getName()));
// Now do what happens at split time; remove old region and then add two
// new ones in its place.
HRegion.removeRegionFromMETA(client, HConstants.META_TABLE_NAME,
region.regionName);
HTableDescriptor desc = region.tableDesc;
Path homedir = new Path(getName());
List<HRegion> newRegions = new ArrayList<HRegion>(2);
newRegions.add(HRegion.createHRegion(
new HRegionInfo(2L, desc, null, new Text("midway")),
homedir, this.conf, null, null));
newRegions.add(HRegion.createHRegion(
new HRegionInfo(3L, desc, new Text("midway"), null),
homedir, this.conf, null, null));
for (HRegion r: newRegions) {
HRegion.addRegionToMETA(client, HConstants.META_TABLE_NAME, r,
this.cluster.getHMasterAddress(), -1L);
}
regions = scan(client, HConstants.META_TABLE_NAME);
assertEquals("Should be two regions only", regions.size(), 2);
}
private List<HRegionInfo> scan(final HClient client, final Text table)
throws IOException {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
HRegionInterface regionServer = null;
long scannerId = -1L;
try {
client.openTable(table);
HClient.RegionLocation rl = client.getRegionLocation(table);
regionServer = client.getHRegionConnection(rl.serverAddress);
scannerId = regionServer.openScanner(rl.regionInfo.regionName,
HMaster.METACOLUMNS, new Text());
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
HStoreKey key = new HStoreKey();
LabelledData[] values = regionServer.next(scannerId, key);
if (values.length == 0) {
break;
}
for (int i = 0; i < values.length; i++) {
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0,
bytes.length);
results.put(values[i].getLabel(), bytes);
}
HRegionInfo info = HRegion.getRegionInfo(results);
String serverName = HRegion.getServerName(results);
long startCode = HRegion.getStartCode(results);
LOG.info(Thread.currentThread().getName() + " scanner: " +
Long.valueOf(scannerId) + " row: " + key +
": regioninfo: {" + info.toString() + "}, server: " + serverName +
", startCode: " + startCode);
regions.add(info);
}
} finally {
try {
if (scannerId != -1L) {
if (regionServer != null) {
regionServer.close(scannerId);
}
}
} catch (IOException e) {
LOG.error(e);
}
}
return regions;
}
}