HADOOP-1421 HADOOP-1466 When a region server dies, its log file must be split up on a per region basis

so that region servers are assigned the regions have a log to apply edits from. Enhance fail over 
capabilities. 

For all the files modified, clean up javadoc, class method and field visibility.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@546192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-06-11 16:46:27 +00:00
parent 3509f88c48
commit 09cf0a100f
34 changed files with 2128 additions and 957 deletions

View File

@ -25,3 +25,6 @@ Trunk (unreleased changes)
13. HADOOP-1445 Support updates across region splits and compactions
14. HADOOP-1460 On shutdown IOException with complaint 'Cannot cancel lease
that is not held'
15. HADOOP-1421 Failover detection, split log files.
For the files modified, also clean up javadoc, class, field and method
visibility (HADOOP-1466)

View File

@ -15,7 +15,7 @@
</description>
</property>
<property>
<name>hbase.regiondir</name>
<name>hbase.rootdir</name>
<value>${hadoop.tmp.dir}/hbase</value>
<description>The directory shared by region servers.
</description>

View File

@ -42,9 +42,12 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
// The kind of match we are doing on a column:
private static enum MATCH_TYPE {
FAMILY_ONLY, // Just check the column family name
REGEX, // Column family + matches regex
SIMPLE // Literal matching
/** Just check the column family name */
FAMILY_ONLY,
/** Column family + matches regex */
REGEX,
/** Literal matching */
SIMPLE
}
// This class provides column matching functions that are more sophisticated
@ -63,12 +66,12 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
ColumnMatcher(Text col) throws IOException {
String column = col.toString();
try {
int colpos = column.indexOf(":") + 1;
if(colpos == 0) {
int colpos = column.indexOf(":");
if(colpos == -1) {
throw new InvalidColumnNameException("Column name has no family indicator.");
}
String columnkey = column.substring(colpos);
String columnkey = column.substring(colpos + 1);
if(columnkey == null || columnkey.length() == 0) {
this.matchType = MATCH_TYPE.FAMILY_ONLY;
@ -97,7 +100,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
return c.equals(this.col);
} else if(this.matchType == MATCH_TYPE.FAMILY_ONLY) {
return c.toString().startsWith(this.family);
return HStoreKey.extractFamily(c).toString().equals(this.family);
} else if(this.matchType == MATCH_TYPE.REGEX) {
return this.columnMatcher.matcher(c.toString()).matches();
@ -211,6 +214,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
* @param key The key that matched
* @param results All the results for <code>key</code>
* @return true if a match was found
* @throws IOException
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/

View File

@ -53,7 +53,7 @@ public class HClient implements HConstants {
COL_REGIONINFO
};
private static final Text EMPTY_START_ROW = new Text();
static final Text EMPTY_START_ROW = new Text();
long pause;
int numRetries;
@ -64,8 +64,8 @@ public class HClient implements HConstants {
* Data structure that holds current location for a region and its info.
*/
static class RegionLocation {
public HRegionInfo regionInfo;
public HServerAddress serverAddress;
HRegionInfo regionInfo;
HServerAddress serverAddress;
RegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
this.regionInfo = regionInfo;
@ -83,7 +83,7 @@ public class HClient implements HConstants {
private TreeMap<Text, SortedMap<Text, RegionLocation>> tablesToServers;
// For the "current" table: Map startRow -> (HRegionInfo, HServerAddress)
private SortedMap<Text, RegionLocation> tableServers;
SortedMap<Text, RegionLocation> tableServers;
// Known region HServerAddress.toString() -> HRegionInterface
private TreeMap<String, HRegionInterface> servers;
@ -95,7 +95,10 @@ public class HClient implements HConstants {
Random rand;
long clientid;
/** Creates a new HClient */
/**
* Creates a new HClient
* @param conf - Configuration object
*/
public HClient(Configuration conf) {
this.conf = conf;
@ -239,6 +242,12 @@ public class HClient implements HConstants {
}
}
/**
* Deletes a table
*
* @param tableName - name of table to delete
* @throws IOException
*/
public synchronized void deleteTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
@ -254,23 +263,21 @@ public class HClient implements HConstants {
HRegionInterface server =
getHRegionConnection(firstMetaServer.serverAddress);
DataInputBuffer inbuf = new DataInputBuffer();
HStoreKey key = new HStoreKey();
HRegionInfo info = new HRegionInfo();
for (int tries = 0; tries < numRetries; tries++) {
long scannerId = -1L;
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
LabelledData[] values = server.next(scannerId, key);
KeyedData[] values = server.next(scannerId);
if(values == null || values.length == 0) {
break;
}
boolean found = false;
for(int j = 0; j < values.length; j++) {
if(values[j].getLabel().equals(COL_REGIONINFO)) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0,
bytes.length);
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
info.readFields(inbuf);
if(info.tableDesc.getName().equals(tableName)) {
@ -301,7 +308,15 @@ public class HClient implements HConstants {
LOG.info("table " + tableName + " deleted");
}
public synchronized void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
/**
* Add a column to an existing table
*
* @param tableName - name of the table to add column to
* @param column - column descriptor of column to be added
* @throws IOException
*/
public synchronized void addColumn(Text tableName, HColumnDescriptor column)
throws IOException {
checkReservedTableName(tableName);
checkMaster();
try {
@ -312,7 +327,15 @@ public class HClient implements HConstants {
}
}
public synchronized void deleteColumn(Text tableName, Text columnName) throws IOException {
/**
* Delete a column from a table
*
* @param tableName - name of table
* @param columnName - name of column to be deleted
* @throws IOException
*/
public synchronized void deleteColumn(Text tableName, Text columnName)
throws IOException {
checkReservedTableName(tableName);
checkMaster();
try {
@ -323,6 +346,12 @@ public class HClient implements HConstants {
}
}
/**
* Brings a table on-line (enables it)
*
* @param tableName - name of the table
* @throws IOException
*/
public synchronized void enableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
@ -340,7 +369,6 @@ public class HClient implements HConstants {
HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress);
DataInputBuffer inbuf = new DataInputBuffer();
HStoreKey key = new HStoreKey();
HRegionInfo info = new HRegionInfo();
for(int tries = 0; tries < numRetries; tries++) {
int valuesfound = 0;
@ -348,21 +376,28 @@ public class HClient implements HConstants {
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
LabelledData[] values = server.next(scannerId, key);
boolean isenabled = false;
while(true) {
KeyedData[] values = server.next(scannerId);
if(values == null || values.length == 0) {
if(valuesfound == 0) {
throw new NoSuchElementException("table " + tableName + " not found");
}
break;
}
valuesfound += 1;
boolean isenabled = false;
for(int j = 0; j < values.length; j++) {
if(values[j].getLabel().equals(COL_REGIONINFO)) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
info.readFields(inbuf);
isenabled = !info.offLine;
break;
}
}
if(isenabled) {
break;
}
}
if(isenabled) {
@ -395,6 +430,13 @@ public class HClient implements HConstants {
LOG.info("Enabled table " + tableName);
}
/**
* Disables a table (takes it off-line) If it is being served, the master
* will tell the servers to stop serving it.
*
* @param tableName - name of table
* @throws IOException
*/
public synchronized void disableTable(Text tableName) throws IOException {
checkReservedTableName(tableName);
checkMaster();
@ -412,7 +454,6 @@ public class HClient implements HConstants {
HRegionInterface server = getHRegionConnection(firstMetaServer.serverAddress);
DataInputBuffer inbuf = new DataInputBuffer();
HStoreKey key = new HStoreKey();
HRegionInfo info = new HRegionInfo();
for(int tries = 0; tries < numRetries; tries++) {
int valuesfound = 0;
@ -420,21 +461,28 @@ public class HClient implements HConstants {
try {
scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
REGIONINFO, tableName);
LabelledData[] values = server.next(scannerId, key);
boolean disabled = false;
while(true) {
KeyedData[] values = server.next(scannerId);
if(values == null || values.length == 0) {
if(valuesfound == 0) {
throw new NoSuchElementException("table " + tableName + " not found");
}
break;
}
valuesfound += 1;
boolean disabled = false;
for(int j = 0; j < values.length; j++) {
if(values[j].getLabel().equals(COL_REGIONINFO)) {
if(values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = new byte[values[j].getData().getSize()];
System.arraycopy(values[j].getData().get(), 0, bytes, 0, bytes.length);
inbuf.reset(bytes, bytes.length);
info.readFields(inbuf);
disabled = info.offLine;
break;
}
}
if(disabled) {
break;
}
}
if(disabled) {
@ -466,6 +514,10 @@ public class HClient implements HConstants {
LOG.info("Disabled table " + tableName);
}
/**
* Shuts down the HBase instance
* @throws IOException
*/
public synchronized void shutdown() throws IOException {
checkMaster();
this.master.shutdown();
@ -675,8 +727,8 @@ public class HClient implements HConstants {
* @throws IOException
*/
private TreeMap<Text, RegionLocation> scanOneMetaRegion(final RegionLocation t,
final Text tableName)
throws IOException {
final Text tableName) throws IOException {
HRegionInterface server = getHRegionConnection(t.serverAddress);
TreeMap<Text, RegionLocation> servers = new TreeMap<Text, RegionLocation>();
for(int tries = 0; servers.size() == 0 && tries < this.numRetries;
@ -691,8 +743,7 @@ public class HClient implements HConstants {
while(true) {
HRegionInfo regionInfo = null;
String serverAddress = null;
HStoreKey key = new HStoreKey();
LabelledData[] values = server.next(scannerId, key);
KeyedData[] values = server.next(scannerId);
if(values.length == 0) {
if(servers.size() == 0) {
// If we didn't find any servers then the table does not exist
@ -713,7 +764,7 @@ public class HClient implements HConstants {
for(int i = 0; i < values.length; i++) {
bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getLabel(), bytes);
results.put(values[i].getKey().getColumn(), bytes);
}
regionInfo = new HRegionInfo();
bytes = results.get(COL_REGIONINFO);
@ -808,6 +859,9 @@ public class HClient implements HConstants {
* If we wanted this to be really fast, we could implement a special
* catalog table that just contains table names and their descriptors.
* Right now, it only exists as part of the META table's region info.
*
* @return - returns an array of HTableDescriptors
* @throws IOException
*/
public synchronized HTableDescriptor[] listTables()
throws IOException {
@ -828,15 +882,14 @@ public class HClient implements HConstants {
scannerId = server.openScanner(t.regionInfo.regionName,
META_COLUMNS, EMPTY_START_ROW);
HStoreKey key = new HStoreKey();
DataInputBuffer inbuf = new DataInputBuffer();
while(true) {
LabelledData[] values = server.next(scannerId, key);
KeyedData[] values = server.next(scannerId);
if(values.length == 0) {
break;
}
for(int i = 0; i < values.length; i++) {
if(values[i].getLabel().equals(COL_REGIONINFO)) {
if(values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
byte[] bytes = values[i].getData().get();
inbuf.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
@ -901,7 +954,14 @@ public class HClient implements HConstants {
}
}
/** Get a single value for the specified row and column */
/**
* Get a single value for the specified row and column
*
* @param row - row key
* @param column - column name
* @return - value for specified row/column
* @throws IOException
*/
public byte[] get(Text row, Text column) throws IOException {
RegionLocation info = null;
BytesWritable value = null;
@ -931,7 +991,15 @@ public class HClient implements HConstants {
return null;
}
/** Get the specified number of versions of the specified row and column */
/**
* Get the specified number of versions of the specified row and column
*
* @param row - row key
* @param column - column name
* @param numVersions - number of versions to retrieve
* @return - array byte values
* @throws IOException
*/
public byte[][] get(Text row, Text column, int numVersions) throws IOException {
RegionLocation info = null;
BytesWritable[] values = null;
@ -968,8 +1036,16 @@ public class HClient implements HConstants {
/**
* Get the specified number of versions of the specified row and column with
* the specified timestamp.
*
* @param row - row key
* @param column - column name
* @param timestamp - timestamp
* @param numVersions - number of versions to retrieve
* @return - array of values that match the above criteria
* @throws IOException
*/
public byte[][] get(Text row, Text column, long timestamp, int numVersions) throws IOException {
public byte[][] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
RegionLocation info = null;
BytesWritable[] values = null;
@ -1002,10 +1078,16 @@ public class HClient implements HConstants {
return null;
}
/** Get all the data for the specified row */
public LabelledData[] getRow(Text row) throws IOException {
/**
* Get all the data for the specified row
*
* @param row - row key
* @return - map of colums to values
* @throws IOException
*/
public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
RegionLocation info = null;
LabelledData[] value = null;
KeyedData[] value = null;
for(int tries = 0; tries < numRetries && info == null; tries++) {
info = getRegionLocation(row);
@ -1023,15 +1105,29 @@ public class HClient implements HConstants {
info = null;
}
}
return value;
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
if(value != null && value.length != 0) {
for(int i = 0; i < value.length; i++) {
byte[] bytes = new byte[value[i].getData().getSize()];
System.arraycopy(value[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(value[i].getKey().getColumn(), bytes);
}
}
return results;
}
/**
* Get a scanner on the current table starting at the specified row.
* Return the specified columns.
*
* @param columns - array of columns to return
* @param startRow - starting row in table to scan
* @return - scanner
* @throws IOException
*/
public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
public synchronized HScannerInterface obtainScanner(Text[] columns,
Text startRow) throws IOException {
if(this.tableServers == null) {
throw new IllegalStateException("Must open table first");
}
@ -1069,9 +1165,20 @@ public class HClient implements HConstants {
long startUpdate() throws IOException;
}
/* Start an atomic row insertion or update
/**
* Start an atomic row insertion/update. No changes are committed until the
* call to commit() returns. A call to abort() will abandon any updates in progress.
*
* Callers to this method are given a lease for each unique lockid; before the
* lease expires, either abort() or commit() must be called. If it is not
* called, the system will automatically call abort() on the client's behalf.
*
* The client can gain extra time with a call to renewLease().
* Start an atomic row insertion or update
*
* @param row Name of row to start update against.
* @return Row lockid.
* @throws IOException
*/
public long startUpdate(final Text row) throws IOException {
// Implemention of the StartUpdate interface.
@ -1114,7 +1221,14 @@ public class HClient implements HConstants {
return retryProxy.startUpdate();
}
/** Change a value for the specified column */
/**
* Change a value for the specified column
*
* @param lockid - lock id returned from startUpdate
* @param column - column whose value is being set
* @param val - new value for column
* @throws IOException
*/
public void put(long lockid, Text column, byte val[]) throws IOException {
try {
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
@ -1131,7 +1245,13 @@ public class HClient implements HConstants {
}
}
/** Delete the value for a column */
/**
* Delete the value for a column
*
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* @throws IOException
*/
public void delete(long lockid, Text column) throws IOException {
try {
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
@ -1148,7 +1268,12 @@ public class HClient implements HConstants {
}
}
/** Abort a row mutation */
/**
* Abort a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void abort(long lockid) throws IOException {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
@ -1159,7 +1284,12 @@ public class HClient implements HConstants {
}
}
/** Finalize a row mutation */
/**
* Finalize a row mutation
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void commit(long lockid) throws IOException {
try {
this.currentServer.commit(this.currentRegion, this.clientid, lockid);
@ -1169,12 +1299,34 @@ public class HClient implements HConstants {
}
}
/**
* Renew lease on update
*
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void renewLease(long lockid) throws IOException {
try {
this.currentServer.renewLease(lockid, this.clientid);
} catch(IOException e) {
try {
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
} catch(IOException e2) {
LOG.warn(e2);
}
this.currentServer = null;
this.currentRegion = null;
throw e;
}
}
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
private class ClientScanner implements HScannerInterface {
private final Text EMPTY_COLUMN = new Text();
private Text[] columns;
private Text startRow;
private boolean closed;
@ -1198,7 +1350,7 @@ public class HClient implements HConstants {
this.regions = info.toArray(new RegionLocation[info.size()]);
}
public ClientScanner(Text[] columns, Text startRow) throws IOException {
ClientScanner(Text[] columns, Text startRow) throws IOException {
this.columns = columns;
this.startRow = startRow;
this.closed = false;
@ -1260,17 +1412,22 @@ public class HClient implements HConstants {
if(this.closed) {
return false;
}
LabelledData[] values = null;
KeyedData[] values = null;
do {
values = this.server.next(this.scannerId, key);
} while(values.length == 0 && nextScanner());
values = this.server.next(this.scannerId);
} while(values != null && values.length == 0 && nextScanner());
if(values != null && values.length != 0) {
for(int i = 0; i < values.length; i++) {
key.setRow(values[i].getKey().getRow());
key.setVersion(values[i].getKey().getTimestamp());
key.setColumn(EMPTY_COLUMN);
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
results.put(values[i].getLabel(), bytes);
results.put(values[i].getKey().getColumn(), bytes);
}
return values.length != 0;
}
return values == null ? false : values.length != 0;
}
/* (non-Javadoc)
@ -1333,8 +1490,13 @@ public class HClient implements HConstants {
" deleteTable testtable");
}
/**
* Process command-line args.
* @param args - command arguments
* @return 0 if successful -1 otherwise
*/
public int doCommandLine(final String args[]) {
// Process command-line args. TODO: Better cmd-line processing
// TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
int errCode = -1;
if (args.length < 1) {
@ -1416,6 +1578,10 @@ public class HClient implements HConstants {
return errCode;
}
/**
* Main program
* @param args
*/
public static void main(final String args[]) {
Configuration c = new HBaseConfiguration();
int errCode = (new HClient(c)).doCommandLine(args);

View File

@ -81,6 +81,12 @@ public class HColumnDescriptor implements WritableComparable {
this.versionNumber = COLUMN_DESCRIPTOR_VERSION;
}
/**
* Construct a column descriptor specifying only the family name
* The other attributes are defaulted.
*
* @param columnName - column family name
*/
public HColumnDescriptor(String columnName) {
this();
this.name.set(columnName);

View File

@ -34,22 +34,41 @@ public interface HConstants {
// TODO: Support 'local': i.e. default of all running in single
// process. Same for regionserver. TODO: Is having HBase homed
// on port 60k OK?
/** Parameter name for master address */
static final String MASTER_ADDRESS = "hbase.master";
/** Default master address */
static final String DEFAULT_MASTER_ADDRESS = "localhost:60000";
// Key for hbase.regionserver address.
/** Parameter name for hbase.regionserver address. */
static final String REGIONSERVER_ADDRESS = "hbase.regionserver";
/** Default region server address */
static final String DEFAULT_REGIONSERVER_ADDRESS = "localhost:60010";
/** Parameter name for how often threads should wake up */
static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency";
static final String HREGION_DIR = "hbase.regiondir";
static final String DEFAULT_HREGION_DIR = "/hbase";
/** Parameter name for HBase instance root directory */
static final String HBASE_DIR = "hbase.rootdir";
/** Default HBase instance root directory */
static final String DEFAULT_HBASE_DIR = "/hbase";
/** Used to construct the name of the directory in which a HRegion resides */
static final String HREGIONDIR_PREFIX = "hregion_";
// TODO: Someone may try to name a column family 'log'. If they
// do, it will clash with the HREGION log dir subdirectory. FIX.
/** Used to construct the name of the log directory for a region server */
static final String HREGION_LOGDIR_NAME = "log";
/** Name of old log file for reconstruction */
static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";
/** Default maximum file size */
static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024; // 128MB
// Always store the location of the root table's HRegion.
@ -59,26 +78,36 @@ public interface HConstants {
// each row in the root and meta tables describes exactly 1 region
// Do we ever need to know all the information that we are storing?
// The root tables' name.
/** The root table's name. */
static final Text ROOT_TABLE_NAME = new Text("--ROOT--");
// The META tables' name.
/** The META table's name. */
static final Text META_TABLE_NAME = new Text("--META--");
// Defines for the column names used in both ROOT and META HBase 'meta'
// tables.
// Defines for the column names used in both ROOT and META HBase 'meta' tables.
/** The ROOT and META column family */
static final Text COLUMN_FAMILY = new Text("info:");
/** ROOT/META column family member - contains HRegionInfo */
static final Text COL_REGIONINFO = new Text(COLUMN_FAMILY + "regioninfo");
/** ROOT/META column family member - contains HServerAddress.toString() */
static final Text COL_SERVER = new Text(COLUMN_FAMILY + "server");
/** ROOT/META column family member - contains server start code (a long) */
static final Text COL_STARTCODE = new Text(COLUMN_FAMILY + "serverstartcode");
// Other constants
/** When we encode strings, we always specify UTF8 encoding */
static final String UTF8_ENCODING = "UTF-8";
/** Value stored for a deleted item */
static final BytesWritable DELETE_BYTES =
new BytesWritable("HBASE::DELETEVAL".getBytes());
/** Value written to HLog on a complete cache flush */
static final BytesWritable COMPLETE_CACHEFLUSH =
new BytesWritable("HBASE::CACHEFLUSH".getBytes());

View File

@ -81,26 +81,26 @@ public class HLog implements HConstants {
Integer rollLock = 0;
/**
* Bundle up a bunch of log files (which are no longer being written to),
* into a new file. Delete the old log files when ready.
* @param srcDir Directory of log files to bundle:
* e.g. <code>${REGIONDIR}/log_HOST_PORT</code>
* @param dstFile Destination file:
* e.g. <code>${REGIONDIR}/oldlogfile_HOST_PORT</code>
* Split up a bunch of log files, that are no longer being written to,
* into new files, one per region. Delete the old log files when ready.
* @param rootDir Root directory of the HBase instance
* @param srcDir Directory of log files to split:
* e.g. <code>${ROOTDIR}/log_HOST_PORT</code>
* @param fs FileSystem
* @param conf HBaseConfiguration
* @throws IOException
*/
public static void consolidateOldLog(Path srcDir, Path dstFile,
FileSystem fs, Configuration conf)
throws IOException {
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
Configuration conf) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("consolidating log files");
LOG.debug("splitting log files");
}
Path logfiles[] = fs.listPaths(srcDir);
SequenceFile.Writer newlog = SequenceFile.createWriter(fs, conf, dstFile,
HLogKey.class, HLogEdit.class);
TreeMap<Text, SequenceFile.Writer> logWriters =
new TreeMap<Text, SequenceFile.Writer>();
try {
for(int i = 0; i < logfiles.length; i++) {
SequenceFile.Reader in =
@ -109,7 +109,17 @@ public class HLog implements HConstants {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
while(in.next(key, val)) {
newlog.append(key, val);
Text regionName = key.getRegionName();
SequenceFile.Writer w = logWriters.get(regionName);
if(w == null) {
Path logfile = new Path(HStoreFile.getHRegionDir(rootDir,
regionName), HREGION_OLDLOGFILE_NAME);
w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
HLogEdit.class);
logWriters.put(regionName, w);
}
w.append(key, val);
}
} finally {
@ -118,7 +128,9 @@ public class HLog implements HConstants {
}
} finally {
newlog.close();
for(SequenceFile.Writer w: logWriters.values()) {
w.close();
}
}
if(fs.exists(srcDir)) {
@ -132,7 +144,7 @@ public class HLog implements HConstants {
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("log file consolidation completed");
LOG.debug("log file splitting completed");
}
}
@ -142,8 +154,13 @@ public class HLog implements HConstants {
* You should never have to load an existing log. If there is a log
* at startup, it should have already been processed and deleted by
* the time the HLog object is started up.
*
* @param fs
* @param dir
* @param conf
* @throws IOException
*/
public HLog(FileSystem fs, Path dir, Configuration conf) throws IOException {
HLog(FileSystem fs, Path dir, Configuration conf) throws IOException {
this.fs = fs;
this.dir = dir;
this.conf = conf;
@ -163,8 +180,10 @@ public class HLog implements HConstants {
*
* The 'this' lock limits access to the current writer so
* we don't append multiple items simultaneously.
*
* @throws IOException
*/
public void rollWriter() throws IOException {
void rollWriter() throws IOException {
synchronized(rollLock) {
// Try to roll the writer to a new file. We may have to
@ -267,8 +286,21 @@ public class HLog implements HConstants {
return new Path(dir, HLOG_DATFILE + String.format("%1$03d", filenum));
}
/** Shut down the log. */
public synchronized void close() throws IOException {
/**
* Shut down the log and delete the log directory
* @throws IOException
*/
synchronized void closeAndDelete() throws IOException {
rollWriter();
close();
fs.delete(dir);
}
/**
* Shut down the log.
* @throws IOException
*/
synchronized void close() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("closing log writer");
}
@ -300,7 +332,7 @@ public class HLog implements HConstants {
* @param timestamp
* @throws IOException
*/
public synchronized void append(Text regionName, Text tableName, Text row,
synchronized void append(Text regionName, Text tableName, Text row,
TreeMap<Text, BytesWritable> columns, long timestamp)
throws IOException {
if(closed) {
@ -327,8 +359,8 @@ public class HLog implements HConstants {
}
}
/** How many items have been added to the log? */
public int getNumEntries() {
/** @return How many items have been added to the log */
int getNumEntries() {
return numEntries;
}
@ -340,6 +372,12 @@ public class HLog implements HConstants {
return logSeqNum++;
}
/**
* Obtain a specified number of sequence numbers
*
* @param num - number of sequence numbers to obtain
* @return - array of sequence numbers
*/
synchronized long[] obtainSeqNum(int num) {
long[] results = new long[num];
for (int i = 0; i < num; i++) {
@ -358,7 +396,7 @@ public class HLog implements HConstants {
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
*/
public synchronized long startCacheFlush() {
synchronized long startCacheFlush() {
while (insideCacheFlush) {
try {
wait();
@ -370,8 +408,13 @@ public class HLog implements HConstants {
return obtainSeqNum();
}
/** Complete the cache flush */
public synchronized void completeCacheFlush(final Text regionName,
/** Complete the cache flush
* @param regionName
* @param tableName
* @param logSeqId
* @throws IOException
*/
synchronized void completeCacheFlush(final Text regionName,
final Text tableName, final long logSeqId)
throws IOException {
if(closed) {

View File

@ -32,14 +32,20 @@ public class HLogKey implements WritableComparable {
Text row = new Text();
long logSeqNum = 0L;
/** Create an empty key useful when deserializing */
public HLogKey() {
}
/**
* Create the log key!
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param regionName - name of region
* @param tablename - name of table
* @param row - row key
* @param logSeqNum - log sequence number
*/
public HLogKey() {
}
public HLogKey(Text regionName, Text tablename, Text row, long logSeqNum) {
this.regionName.set(regionName);
this.tablename.set(tablename);
@ -51,26 +57,25 @@ public class HLogKey implements WritableComparable {
// A bunch of accessors
//////////////////////////////////////////////////////////////////////////////
public Text getRegionName() {
Text getRegionName() {
return regionName;
}
public Text getTablename() {
Text getTablename() {
return tablename;
}
public Text getRow() {
Text getRow() {
return row;
}
public long getLogSeqNum() {
long getLogSeqNum() {
return logSeqNum;
}
@Override
public String toString() {
return getTablename().toString() + " " + getRegionName().toString() + " " +
getRow().toString() + " " + getLogSeqNum();
return tablename + " " + regionName + " " + row + " " + logSeqNum;
}
@Override
@ -90,10 +95,8 @@ public class HLogKey implements WritableComparable {
// Comparable
//////////////////////////////////////////////////////////////////////////////
/**
* When sorting through log entries, we want to group items
* first in the same table, then to the same row, then finally
* ordered by write-order.
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(Object o) {
HLogKey other = (HLogKey) o;
@ -119,6 +122,9 @@ public class HLogKey implements WritableComparable {
// Writable
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
this.regionName.write(out);
this.tablename.write(out);
@ -126,6 +132,9 @@ public class HLogKey implements WritableComparable {
out.writeLong(logSeqNum);
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {
this.regionName.readFields(in);
this.tablename.readFields(in);

File diff suppressed because it is too large Load Diff

View File

@ -31,9 +31,13 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
public class HMerge implements HConstants {
private static final Log LOG = LogFactory.getLog(HMerge.class);
private static final Text[] META_COLS = {COL_REGIONINFO};
/**
* A non-instantiable class that has a static method capable of compacting
* a table by merging adjacent regions that have grown too small.
*/
class HMerge implements HConstants {
static final Log LOG = LogFactory.getLog(HMerge.class);
static final Text[] META_COLS = {COL_REGIONINFO};
private HMerge() {} // Not instantiable
@ -93,13 +97,13 @@ public class HMerge implements HConstants {
this.more = true;
this.key = new HStoreKey();
this.info = new HRegionInfo();
this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
fs.mkdirs(basedir);
this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
}
public void process() throws IOException {
void process() throws IOException {
try {
while(more) {
TreeSet<HRegionInfo> regionsToMerge = next();
@ -110,7 +114,7 @@ public class HMerge implements HConstants {
}
} finally {
try {
hlog.close();
hlog.closeAndDelete();
} catch(IOException e) {
LOG.error(e);
@ -137,12 +141,12 @@ public class HMerge implements HConstants {
for(int i = 0; i < regions.length - 1; i++) {
if(currentRegion == null) {
currentRegion =
new HRegion(dir, hlog, fs, conf, regions[i], null, null);
new HRegion(dir, hlog, fs, conf, regions[i], null);
currentSize = currentRegion.largestHStore();
}
nextRegion =
new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
nextSize = nextRegion.largestHStore();
@ -164,10 +168,9 @@ public class HMerge implements HConstants {
i++;
continue;
} else {
}
LOG.info("not merging regions " + currentRegion.getRegionName()
+ " and " + nextRegion.getRegionName());
}
currentRegion.close();
currentRegion = nextRegion;
@ -185,12 +188,13 @@ public class HMerge implements HConstants {
}
/** Instantiated to compact a normal user table */
private static class OnlineMerger extends Merger {
private HClient client;
private HScannerInterface metaScanner;
private HRegionInfo latestRegion;
public OnlineMerger(Configuration conf, FileSystem fs, HClient client,
OnlineMerger(Configuration conf, FileSystem fs, HClient client,
Text tableName) throws IOException {
super(conf, fs, tableName);
@ -231,6 +235,7 @@ public class HMerge implements HConstants {
}
}
@Override
protected TreeSet<HRegionInfo> next() throws IOException {
TreeSet<HRegionInfo> regions = new TreeSet<HRegionInfo>();
if(latestRegion == null) {
@ -246,6 +251,7 @@ public class HMerge implements HConstants {
return regions;
}
@Override
protected void updateMeta(Text oldRegion1, Text oldRegion2,
HRegion newRegion) throws IOException {
Text[] regionsToDelete = {
@ -307,23 +313,24 @@ public class HMerge implements HConstants {
}
}
/** Instantiated to compact the meta region */
private static class OfflineMerger extends Merger {
private Path dir;
private TreeSet<HRegionInfo> metaRegions;
private TreeMap<Text, BytesWritable> results;
public OfflineMerger(Configuration conf, FileSystem fs, Text tableName)
OfflineMerger(Configuration conf, FileSystem fs, Text tableName)
throws IOException {
super(conf, fs, tableName);
this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
this.metaRegions = new TreeSet<HRegionInfo>();
this.results = new TreeMap<Text, BytesWritable>();
// Scan root region to find all the meta regions
HRegion root = new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo,
null, null);
HRegion root =
new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, null);
HInternalScannerInterface rootScanner =
root.getScanner(META_COLS, new Text());
@ -350,16 +357,18 @@ public class HMerge implements HConstants {
}
}
@Override
protected TreeSet<HRegionInfo> next() throws IOException {
more = false;
return metaRegions;
}
@Override
protected void updateMeta(Text oldRegion1, Text oldRegion2,
HRegion newRegion) throws IOException {
HRegion root =
new HRegion(dir, hlog, fs, conf, HGlobals.rootRegionInfo, null, null);
new HRegion(dir, hlog, fs, conf, HGlobals.rootRegionInfo, null);
Text[] regionsToDelete = {
oldRegion1,

View File

@ -24,54 +24,148 @@ import java.io.*;
* HRegionServers.
******************************************************************************/
public class HMsg implements Writable {
// Messages sent from master to region server
/** Start serving the specified region */
public static final byte MSG_REGION_OPEN = 1;
/** Stop serving the specified region */
public static final byte MSG_REGION_CLOSE = 2;
public static final byte MSG_REGION_MERGE = 3;
/** Region server is unknown to master. Restart */
public static final byte MSG_CALL_SERVER_STARTUP = 4;
/** Master tells region server to stop */
public static final byte MSG_REGIONSERVER_STOP = 5;
/** Stop serving the specified region and don't report back that it's closed */
public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
// Messages sent from the region server to the master
/** region server is now serving the specified region */
public static final byte MSG_REPORT_OPEN = 100;
/** region server is no longer serving the specified region */
public static final byte MSG_REPORT_CLOSE = 101;
public static final byte MSG_REGION_SPLIT = 102;
/** region server is now serving a region produced by a region split */
public static final byte MSG_NEW_REGION = 103;
/** region server is shutting down */
public static final byte MSG_REPORT_EXITING = 104;
byte msg;
HRegionInfo info;
/** Default constructor. Used during deserialization */
public HMsg() {
this.info = new HRegionInfo();
}
/**
* Construct a message with an empty HRegionInfo
*
* @param msg - message code
*/
public HMsg(byte msg) {
this.msg = msg;
this.info = new HRegionInfo();
}
/**
* Construct a message with the specified message code and HRegionInfo
*
* @param msg - message code
* @param info - HRegionInfo
*/
public HMsg(byte msg, HRegionInfo info) {
this.msg = msg;
this.info = info;
}
/**
* Accessor
* @return message code
*/
public byte getMsg() {
return msg;
}
/**
* Accessor
* @return HRegionInfo
*/
public HRegionInfo getRegionInfo() {
return info;
}
@Override
public String toString() {
StringBuilder message = new StringBuilder();
switch(msg) {
case MSG_REGION_OPEN:
message.append("MSG_REGION_OPEN : ");
break;
case MSG_REGION_CLOSE:
message.append("MSG_REGION_CLOSE : ");
break;
case MSG_CALL_SERVER_STARTUP:
message.append("MSG_CALL_SERVER_STARTUP : ");
break;
case MSG_REGIONSERVER_STOP:
message.append("MSG_REGIONSERVER_STOP : ");
break;
case MSG_REGION_CLOSE_WITHOUT_REPORT:
message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
break;
case MSG_REPORT_OPEN:
message.append("MSG_REPORT_OPEN : ");
break;
case MSG_REPORT_CLOSE:
message.append("MSG_REPORT_CLOSE : ");
break;
case MSG_NEW_REGION:
message.append("MSG_NEW_REGION : ");
break;
case MSG_REPORT_EXITING:
message.append("MSG_REPORT_EXITING : ");
break;
default:
message.append("unknown message code (");
message.append(msg);
message.append(") : ");
break;
}
message.append(info == null ? "null" : info.toString());
return message.toString();
}
//////////////////////////////////////////////////////////////////////////////
// Writable
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
out.writeByte(msg);
info.write(out);
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {
this.msg = in.readByte();
this.info.readFields(in);

View File

@ -55,7 +55,7 @@ import java.util.*;
* regionName is a unique identifier for this HRegion. (startKey, endKey]
* defines the keyspace for this HRegion.
*/
public class HRegion implements HConstants {
class HRegion implements HConstants {
static String SPLITDIR = "splits";
static String MERGEDIR = "merges";
static String TMPREGION_PREFIX = "tmpregion_";
@ -72,7 +72,7 @@ public class HRegion implements HConstants {
* @param regionName - name of the region to delete
* @throws IOException
*/
public static void deleteRegion(FileSystem fs, Path baseDirectory,
static void deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
LOG.debug("Deleting region " + regionName);
fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
@ -83,7 +83,7 @@ public class HRegion implements HConstants {
* HRegionServer. Returns a brand-new active HRegion, also
* running on the current HRegionServer.
*/
public static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException {
static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException {
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
@ -110,7 +110,7 @@ public class HRegion implements HConstants {
Configuration conf = srcA.getConf();
HTableDescriptor tabledesc = srcA.getTableDesc();
HLog log = srcA.getLog();
Path dir = srcA.getDir();
Path rootDir = srcA.getRootDir();
Text startKey = srcA.getStartKey();
Text endKey = srcB.getEndKey();
@ -222,8 +222,8 @@ public class HRegion implements HConstants {
// Done
HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo,
newRegionDir, null);
HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
newRegionDir);
// Get rid of merges directory
@ -234,59 +234,6 @@ public class HRegion implements HConstants {
return dstRegion;
}
/**
* Internal method to create a new HRegion. Used by createTable and by the
* bootstrap code in the HMaster constructor
*
* @param fs - file system to create region in
* @param dir - base directory
* @param conf - configuration object
* @param desc - table descriptor
* @param regionId - region id
* @param startKey - first key in region
* @param endKey - last key in region
* @return - new HRegion
* @throws IOException
*/
public static HRegion createNewHRegion(FileSystem fs, Path dir,
Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
Text endKey) throws IOException {
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
return new HRegion(dir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
fs, conf, info, null, null);
}
/**
* Inserts a new table's meta information into the meta table. Used by
* the HMaster bootstrap code.
*
* @param meta - HRegion to be updated
* @param table - HRegion of new table
*
* @throws IOException
*/
public static void addRegionToMeta(HRegion meta, HRegion table)
throws IOException {
// The row key is the region name
long writeid = meta.startUpdate(table.getRegionName());
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(bytes);
table.getRegionInfo().write(s);
meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
meta.commit(writeid);
}
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@ -299,7 +246,7 @@ public class HRegion implements HConstants {
HMemcache memcache;
Path dir;
Path rootDir;
HLog log;
FileSystem fs;
Configuration conf;
@ -307,10 +254,10 @@ public class HRegion implements HConstants {
Path regiondir;
static class WriteState {
public volatile boolean writesOngoing;
public volatile boolean writesEnabled;
public volatile boolean closed;
public WriteState() {
volatile boolean writesOngoing;
volatile boolean writesEnabled;
volatile boolean closed;
WriteState() {
this.writesOngoing = true;
this.writesEnabled = true;
this.closed = false;
@ -340,18 +287,22 @@ public class HRegion implements HConstants {
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
*
* @param rootDir root directory for HBase instance
* @param log HLog where changes should be committed
* @param fs is the filesystem.
* @param dir dir is where the HRegion is stored.
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
*
* @throws IOException
*/
public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, Path initialFiles, Path oldLogFile)
HRegion(Path rootDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, Path initialFiles)
throws IOException {
this.dir = dir;
this.rootDir = rootDir;
this.log = log;
this.fs = fs;
this.conf = conf;
@ -366,7 +317,8 @@ public class HRegion implements HConstants {
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
this.regiondir = HStoreFile.getHRegionDir(dir, this.regionInfo.regionName);
this.regiondir = HStoreFile.getHRegionDir(rootDir, this.regionInfo.regionName);
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
// Move prefab HStore files into place (if any)
@ -378,7 +330,7 @@ public class HRegion implements HConstants {
for(Map.Entry<Text, HColumnDescriptor> e :
this.regionInfo.tableDesc.families().entrySet()) {
Text colFamily = HStoreKey.extractFamily(e.getKey());
stores.put(colFamily, new HStore(dir, this.regionInfo.regionName,
stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
e.getValue(), fs, oldLogFile, conf));
}
@ -411,12 +363,12 @@ public class HRegion implements HConstants {
}
/** Returns a HRegionInfo object for this region */
public HRegionInfo getRegionInfo() {
HRegionInfo getRegionInfo() {
return this.regionInfo;
}
/** returns true if region is closed */
public boolean isClosed() {
boolean isClosed() {
boolean closed = false;
synchronized(writestate) {
closed = writestate.closed;
@ -434,7 +386,7 @@ public class HRegion implements HConstants {
* This method could take some time to execute, so don't call it from a
* time-sensitive thread.
*/
public Vector<HStoreFile> close() throws IOException {
Vector<HStoreFile> close() throws IOException {
lock.obtainWriteLock();
try {
boolean shouldClose = false;
@ -483,7 +435,7 @@ public class HRegion implements HConstants {
*
* Returns two brand-new (and open) HRegions
*/
public HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
throws IOException {
if(((regionInfo.startKey.getLength() != 0)
@ -572,9 +524,9 @@ public class HRegion implements HConstants {
// Done
HRegion regionA = new HRegion(dir, log, fs, conf, regionAInfo, dirA, null);
HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
HRegion regionB = new HRegion(dir, log, fs, conf, regionBInfo, dirB, null);
HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
// Cleanup
@ -596,43 +548,43 @@ public class HRegion implements HConstants {
// HRegion accessors
//////////////////////////////////////////////////////////////////////////////
public Text getStartKey() {
Text getStartKey() {
return regionInfo.startKey;
}
public Text getEndKey() {
Text getEndKey() {
return regionInfo.endKey;
}
public long getRegionId() {
long getRegionId() {
return regionInfo.regionId;
}
public Text getRegionName() {
Text getRegionName() {
return regionInfo.regionName;
}
public Path getDir() {
return dir;
Path getRootDir() {
return rootDir;
}
public HTableDescriptor getTableDesc() {
HTableDescriptor getTableDesc() {
return regionInfo.tableDesc;
}
public HLog getLog() {
HLog getLog() {
return log;
}
public Configuration getConf() {
Configuration getConf() {
return conf;
}
public Path getRegionDir() {
Path getRegionDir() {
return regiondir;
}
public FileSystem getFilesystem() {
FileSystem getFilesystem() {
return fs;
}
@ -652,7 +604,7 @@ public class HRegion implements HConstants {
* @param midKey - (return value) midKey of the largest MapFile
* @return - true if the region should be split
*/
public boolean needsSplit(Text midKey) {
boolean needsSplit(Text midKey) {
lock.obtainReadLock();
try {
Text key = new Text();
@ -675,7 +627,7 @@ public class HRegion implements HConstants {
/**
* @return - returns the size of the largest HStore
*/
public long largestHStore() {
long largestHStore() {
long maxsize = 0;
lock.obtainReadLock();
try {
@ -697,7 +649,7 @@ public class HRegion implements HConstants {
/**
* @return true if the region should be compacted.
*/
public boolean needsCompaction() {
boolean needsCompaction() {
boolean needsCompaction = false;
this.lock.obtainReadLock();
try {
@ -726,7 +678,7 @@ public class HRegion implements HConstants {
* HRegion is busy doing something else storage-intensive (like flushing the
* cache). The caller should check back later.
*/
public boolean compactStores() throws IOException {
boolean compactStores() throws IOException {
boolean shouldCompact = false;
lock.obtainReadLock();
try {
@ -766,7 +718,7 @@ public class HRegion implements HConstants {
* Each HRegion is given a periodic chance to flush the cache, which it should
* only take if there have been a lot of uncommitted writes.
*/
public void optionallyFlush() throws IOException {
void optionallyFlush() throws IOException {
if(commitsSinceFlush > maxUnflushedEntries) {
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush);
@ -792,8 +744,7 @@ public class HRegion implements HConstants {
* This method may block for some time, so it should not be called from a
* time-sensitive thread.
*/
public Vector<HStoreFile> flushcache(boolean disableFutureWrites)
throws IOException {
Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
boolean shouldFlush = false;
synchronized(writestate) {
if((! writestate.writesOngoing)
@ -934,18 +885,18 @@ public class HRegion implements HConstants {
//////////////////////////////////////////////////////////////////////////////
/** Fetch a single data item. */
public BytesWritable get(Text row, Text column) throws IOException {
BytesWritable get(Text row, Text column) throws IOException {
BytesWritable[] results = get(row, column, Long.MAX_VALUE, 1);
return (results == null)? null: results[0];
}
/** Fetch multiple versions of a single data item */
public BytesWritable[] get(Text row, Text column, int numVersions) throws IOException {
BytesWritable[] get(Text row, Text column, int numVersions) throws IOException {
return get(row, column, Long.MAX_VALUE, numVersions);
}
/** Fetch multiple versions of a single data item, with timestamp. */
public BytesWritable[] get(Text row, Text column, long timestamp, int numVersions)
BytesWritable[] get(Text row, Text column, long timestamp, int numVersions)
throws IOException {
if(writestate.closed) {
@ -969,8 +920,7 @@ public class HRegion implements HConstants {
}
}
// Private implementation: get the value for the indicated HStoreKey
/** Private implementation: get the value for the indicated HStoreKey */
private BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
lock.obtainReadLock();
@ -1007,7 +957,7 @@ public class HRegion implements HConstants {
* determine which column groups are useful for that row. That would let us
* avoid a bunch of disk activity.
*/
public TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
TreeMap<Text, BytesWritable> getFull(Text row) throws IOException {
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
lock.obtainReadLock();
@ -1029,7 +979,7 @@ public class HRegion implements HConstants {
* Return an iterator that scans over the HRegion, returning the indicated
* columns. This Iterator must be closed by the caller.
*/
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
throws IOException {
lock.obtainReadLock();
try {
@ -1067,7 +1017,7 @@ public class HRegion implements HConstants {
* @return lockid
* @see #put(long, Text, BytesWritable)
*/
public long startUpdate(Text row) throws IOException {
long startUpdate(Text row) throws IOException {
// We obtain a per-row lock, so other clients will block while one client
// performs an update. The read lock is released by the client calling
// #commit or #abort or if the HRegionServer lease on the lock expires.
@ -1085,8 +1035,7 @@ public class HRegion implements HConstants {
* This method really just tests the input, then calls an internal localput()
* method.
*/
public void put(long lockid, Text targetCol, BytesWritable val)
throws IOException {
void put(long lockid, Text targetCol, BytesWritable val) throws IOException {
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
throw new IOException("Cannot insert value: " + val);
@ -1097,11 +1046,11 @@ public class HRegion implements HConstants {
/**
* Delete a value or write a value. This is a just a convenience method for put().
*/
public void delete(long lockid, Text targetCol) throws IOException {
void delete(long lockid, Text targetCol) throws IOException {
localput(lockid, targetCol, DELETE_BYTES);
}
/*
/**
* Private implementation.
*
* localput() is used for both puts and deletes. We just place the values
@ -1148,7 +1097,7 @@ public class HRegion implements HConstants {
* writes associated with the given row-lock. These values have not yet
* been placed in memcache or written to the log.
*/
public void abort(long lockid) throws IOException {
void abort(long lockid) throws IOException {
Text row = getRowFromLock(lockid);
if(row == null) {
throw new LockException("No write lock for lockid " + lockid);
@ -1182,7 +1131,7 @@ public class HRegion implements HConstants {
* @param lockid Lock for row we're to commit.
* @throws IOException
*/
public void commit(final long lockid) throws IOException {
void commit(final long lockid) throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
@ -1286,7 +1235,8 @@ public class HRegion implements HConstants {
}
}
/** Release the row lock!
/**
* Release the row lock!
* @param lock Name of row whose lock we are to release
*/
void releaseRowLock(Text row) {
@ -1309,7 +1259,7 @@ public class HRegion implements HConstants {
}
}
/*
/**
* HScanner is an iterator through a bunch of rows in an HRegion.
*/
private static class HScanner implements HInternalScannerInterface {
@ -1321,7 +1271,7 @@ public class HRegion implements HConstants {
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
public HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores)
throws IOException {
long scanTime = System.currentTimeMillis();
this.scanners = new HInternalScannerInterface[stores.length + 1];
@ -1391,9 +1341,12 @@ public class HRegion implements HConstants {
return multipleMatchers;
}
/**
/* (non-Javadoc)
*
* Grab the next row's worth of values. The HScanner will return the most
* recent data value for each row that is not newer than the target time.
*
* @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
*/
public boolean next(HStoreKey key, TreeMap<Text, BytesWritable> results)
throws IOException {
@ -1477,7 +1430,9 @@ public class HRegion implements HConstants {
}
}
/** All done with the scanner. */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HInternalScannerInterface#close()
*/
public void close() {
for(int i = 0; i < scanners.length; i++) {
if(scanners[i] != null) {
@ -1493,16 +1448,17 @@ public class HRegion implements HConstants {
* Convenience method creating new HRegions.
* @param regionId ID to use
* @param tableDesc Descriptor
* @param dir Home directory for the new region.
* @param rootDir Root directory of HBase instance
* @param conf
* @return New META region (ROOT or META).
* @throws IOException
*/
public static HRegion createHRegion(final long regionId,
final HTableDescriptor tableDesc, final Path dir, final Configuration conf)
static HRegion createHRegion(final long regionId,
final HTableDescriptor tableDesc, final Path rootDir,
final Configuration conf)
throws IOException {
return createHRegion(new HRegionInfo(regionId, tableDesc, null, null),
dir, conf, null, null);
rootDir, conf, null);
}
/**
@ -1510,25 +1466,22 @@ public class HRegion implements HConstants {
* bootstrap code in the HMaster constructor
*
* @param info Info for region to create.
* @param dir Home dir for new region
* @param rootDir Root directory for HBase instance
* @param conf
* @param initialFiles InitialFiles to pass new HRegion. Pass null if none.
* @param oldLogFile Old log file to use in region initialization. Pass null
* if none.
* @return new HRegion
*
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info,
final Path dir, final Configuration conf, final Path initialFiles,
final Path oldLogFile)
static HRegion createHRegion(final HRegionInfo info,
final Path rootDir, final Configuration conf, final Path initialFiles)
throws IOException {
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
Path regionDir = HStoreFile.getHRegionDir(rootDir, info.regionName);
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(dir,
return new HRegion(rootDir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
fs, conf, info, initialFiles, oldLogFile);
fs, conf, info, initialFiles);
}
/**
@ -1541,7 +1494,7 @@ public class HRegion implements HConstants {
*
* @throws IOException
*/
public static void addRegionToMETA(HRegion meta, HRegion r)
static void addRegionToMETA(HRegion meta, HRegion r)
throws IOException {
// The row key is the region name
long writeid = meta.startUpdate(r.getRegionName());
@ -1552,7 +1505,7 @@ public class HRegion implements HConstants {
meta.commit(writeid);
}
public static void addRegionToMETA(final HClient client,
static void addRegionToMETA(final HClient client,
final Text table, final HRegion region,
final HServerAddress serverAddress,
final long startCode)
@ -1578,7 +1531,7 @@ public class HRegion implements HConstants {
* @param regionName Region to remove.
* @throws IOException
*/
public static void removeRegionFromMETA(final HClient client,
static void removeRegionFromMETA(final HClient client,
final Text table, final Text regionName)
throws IOException {
client.openTable(table);

View File

@ -20,11 +20,9 @@ import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparable;
/**
* HRegion information.
@ -32,13 +30,14 @@ import org.apache.hadoop.io.WritableComparable;
* HRegions' table descriptor, etc.
*/
public class HRegionInfo implements WritableComparable {
public Text regionName;
public long regionId;
public Text startKey;
public Text endKey;
public boolean offLine;
public HTableDescriptor tableDesc;
Text regionName;
long regionId;
Text startKey;
Text endKey;
boolean offLine;
HTableDescriptor tableDesc;
/** Default constructor - creates empty object */
public HRegionInfo() {
this.regionId = 0;
this.tableDesc = new HTableDescriptor();
@ -48,14 +47,28 @@ public class HRegionInfo implements WritableComparable {
this.offLine = false;
}
/**
* Construct a HRegionInfo object from byte array
*
* @param serializedBytes
* @throws IOException
*/
public HRegionInfo(final byte [] serializedBytes) throws IOException {
this();
readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes)));
}
public HRegionInfo(long regionId, HTableDescriptor tableDesc,
Text startKey, Text endKey)
throws IllegalArgumentException {
/**
* Construct HRegionInfo with explicit parameters
*
* @param regionId - the regionid
* @param tableDesc - the table descriptor
* @param startKey - first key in region
* @param endKey - end of key range
* @throws IllegalArgumentException
*/
public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
Text endKey) throws IllegalArgumentException {
this.regionId = regionId;

View File

@ -26,18 +26,67 @@ import java.io.*;
* a handle to the HRegionInterface.
******************************************************************************/
public interface HRegionInterface extends VersionedProtocol {
public static final long versionID = 1L; // initial version
/** initial version */
public static final long versionID = 1L;
// Get metainfo about an HRegion
/**
* Get metainfo about an HRegion
*
* @param regionName - name of the region
* @return - HRegionInfo object for region
* @throws NotServingRegionException
*/
public HRegionInfo getRegionInfo(final Text regionName) throws NotServingRegionException;
public HRegionInfo getRegionInfo(Text regionName) throws NotServingRegionException;
/**
* Retrieve a single value from the specified region for the specified row
* and column keys
*
* @param regionName - name of region
* @param row - row key
* @param column - column key
* @return - value for that region/row/column
* @throws IOException
*/
public BytesWritable get(final Text regionName, final Text row, final Text column) throws IOException;
// GET methods for an HRegion.
/**
* Get the specified number of versions of the specified row and column
*
* @param regionName - region name
* @param row - row key
* @param column - column key
* @param numVersions - number of versions to return
* @return - array of values
* @throws IOException
*/
public BytesWritable[] get(final Text regionName, final Text row,
final Text column, final int numVersions) throws IOException;
public BytesWritable get(Text regionName, Text row, Text column) throws IOException;
public BytesWritable[] get(Text regionName, Text row, Text column, int numVersions) throws IOException;
public BytesWritable[] get(Text regionName, Text row, Text column, long timestamp, int numVersions) throws IOException;
public LabelledData[] getRow(Text regionName, Text row) throws IOException;
/**
* Get the specified number of versions of the specified row and column with
* the specified timestamp.
*
* @param regionName - region name
* @param row - row key
* @param column - column key
* @param timestamp - timestamp
* @param numVersions - number of versions to return
* @return - array of values
* @throws IOException
*/
public BytesWritable[] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException;
/**
* Get all the data for the specified row
*
* @param regionName - region name
* @param row - row key
* @return - array of values
* @throws IOException
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException;
//////////////////////////////////////////////////////////////////////////////
// Start an atomic row insertion/update. No changes are committed until the
@ -50,11 +99,80 @@ public interface HRegionInterface extends VersionedProtocol {
// The client can gain extra time with a call to renewLease().
//////////////////////////////////////////////////////////////////////////////
public long startUpdate(Text regionName, long clientid, Text row) throws IOException;
public void put(Text regionName, long clientid, long lockid, Text column, BytesWritable val) throws IOException;
public void delete(Text regionName, long clientid, long lockid, Text column) throws IOException;
public void abort(Text regionName, long clientid, long lockid) throws IOException;
public void commit(Text regionName, long clientid, long lockid) throws IOException;
/**
* Start an atomic row insertion/update. No changes are committed until the
* call to commit() returns. A call to abort() will abandon any updates in progress.
*
* Callers to this method are given a lease for each unique lockid; before the
* lease expires, either abort() or commit() must be called. If it is not
* called, the system will automatically call abort() on the client's behalf.
*
* The client can gain extra time with a call to renewLease().
* Start an atomic row insertion or update
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param row - Name of row to start update against.
* @return Row lockid.
* @throws IOException
*/
public long startUpdate(final Text regionName, final long clientid,
final Text row) throws IOException;
/**
* Change a value for the specified column
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param column - column whose value is being set
* @param val - new value for column
* @throws IOException
*/
public void put(final Text regionName, final long clientid, final long lockid,
final Text column, final BytesWritable val) throws IOException;
/**
* Delete the value for a column
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @param column - name of column whose value is to be deleted
* @throws IOException
*/
public void delete(final Text regionName, final long clientid, final long lockid,
final Text column) throws IOException;
/**
* Abort a row mutation
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void abort(final Text regionName, final long clientid,
final long lockid) throws IOException;
/**
* Finalize a row mutation
*
* @param regionName - region name
* @param clientid - a unique value to identify the client
* @param lockid - lock id returned from startUpdate
* @throws IOException
*/
public void commit(final Text regionName, final long clientid,
final long lockid) throws IOException;
/**
* Renew lease on update
*
* @param lockid - lock id returned from startUpdate
* @param clientid - a unique value to identify the client
* @throws IOException
*/
public void renewLease(long lockid, long clientid) throws IOException;
//////////////////////////////////////////////////////////////////////////////
@ -77,11 +195,10 @@ public interface HRegionInterface extends VersionedProtocol {
* Get the next set of values
*
* @param scannerId - clientId passed to openScanner
* @param key - the next HStoreKey
* @return - true if a value was retrieved
* @return - array of values
* @throws IOException
*/
public LabelledData[] next(long scannerId, HStoreKey key) throws IOException;
public KeyedData[] next(long scannerId) throws IOException;
/**
* Close a scanner

View File

@ -19,9 +19,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -42,9 +43,11 @@ import org.apache.hadoop.util.StringUtils;
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
******************************************************************************/
public class HRegionServer
implements HConstants, HRegionInterface, Runnable {
public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/* (non-Javadoc)
* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
*/
public long getProtocolVersion(final String protocol,
@SuppressWarnings("unused") final long clientVersion)
throws IOException {
@ -57,18 +60,20 @@ public class HRegionServer
static final Log LOG = LogFactory.getLog(HRegionServer.class);
volatile boolean stopRequested;
private Path regionDir;
volatile boolean abortRequested;
private Path rootDir;
HServerInfo info;
Configuration conf;
private Random rand;
// region name -> HRegion
TreeMap<Text, HRegion> onlineRegions = new TreeMap<Text, HRegion>();
SortedMap<Text, HRegion> onlineRegions;
Map<Text, HRegion> retiringRegions = new HashMap<Text, HRegion>();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Vector<HMsg> outboundMsgs;
int numRetries;
long threadWakeFrequency;
private long msgInterval;
@ -78,20 +83,24 @@ public class HRegionServer
private Thread splitOrCompactCheckerThread;
Integer splitOrCompactLock = Integer.valueOf(0);
/*
/**
* Interface used by the {@link org.apache.hadoop.io.retry} mechanism.
*/
interface UpdateMetaInterface {
/*
public interface UpdateMetaInterface {
/**
* @return True if succeeded.
* @throws IOException
*/
boolean update() throws IOException;
public boolean update() throws IOException;
}
/** Runs periodically to determine if regions need to be compacted or split */
class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
HClient client = new HClient(conf);
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text)
*/
public void closing(final Text regionName) {
lock.writeLock().lock();
try {
@ -106,6 +115,9 @@ public class HRegionServer
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text)
*/
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
@ -118,6 +130,9 @@ public class HRegionServer
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(! stopRequested) {
long startTime = System.currentTimeMillis();
@ -180,7 +195,7 @@ public class HRegionServer
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
final Text tableToUpdate =
(oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
if(LOG.isDebugEnabled()) {
LOG.debug("Updating " + tableToUpdate + " with region split info");
@ -188,6 +203,9 @@ public class HRegionServer
// Wrap the update of META region with an org.apache.hadoop.io.retry.
UpdateMetaInterface implementation = new UpdateMetaInterface() {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionServer.UpdateMetaInterface#update()
*/
public boolean update() throws IOException {
HRegion.removeRegionFromMETA(client, tableToUpdate,
region.getRegionName());
@ -232,7 +250,11 @@ public class HRegionServer
private Flusher cacheFlusher;
private Thread cacheFlusherThread;
Integer cacheFlusherLock = Integer.valueOf(0);
/** Runs periodically to flush the memcache */
class Flusher implements Runnable {
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(! stopRequested) {
long startTime = System.currentTimeMillis();
@ -283,21 +305,22 @@ public class HRegionServer
// File paths
private FileSystem fs;
private Path oldlogfile;
// Logging
HLog log;
private LogRoller logRoller;
private Thread logRollerThread;
Integer logRollerLock = Integer.valueOf(0);
/**
* Log rolling Runnable.
*/
/** Runs periodically to determine if the log should be rolled */
class LogRoller implements Runnable {
private int maxLogEntries =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while(! stopRequested) {
synchronized(logRollerLock) {
@ -339,27 +362,42 @@ public class HRegionServer
// Leases
private Leases leases;
/** Start a HRegionServer at the default location */
/**
* Starts a HRegionServer at the default location
* @param conf
* @throws IOException
*/
public HRegionServer(Configuration conf) throws IOException {
this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")),
conf);
}
/** Start a HRegionServer at an indicated location */
public HRegionServer(Path regionDir, HServerAddress address,
/**
* Starts a HRegionServer at the specified location
* @param rootDir
* @param address
* @param conf
* @throws IOException
*/
public HRegionServer(Path rootDir, HServerAddress address,
Configuration conf) throws IOException {
// Basic setup
this.stopRequested = false;
this.regionDir = regionDir;
this.abortRequested = false;
this.rootDir = rootDir;
this.conf = conf;
this.rand = new Random();
this.onlineRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
this.outboundMsgs = new Vector<HMsg>();
this.scanners =
Collections.synchronizedMap(new TreeMap<Text, HInternalScannerInterface>());
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
15 * 1000);
@ -369,18 +407,16 @@ public class HRegionServer
// Cache flushing
this.cacheFlusher = new Flusher();
this.cacheFlusherThread =
new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
this.cacheFlusherThread = new Thread(cacheFlusher);
// Check regions to see if they need to be split
this.splitOrCompactChecker = new SplitOrCompactChecker();
this.splitOrCompactCheckerThread =
new Thread(splitOrCompactChecker, "HRegionServer.splitOrCompactChecker");
this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
// Process requests from Master
this.toDo = new Vector<HMsg>();
this.toDo = new LinkedList<ToDoEntry>();
this.worker = new Worker();
this.workerThread = new Thread(worker, "HRegionServer.worker");
this.workerThread = new Thread(worker);
try {
// Server to handle client requests
@ -398,20 +434,19 @@ public class HRegionServer
this.info.getServerAddress().getBindAddress() + "_"
+ this.info.getServerAddress().getPort();
Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
Path logdir = new Path(rootDir, "log" + "_" + serverName);
// Logging
this.fs = FileSystem.get(conf);
HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
// TODO: Now we have a consolidated log for all regions, sort and
// then split result by region passing the splits as reconstruction
// logs to HRegions on start. Or, rather than consolidate, split logs
// into per region files.
this.log = new HLog(fs, newlogdir, conf);
if(fs.exists(logdir)) {
throw new RegionServerRunningException("region server already running at "
+ this.info.getServerAddress().toString());
}
this.log = new HLog(fs, logdir, conf);
this.logRoller = new LogRoller();
this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
this.logRollerThread = new Thread(logRoller);
// Remote HMaster
@ -420,40 +455,37 @@ public class HRegionServer
new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
conf);
// Threads
this.workerThread.start();
this.cacheFlusherThread.start();
this.splitOrCompactCheckerThread.start();
this.logRollerThread.start();
this.leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
3 * 60 * 1000), threadWakeFrequency);
// Server
this.server.start();
} catch(IOException e) {
this.stopRequested = true;
throw e;
}
LOG.info("HRegionServer started at: " + address.toString());
}
/**
* Set a flag that will cause all the HRegionServer threads to shut down
* Sets a flag that will cause all the HRegionServer threads to shut down
* in an orderly fashion.
*/
public synchronized void stop() {
synchronized void stop() {
stopRequested = true;
notifyAll(); // Wakes run() if it is sleeping
}
/** Wait on all threads to finish.
/**
* Cause the server to exit without closing the regions it is serving, the
* log it is using and without notifying the master.
*
* FOR DEBUGGING ONLY
*/
synchronized void abort() {
abortRequested = true;
stop();
}
/**
* Wait on all threads to finish.
* Presumption is that all closes and stops have already been called.
*/
public void join() {
void join() {
try {
this.workerThread.join();
} catch(InterruptedException iex) {
@ -489,6 +521,33 @@ public class HRegionServer
* load/unload instructions.
*/
public void run() {
// Threads
String threadName = Thread.currentThread().getName();
workerThread.setName(threadName + ".worker");
workerThread.start();
cacheFlusherThread.setName(threadName + ".cacheFlusher");
cacheFlusherThread.start();
splitOrCompactCheckerThread.setName(threadName + ".splitOrCompactChecker");
splitOrCompactCheckerThread.start();
logRollerThread.setName(threadName + ".logRoller");
logRollerThread.start();
leases = new Leases(conf.getLong("hbase.regionserver.lease.period",
3 * 60 * 1000), threadWakeFrequency);
// Server
try {
this.server.start();
LOG.info("HRegionServer started at: " + info.getServerAddress().toString());
} catch(IOException e) {
LOG.error(e);
stopRequested = true;
}
while(! stopRequested) {
long lastMsg = 0;
long waitTime;
@ -545,7 +604,6 @@ public class HRegionServer
if (LOG.isDebugEnabled()) {
LOG.debug("Got call server startup message");
}
toDo.clear();
closeAllRegions();
restart = true;
break;
@ -554,8 +612,6 @@ public class HRegionServer
if (LOG.isDebugEnabled()) {
LOG.debug("Got regionserver stop message");
}
toDo.clear();
closeAllRegions();
stopRequested = true;
break;
@ -563,19 +619,21 @@ public class HRegionServer
if (LOG.isDebugEnabled()) {
LOG.debug("Got default message");
}
toDo.add(msgs[i]);
toDo.addLast(new ToDoEntry(msgs[i]));
}
}
if(restart || stopRequested) {
toDo.clear();
break;
}
if(toDo.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("notify on todo");
}
toDo.notifyAll();
}
if(restart || stopRequested) {
break;
}
}
} catch(IOException e) {
@ -596,15 +654,9 @@ public class HRegionServer
}
}
}
try {
HMsg[] exitMsg = { new HMsg(HMsg.MSG_REPORT_EXITING) };
hbaseMaster.regionServerReport(info, exitMsg);
} catch(IOException e) {
LOG.warn(e);
}
try {
LOG.info("stopping server at: " + info.getServerAddress().toString());
this.worker.stop();
this.server.stop();
leases.close();
// Send interrupts to wake up threads if sleeping so they notice shutdown.
@ -620,17 +672,47 @@ public class HRegionServer
this.splitOrCompactCheckerThread.interrupt();
}
this.worker.stop();
this.server.stop();
if(abortRequested) {
try {
log.rollWriter();
closeAllRegions();
log.close();
leases.close();
join();
} catch(IOException e) {
LOG.warn(e);
}
LOG.info("aborting server at: " + info.getServerAddress().toString());
} else {
Vector<HRegion> closedRegions = closeAllRegions();
try {
log.closeAndDelete();
} catch(IOException e) {
LOG.error(e);
}
try {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
// Tell the master what regions we are/were serving
int i = 1;
for(HRegion region: closedRegions) {
exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE, region.getRegionInfo());
}
LOG.info("telling master that region server is shutting down at: "
+info.getServerAddress().toString());
hbaseMaster.regionServerReport(info, exitMsg);
} catch(IOException e) {
LOG.warn(e);
}
LOG.info("stopping server at: " + info.getServerAddress().toString());
}
join();
if(LOG.isDebugEnabled()) {
LOG.debug("main thread exiting");
}
@ -671,89 +753,84 @@ public class HRegionServer
// HMaster-given operations
//////////////////////////////////////////////////////////////////////////////
Vector<HMsg> toDo;
private static class ToDoEntry {
int tries;
HMsg msg;
ToDoEntry(HMsg msg) {
this.tries = 0;
this.msg = msg;
}
}
LinkedList<ToDoEntry> toDo;
private Worker worker;
private Thread workerThread;
/** Thread that performs long running requests from the master */
class Worker implements Runnable {
public void stop() {
void stop() {
synchronized(toDo) {
toDo.notifyAll();
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
for(HMsg msg = null; !stopRequested; ) {
for(ToDoEntry e = null; !stopRequested; ) {
synchronized(toDo) {
while(toDo.size() == 0 && !stopRequested) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Wait on todo");
}
toDo.wait();
toDo.wait(threadWakeFrequency);
if (LOG.isDebugEnabled()) {
LOG.debug("Wake on todo");
}
} catch(InterruptedException e) {
} catch(InterruptedException ex) {
// continue
}
}
if(stopRequested) {
continue;
}
msg = toDo.remove(0);
e = toDo.removeFirst();
}
try {
switch(msg.getMsg()) {
if (LOG.isDebugEnabled()) {
LOG.debug(e.msg.toString());
}
switch(e.msg.getMsg()) {
case HMsg.MSG_REGION_OPEN: // Open a region
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_REGION_OPEN");
}
openRegion(msg.getRegionInfo());
openRegion(e.msg.getRegionInfo());
break;
case HMsg.MSG_REGION_CLOSE: // Close a region
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_REGION_CLOSE");
}
closeRegion(msg.getRegionInfo(), true);
closeRegion(e.msg.getRegionInfo(), true);
break;
case HMsg.MSG_REGION_MERGE: // Merge two regions
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_REGION_MERGE");
}
//TODO ???
throw new IOException("TODO: need to figure out merge");
//break;
case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_CALL_SERVER_STARTUP");
}
closeAllRegions();
continue;
case HMsg.MSG_REGIONSERVER_STOP: // Go away
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_REGIONSERVER_STOP");
}
stopRequested = true;
continue;
case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
if (LOG.isDebugEnabled()) {
LOG.debug("MSG_REGION_CLOSE_WITHOUT_REPORT");
}
closeRegion(msg.getRegionInfo(), false);
closeRegion(e.msg.getRegionInfo(), false);
break;
default:
throw new IOException("Impossible state during msg processing. Instruction: " + msg);
throw new AssertionError(
"Impossible state during msg processing. Instruction: "
+ e.msg.toString());
}
} catch(IOException ie) {
if(e.tries < numRetries) {
LOG.warn(ie);
e.tries++;
synchronized(toDo) {
toDo.addLast(e);
}
} else {
LOG.error("unable to process message: " + e.msg.toString(), ie);
}
} catch(IOException e) {
LOG.error(e);
}
}
LOG.info("worker thread exiting");
@ -761,16 +838,19 @@ public class HRegionServer
}
void openRegion(HRegionInfo regionInfo) throws IOException {
HRegion region = onlineRegions.get(regionInfo.regionName);
if(region == null) {
region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
this.lock.writeLock().lock();
try {
HRegion region =
new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
this.onlineRegions.put(region.getRegionName(), region);
reportOpen(region);
} finally {
this.lock.writeLock().unlock();
}
}
reportOpen(region);
}
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
throws IOException {
@ -791,7 +871,7 @@ public class HRegionServer
}
/** Called either when the master tells us to restart or from stop() */
void closeAllRegions() {
Vector<HRegion> closeAllRegions() {
Vector<HRegion> regionsToClose = new Vector<HRegion>();
this.lock.writeLock().lock();
try {
@ -800,8 +880,7 @@ public class HRegionServer
} finally {
this.lock.writeLock().unlock();
}
for(Iterator<HRegion> it = regionsToClose.iterator(); it.hasNext(); ) {
HRegion region = it.next();
for(HRegion region: regionsToClose) {
if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + region.getRegionName());
}
@ -812,59 +891,134 @@ public class HRegionServer
LOG.error("error closing region " + region.getRegionName(), e);
}
}
return regionsToClose;
}
//////////////////////////////////////////////////////////////////////////////
// HRegionInterface
//////////////////////////////////////////////////////////////////////////////
/** Obtain a table descriptor for the given region */
public HRegionInfo getRegionInfo(Text regionName)
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text)
*/
public HRegionInfo getRegionInfo(final Text regionName)
throws NotServingRegionException {
return getRegion(regionName).getRegionInfo();
}
/** Get the indicated row/column */
public BytesWritable get(Text regionName, Text row, Text column)
throws IOException {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
*/
public BytesWritable get(final Text regionName, final Text row,
final Text column) throws IOException {
return getRegion(regionName).get(row, column);
}
/** Get multiple versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
int numVersions)
throws IOException {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int)
*/
public BytesWritable[] get(final Text regionName, final Text row,
final Text column, final int numVersions) throws IOException {
return getRegion(regionName).get(row, column, numVersions);
}
/** Get multiple timestamped versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
long timestamp, int numVersions)
throws IOException {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int)
*/
public BytesWritable[] get(final Text regionName, final Text row, final Text column,
final long timestamp, final int numVersions) throws IOException {
return getRegion(regionName).get(row, column, timestamp, numVersions);
}
/** Get all the columns (along with their names) for a given row. */
public LabelledData[] getRow(Text regionName, Text row) throws IOException {
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
*/
public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
HRegion region = getRegion(regionName);
TreeMap<Text, BytesWritable> map = region.getFull(row);
LabelledData result[] = new LabelledData[map.size()];
KeyedData result[] = new KeyedData[map.size()];
int counter = 0;
for (Map.Entry<Text, BytesWritable> es: map.entrySet()) {
result[counter++] = new LabelledData(es.getKey(), es.getValue());
result[counter++] =
new KeyedData(new HStoreKey(row, es.getKey()), es.getValue());
}
return result;
}
/**
* Start an update to the HBase. This also creates a lease associated with
* the caller.
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#next(long)
*/
private static class RegionListener extends LeaseListener {
public KeyedData[] next(final long scannerId)
throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
leases.renewLease(scannerName, scannerName);
// Collect values to be returned here
ArrayList<KeyedData> values = new ArrayList<KeyedData>();
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
// Keep getting rows until we find one that has at least one non-deleted column value
HStoreKey key = new HStoreKey();
while (s.next(key, results)) {
for(Map.Entry<Text, BytesWritable> e: results.entrySet()) {
HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
BytesWritable val = e.getValue();
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
// Column value is deleted. Don't return it.
if (LOG.isDebugEnabled()) {
LOG.debug("skipping deleted value for key: " + k.toString());
}
continue;
}
if (LOG.isDebugEnabled()) {
LOG.debug("adding value for key: " + k.toString());
}
values.add(new KeyedData(k, val));
}
if(values.size() > 0) {
// Row has something in it. Return the value.
break;
}
// No data for this row, go get another.
results.clear();
}
return values.toArray(new KeyedData[values.size()]);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text)
*/
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
this.leases.createLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)),
new RegionListener(region, lockid));
return lockid;
}
/** Create a lease for an update. If it times out, the update is aborted */
private static class RegionListener implements LeaseListener {
private HRegion localRegion;
private long localLockId;
public RegionListener(HRegion region, long lockId) {
RegionListener(HRegion region, long lockId) {
this.localRegion = region;
this.localLockId = lockId;
}
@ -879,51 +1033,9 @@ public class HRegionServer
}
}
public LabelledData[] next(final long scannerId, final HStoreKey key)
throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName + ", key " +
key);
}
leases.renewLease(scannerName, scannerName);
TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
ArrayList<LabelledData> values = new ArrayList<LabelledData>();
// Keep getting rows till we find one that has at least one non-deleted
// column value.
while (s.next(key, results)) {
for(Map.Entry<Text, BytesWritable> e: results.entrySet()) {
BytesWritable val = e.getValue();
if(val.getSize() == DELETE_BYTES.getSize()
&& val.compareTo(DELETE_BYTES) == 0) {
// Column value is deleted. Don't return it.
continue;
}
values.add(new LabelledData(e.getKey(), val));
}
if (values.size() > 0) {
// Row has something in it. Let it out. Else go get another row.
break;
}
// Need to clear results before we go back up and call 'next' again.
results.clear();
}
return values.toArray(new LabelledData[values.size()]);
}
public long startUpdate(Text regionName, long clientid, Text row)
throws IOException {
HRegion region = getRegion(regionName);
long lockid = region.startUpdate(row);
this.leases.createLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)),
new RegionListener(region, lockid));
return lockid;
}
/** Add something to the HBase. */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable)
*/
public void put(Text regionName, long clientid, long lockid, Text column,
BytesWritable val) throws IOException {
HRegion region = getRegion(regionName, true);
@ -932,7 +1044,9 @@ public class HRegionServer
region.put(lockid, column, val);
}
/** Remove a cell from the HBase. */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text)
*/
public void delete(Text regionName, long clientid, long lockid, Text column)
throws IOException {
HRegion region = getRegion(regionName);
@ -941,7 +1055,9 @@ public class HRegionServer
region.delete(lockid, column);
}
/** Abandon the transaction */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long)
*/
public void abort(Text regionName, long clientid, long lockid)
throws IOException {
HRegion region = getRegion(regionName, true);
@ -950,7 +1066,9 @@ public class HRegionServer
region.abort(lockid);
}
/** Confirm the transaction */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long)
*/
public void commit(Text regionName, long clientid, long lockid)
throws IOException {
HRegion region = getRegion(regionName, true);
@ -959,13 +1077,16 @@ public class HRegionServer
region.commit(lockid);
}
/** Don't let the client's lease expire just yet... */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long)
*/
public void renewLease(long lockid, long clientid) throws IOException {
leases.renewLease(new Text(String.valueOf(clientid)),
new Text(String.valueOf(lockid)));
}
/** Private utility method for safely obtaining an HRegion handle.
/**
* Private utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
* @return {@link HRegion} for <code>regionName</code>
* @throws NotServingRegionException
@ -975,7 +1096,8 @@ public class HRegionServer
return getRegion(regionName, false);
}
/** Private utility method for safely obtaining an HRegion handle.
/**
* Private utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
* @param checkRetiringRegions Set true if we're to check retiring regions
* as well as online regions.
@ -1014,13 +1136,20 @@ public class HRegionServer
Map<Text, HInternalScannerInterface> scanners;
private class ScannerListener extends LeaseListener {
/**
* Instantiated as a scanner lease.
* If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private Text scannerName;
public ScannerListener(Text scannerName) {
ScannerListener(Text scannerName) {
this.scannerName = scannerName;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
*/
public void leaseExpired() {
LOG.info("Scanner " + scannerName + " lease expired");
HInternalScannerInterface s = null;
@ -1033,7 +1162,9 @@ public class HRegionServer
}
}
/** Start a scanner for a given HRegion. */
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#openScanner(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text[], org.apache.hadoop.io.Text)
*/
public long openScanner(Text regionName, Text[] cols, Text firstRow)
throws IOException {
HRegion r = getRegion(regionName);
@ -1054,6 +1185,9 @@ public class HRegionServer
return scannerId;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.HRegionInterface#close(long)
*/
public void close(long scannerId) throws IOException {
Text scannerName = new Text(String.valueOf(scannerId));
HInternalScannerInterface s = null;
@ -1080,6 +1214,9 @@ public class HRegionServer
System.exit(0);
}
/**
* @param args
*/
public static void main(String [] args) {
if (args.length < 1) {
printUsageAndExit();
@ -1100,7 +1237,7 @@ public class HRegionServer
try {
(new Thread(new HRegionServer(conf))).start();
} catch (Throwable t) {
LOG.error( "Can not start master because "+
LOG.error( "Can not start region server because "+
StringUtils.stringifyException(t) );
System.exit(-1);
}

View File

@ -36,12 +36,12 @@ import org.apache.hadoop.io.Text;
* file system only.
* TODO: Add dumping of HStoreFile content and HLog.
*/
public class HRegiondirReader {
class HRegiondirReader {
private final Configuration conf;
private final Path parentdir;
private static final Pattern REGION_NAME_PARSER =
Pattern.compile(HGlobals.HREGIONDIR_PREFIX +
static final Pattern REGION_NAME_PARSER =
Pattern.compile(HConstants.HREGIONDIR_PREFIX +
"([^_]+)_([^_]*)_([^_]*)");
private static final String USAGE = "Usage: " +
@ -50,7 +50,7 @@ public class HRegiondirReader {
private final List<HRegionInfo> infos;
public HRegiondirReader(final HBaseConfiguration conf,
HRegiondirReader(final HBaseConfiguration conf,
final String parentdirName)
throws IOException {
this.conf = conf;
@ -65,6 +65,9 @@ public class HRegiondirReader {
// Look for regions in parentdir.
Path [] regiondirs =
fs.listPaths(parentdir, new PathFilter() {
/* (non-Javadoc)
* @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path)
*/
public boolean accept(Path path) {
Matcher m = REGION_NAME_PARSER.matcher(path.getName());
return m != null && m.matches();
@ -136,12 +139,11 @@ public class HRegiondirReader {
return families.toArray(new Text [] {});
}
public List <HRegionInfo> getRegions() {
List <HRegionInfo> getRegions() {
return this.infos;
}
public HRegionInfo getRegionInfo(final String tableName)
throws IOException {
HRegionInfo getRegionInfo(final String tableName) {
HRegionInfo result = null;
for(HRegionInfo i: getRegions()) {
if(i.tableDesc.getName().equals(tableName)) {
@ -162,7 +164,7 @@ public class HRegiondirReader {
private void dump(final HRegionInfo info) throws IOException {
HRegion r = new HRegion(this.parentdir, null,
FileSystem.get(this.conf), conf, info, null, null);
FileSystem.get(this.conf), conf, info, null);
Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {});
HInternalScannerInterface scanner = r.getScanner(families, new Text());
HStoreKey key = new HStoreKey();
@ -201,6 +203,10 @@ public class HRegiondirReader {
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length < 1) {
System.err.println(USAGE);

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.io.Text;
* Locking and transactions are handled at a higher level. This API should not
* be called directly by any writer, but rather by an HRegion manager.
******************************************************************************/
public class HStore {
class HStore implements HConstants {
private static final Log LOG = LogFactory.getLog(HStore.class);
static final String COMPACTION_DIR = "compaction.tmp";
@ -64,7 +64,7 @@ public class HStore {
Integer compactLock = 0;
Integer flushLock = 0;
private final HLocking lock = new HLocking();
final HLocking lock = new HLocking();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@ -98,8 +98,16 @@ public class HStore {
*
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the HStore).
*
* @param dir - log file directory
* @param regionName - name of region
* @param family - name of column family
* @param fs - file system object
* @param reconstructionLog - existing log file to apply if any
* @param conf - configuration object
* @throws IOException
*/
public HStore(Path dir, Text regionName, HColumnDescriptor family,
HStore(Path dir, Text regionName, HColumnDescriptor family,
FileSystem fs, Path reconstructionLog, Configuration conf)
throws IOException {
this.dir = dir;
@ -200,18 +208,25 @@ public class HStore {
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
Text column = val.getColumn();
if (!key.getRegionName().equals(this.regionName) ||
column.equals(HLog.METACOLUMN) ||
HStoreKey.extractFamily(column).equals(this.familyName)) {
if (column.equals(HLog.METACOLUMN)
|| !key.getRegionName().equals(this.regionName)
|| !HStoreKey.extractFamily(column).equals(this.familyName)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Passing on edit " + key.getRegionName() + ", "
+ key.getRegionName() + ", " + column.toString() + ": "
+ new String(val.getVal().get()));
+ column.toString() + ": " + new String(val.getVal().get())
+ ", my region: " + this.regionName + ", my column: "
+ this.familyName);
}
continue;
}
reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(),
val.getTimestamp()), val.getVal());
byte[] bytes = new byte[val.getVal().getSize()];
System.arraycopy(val.getVal().get(), 0, bytes, 0, bytes.length);
HStoreKey k = new HStoreKey(key.getRow(), column,val.getTimestamp());
if (LOG.isDebugEnabled()) {
LOG.debug("Applying edit " + k.toString() + "="
+ new String(bytes, UTF8_ENCODING));
}
reconstructedCache.put(k, new BytesWritable(bytes));
}
} finally {
login.close();
@ -248,8 +263,11 @@ public class HStore {
LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
}
/** Turn off all the MapFile readers */
public void close() throws IOException {
/**
* Turn off all the MapFile readers
* @throws IOException
*/
void close() throws IOException {
LOG.info("closing HStore for " + this.regionName + "/" + this.familyName);
this.lock.obtainWriteLock();
try {
@ -279,8 +297,13 @@ public class HStore {
* Also, we are not expecting any reads of this MapFile just yet.
*
* Return the entire list of HStoreFiles currently used by the HStore.
*
* @param inputCache - memcache to flush
* @param logCacheFlushId - flush sequence number
* @return - Vector of all the HStoreFiles in use
* @throws IOException
*/
public Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
long logCacheFlushId) throws IOException {
return flushCacheHelper(inputCache, logCacheFlushId, true);
@ -351,7 +374,10 @@ public class HStore {
}
}
public Vector<HStoreFile> getAllMapFiles() {
/**
* @return - vector of all the HStore files in use
*/
Vector<HStoreFile> getAllMapFiles() {
this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(mapFiles.values());
@ -380,8 +406,10 @@ public class HStore {
*
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
* @throws IOException
*/
public void compact() throws IOException {
void compact() throws IOException {
compactHelper(false);
}
@ -766,7 +794,7 @@ public class HStore {
*
* The returned object should map column names to byte arrays (byte[]).
*/
public void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
void getFull(HStoreKey key, TreeMap<Text, BytesWritable> results) throws IOException {
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
@ -806,7 +834,7 @@ public class HStore {
*
* If 'numVersions' is negative, the method returns all available versions.
*/
public BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
BytesWritable[] get(HStoreKey key, int numVersions) throws IOException {
if(numVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
@ -833,24 +861,19 @@ public class HStore {
if(numVersions > 0 && (results.size() >= numVersions)) {
break;
} else {
}
results.add(readval);
readval = new BytesWritable();
}
}
}
}
if(results.size() >= numVersions) {
break;
}
}
if(results.size() == 0) {
return null;
} else {
return results.toArray(new BytesWritable[results.size()]);
}
return results.size() == 0 ?
null :results.toArray(new BytesWritable[results.size()]);
} finally {
this.lock.releaseReadLock();
@ -863,7 +886,7 @@ public class HStore {
* @param midKey - the middle key for the largest MapFile
* @return - size of the largest MapFile
*/
public long getLargestFileSize(Text midKey) {
long getLargestFileSize(Text midKey) {
long maxSize = 0L;
if (this.mapFiles.size() <= 0) {
return maxSize;
@ -904,7 +927,7 @@ public class HStore {
/**
* @return Returns the number of map files currently in use
*/
public int getNMaps() {
int getNMaps() {
this.lock.obtainReadLock();
try {
return maps.size();
@ -933,7 +956,7 @@ public class HStore {
* Return a set of MapFile.Readers, one for each HStore file.
* These should be closed after the user is done with them.
*/
public HInternalScannerInterface getScanner(long timestamp, Text targetCols[],
HInternalScannerInterface getScanner(long timestamp, Text targetCols[],
Text firstRow) throws IOException {
return new HStoreScanner(timestamp, targetCols, firstRow);
@ -947,7 +970,7 @@ public class HStore {
class HStoreScanner extends HAbstractScanner {
private MapFile.Reader[] readers;
public HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
throws IOException {
super(timestamp, targetCols);
@ -1000,6 +1023,7 @@ public class HStore {
* @param firstRow - seek to this row
* @return - true if this is the first row or if the row was not found
*/
@Override
boolean findFirstRow(int i, Text firstRow) throws IOException {
HStoreKey firstKey
= (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), vals[i]);
@ -1023,6 +1047,7 @@ public class HStore {
* @param i - which reader to fetch next value from
* @return - true if there is more data available
*/
@Override
boolean getNext(int i) throws IOException {
vals[i] = new BytesWritable();
if(! readers[i].next(keys[i], vals[i])) {
@ -1033,6 +1058,7 @@ public class HStore {
}
/** Close down the indicated reader. */
@Override
void closeSubScanner(int i) {
try {
if(readers[i] != null) {
@ -1052,6 +1078,7 @@ public class HStore {
}
/** Shut it down! */
@Override
public void close() {
if(! scannerClosed) {
try {

View File

@ -15,8 +15,6 @@
*/
package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import java.io.*;
@ -25,9 +23,15 @@ import java.io.*;
* A Key for a stored row
******************************************************************************/
public class HStoreKey implements WritableComparable {
private final Log LOG = LogFactory.getLog(this.getClass().getName());
public static Text extractFamily(Text col) throws IOException {
/**
* Extracts the column family name from a column
* For example, returns 'info' if the specified column was 'info:server'
*
* @param col - name of column
* @return - column family name
*/
public static Text extractFamily(Text col) {
String column = col.toString();
int colpos = column.indexOf(":");
if(colpos < 0) {
@ -40,56 +44,126 @@ public class HStoreKey implements WritableComparable {
Text column;
long timestamp;
/** Default constructor used in conjunction with Writable interface */
public HStoreKey() {
this.row = new Text();
this.column = new Text();
this.timestamp = Long.MAX_VALUE;
}
/**
* Create an HStoreKey specifying only the row
* The column defaults to the empty string and the time stamp defaults to
* Long.MAX_VALUE
*
* @param row - row key
*/
public HStoreKey(Text row) {
this.row = new Text(row);
this.column = new Text();
this.timestamp = Long.MAX_VALUE;
}
/**
* Create an HStoreKey specifying the row and timestamp
* The column name defaults to the empty string
*
* @param row - row key
* @param timestamp - timestamp value
*/
public HStoreKey(Text row, long timestamp) {
this.row = new Text(row);
this.column = new Text();
this.timestamp = timestamp;
}
/**
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to Long.MAX_VALUE
*
* @param row - row key
* @param column - column key
*/
public HStoreKey(Text row, Text column) {
this.row = new Text(row);
this.column = new Text(column);
this.timestamp = Long.MAX_VALUE;
}
/**
* Create an HStoreKey specifying all the fields
*
* @param row - row key
* @param column - column key
* @param timestamp - timestamp value
*/
public HStoreKey(Text row, Text column, long timestamp) {
this.row = new Text(row);
this.column = new Text(column);
this.timestamp = timestamp;
}
/**
* Construct a new HStoreKey from another
*
* @param other - the source key
*/
public HStoreKey(HStoreKey other) {
this();
this.row.set(other.row);
this.column.set(other.column);
this.timestamp = other.timestamp;
}
/**
* Change the value of the row key
*
* @param newrow - new row key value
*/
public void setRow(Text newrow) {
this.row.set(newrow);
}
/**
* Change the value of the column key
*
* @param newcol - new column key value
*/
public void setColumn(Text newcol) {
this.column.set(newcol);
}
/**
* Change the value of the timestamp field
*
* @param timestamp - new timestamp value
*/
public void setVersion(long timestamp) {
this.timestamp = timestamp;
}
/**
* Set the value of this HStoreKey from the supplied key
*
* @param k - key value to copy
*/
public void set(HStoreKey k) {
this.row = k.getRow();
this.column = k.getColumn();
this.timestamp = k.getTimestamp();
}
/** @return value of row key */
public Text getRow() {
return row;
}
/** @return value of column key */
public Text getColumn() {
return column;
}
/** @return value of timestamp */
public long getTimestamp() {
return timestamp;
}
@ -125,18 +199,12 @@ public class HStoreKey implements WritableComparable {
* @see #matchesWithoutColumn(HStoreKey)
*/
public boolean matchesRowFamily(HStoreKey other) {
boolean status = false;
try {
status = this.row.compareTo(other.row) == 0
return this.row.compareTo(other.row) == 0
&& extractFamily(this.column).compareTo(
extractFamily(other.getColumn())) == 0;
} catch(IOException e) {
LOG.error(e);
}
return status;
}
@Override
public String toString() {
return row.toString() + "/" + column.toString() + "/" + timestamp;
}
@ -158,6 +226,9 @@ public class HStoreKey implements WritableComparable {
// Comparable
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(Object o) {
HStoreKey other = (HStoreKey) o;
int result = this.row.compareTo(other.row);
@ -180,12 +251,18 @@ public class HStoreKey implements WritableComparable {
// Writable
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
row.write(out);
column.write(out);
out.writeLong(timestamp);
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {
row.readFields(in);
column.readFields(in);

View File

@ -44,6 +44,7 @@ public class HTableDescriptor implements WritableComparable {
private static final Pattern LEGAL_TABLE_NAME =
Pattern.compile("[\\w-]+");
/** Constructs an empty object */
public HTableDescriptor() {
this.name = new Text();
this.families = new TreeMap<Text, HColumnDescriptor>();
@ -66,6 +67,7 @@ public class HTableDescriptor implements WritableComparable {
this.families = new TreeMap<Text, HColumnDescriptor>();
}
/** @return name of table */
public Text getName() {
return name;
}
@ -78,7 +80,12 @@ public class HTableDescriptor implements WritableComparable {
families.put(family.getName(), family);
}
/** Do we contain a given column? */
/**
* Checks to see if this table contains the given column family
*
* @param family - family name
* @return true if the table contains the specified family name
*/
public boolean hasFamily(Text family) {
return families.containsKey(family);
}
@ -87,6 +94,8 @@ public class HTableDescriptor implements WritableComparable {
*
* TODO: What is this used for? Seems Dangerous to let people play with our
* private members.
*
* @return map of family members
*/
public TreeMap<Text, HColumnDescriptor> families() {
return families;

View File

@ -20,26 +20,34 @@ import java.io.*;
/*******************************************************************************
* LabelledData is just a data pair.
* It includes a Text label and some associated data.
* It includes an HStoreKey and some associated data.
******************************************************************************/
public class LabelledData implements Writable {
Text label;
public class KeyedData implements Writable {
HStoreKey key;
BytesWritable data;
public LabelledData() {
this.label = new Text();
/** Default constructor. Used by Writable interface */
public KeyedData() {
this.key = new HStoreKey();
this.data = new BytesWritable();
}
public LabelledData(Text label, BytesWritable data) {
this.label = new Text(label);
/**
* Create a KeyedData object specifying the parts
* @param key - HStoreKey
* @param data - BytesWritable
*/
public KeyedData(HStoreKey key, BytesWritable data) {
this.key = key;
this.data = data;
}
public Text getLabel() {
return label;
/** @return - returns the key */
public HStoreKey getKey() {
return key;
}
/** @return - returns the value */
public BytesWritable getData() {
return data;
}
@ -48,13 +56,19 @@ public class LabelledData implements Writable {
// Writable
//////////////////////////////////////////////////////////////////////////////
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
label.write(out);
key.write(out);
data.write(out);
}
/* (non-Javadoc)
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {
label.readFields(in);
key.readFields(in);
data.readFields(in);
}
}

View File

@ -17,25 +17,14 @@ package org.apache.hadoop.hbase;
/*******************************************************************************
* LeaseListener is a small class meant to be overridden by users of the Leases
* LeaseListener is an interface meant to be implemented by users of the Leases
* class.
*
* It receives events from the Leases class about the status of its accompanying
* lease. Users of the Leases class can use a LeaseListener subclass to, for
* example, clean up resources after a lease has expired.
******************************************************************************/
public abstract class LeaseListener {
public LeaseListener() {
}
public void leaseRenewed() {
}
/** When the user cancels a lease, this method is called. */
public void leaseCancelled() {
}
public interface LeaseListener {
/** When a lease expires, this method is called. */
public void leaseExpired() {
}
public void leaseExpired();
}

View File

@ -48,7 +48,12 @@ public class Leases {
TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
boolean running = true;
/** Indicate the length of the lease, in milliseconds */
/**
* Creates a lease
*
* @param leasePeriod - length of time (milliseconds) that the lease is valid
* @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
*/
public Leases(long leasePeriod, long leaseCheckFrequency) {
this.leasePeriod = leasePeriod;
this.leaseCheckFrequency = leaseCheckFrequency;
@ -59,7 +64,7 @@ public class Leases {
}
/**
* Shut down this Leases outfit. All pending leases will be destroyed,
* Shut down this Leases instance. All pending leases will be destroyed,
* without any cancellation calls.
*/
public void close() {
@ -89,15 +94,21 @@ public class Leases {
}
/** A client obtains a lease... */
/**
* Obtain a lease
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @param listener - listener that will process lease expirations
*/
public void createLease(Text holderId, Text resourceId,
final LeaseListener listener)
throws IOException {
final LeaseListener listener) {
synchronized(leases) {
synchronized(sortedLeases) {
Lease lease = new Lease(holderId, resourceId, listener);
Text leaseId = lease.getLeaseId();
if(leases.get(leaseId) != null) {
throw new IOException("Impossible state for createLease(): Lease " +
throw new AssertionError("Impossible state for createLease(): Lease " +
getLeaseName(holderId, resourceId) + " is still held.");
}
leases.put(leaseId, lease);
@ -110,6 +121,13 @@ public class Leases {
}
/** A client renews a lease... */
/**
* Renew a lease
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @throws IOException
*/
public void renewLease(Text holderId, Text resourceId) throws IOException {
synchronized(leases) {
synchronized(sortedLeases) {
@ -132,8 +150,12 @@ public class Leases {
}
}
/** A client explicitly cancels a lease.
* The lease-cleanup method is not called.
/**
* Client explicitly cancels a lease.
*
* @param holderId - name of lease holder
* @param resourceId - resource being leased
* @throws IOException
*/
public void cancelLease(Text holderId, Text resourceId) throws IOException {
synchronized(leases) {
@ -152,7 +174,6 @@ public class Leases {
sortedLeases.remove(lease);
leases.remove(leaseId);
lease.cancelled();
}
}
if (LOG.isDebugEnabled()) {
@ -197,37 +218,33 @@ public class Leases {
}
/** This class tracks a single Lease. */
class Lease implements Comparable {
@SuppressWarnings("unchecked")
private class Lease implements Comparable {
Text holderId;
Text resourceId;
LeaseListener listener;
long lastUpdate;
public Lease(Text holderId, Text resourceId, LeaseListener listener) {
Lease(Text holderId, Text resourceId, LeaseListener listener) {
this.holderId = holderId;
this.resourceId = resourceId;
this.listener = listener;
renew();
}
public Text getLeaseId() {
Text getLeaseId() {
return createLeaseId(holderId, resourceId);
}
public boolean shouldExpire() {
boolean shouldExpire() {
return (System.currentTimeMillis() - lastUpdate > leasePeriod);
}
public void renew() {
void renew() {
this.lastUpdate = System.currentTimeMillis();
listener.leaseRenewed();
}
public void cancelled() {
listener.leaseCancelled();
}
public void expired() {
void expired() {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease expired " + getLeaseName(this.holderId,
this.resourceId));

View File

@ -0,0 +1,40 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
/**
* Thrown if the region server log directory exists (which indicates another
* region server is running at the same address)
*/
public class RegionServerRunningException extends IOException {
private static final long serialVersionUID = 1L << 31 - 1L;
/** Default Constructor */
public RegionServerRunningException() {
super();
}
/**
* Constructs the exception and supplies a string as the message
* @param s - message
*/
public RegionServerRunningException(String s) {
super(s);
}
}

View File

@ -19,6 +19,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -36,6 +37,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
protected FileSystem fs;
protected Path dir;
@Override
public void setUp() throws Exception {
super.setUp();
rand = new Random();
@ -87,23 +89,19 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
// Now create the root and meta regions and insert the data regions
// created above into the meta
HRegion root = HRegion.createNewHRegion(fs, dir, conf,
HGlobals.rootTableDesc, 0L, null, null);
HRegion meta = HRegion.createNewHRegion(fs, dir, conf,
HGlobals.metaTableDesc, 1L, null, null);
HRegion root = createNewHRegion(fs, dir, conf, HGlobals.rootTableDesc, 0L, null, null);
HRegion meta = createNewHRegion(fs, dir, conf, HGlobals.metaTableDesc, 1L, null, null);
HRegion.addRegionToMeta(root, meta);
HRegion.addRegionToMETA(root, meta);
for(int i = 0; i < regions.length; i++) {
HRegion.addRegionToMeta(meta, regions[i]);
HRegion.addRegionToMETA(meta, regions[i]);
}
root.close();
root.getLog().close();
fs.delete(new Path(root.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
root.getLog().closeAndDelete();
meta.close();
meta.getLog().close();
fs.delete(new Path(meta.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
meta.getLog().closeAndDelete();
} catch(Throwable t) {
t.printStackTrace();
@ -111,6 +109,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
}
}
@Override
public void tearDown() throws Exception {
super.tearDown();
dfsCluster.shutdown();
@ -118,8 +117,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
private HRegion createAregion(Text startKey, Text endKey, int firstRow, int nrows)
throws IOException {
HRegion region = HRegion.createNewHRegion(fs, dir, conf, desc,
rand.nextLong(), startKey, endKey);
HRegion region = createNewHRegion(fs, dir, conf, desc, rand.nextLong(), startKey, endKey);
System.out.println("created region " + region.getRegionName());
@ -138,9 +136,22 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
region.log.rollWriter();
region.compactStores();
region.close();
region.getLog().close();
fs.delete(new Path(region.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
region.getLog().closeAndDelete();
region.getRegionInfo().offLine = true;
return region;
}
private HRegion createNewHRegion(FileSystem fs, Path dir,
Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
Text endKey) throws IOException {
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
return new HRegion(dir,
new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
fs, conf, info, null);
}
}

View File

@ -22,11 +22,17 @@ package org.apache.hadoop.hbase;
public abstract class HBaseClusterTestCase extends HBaseTestCase {
protected MiniHBaseCluster cluster;
final boolean miniHdfs;
int regionServers;
protected HBaseClusterTestCase() {
this(true);
}
protected HBaseClusterTestCase(int regionServers) {
this(true);
this.regionServers = regionServers;
}
protected HBaseClusterTestCase(String name) {
this(name, true);
}
@ -34,18 +40,23 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
protected HBaseClusterTestCase(final boolean miniHdfs) {
super();
this.miniHdfs = miniHdfs;
this.regionServers = 1;
}
protected HBaseClusterTestCase(String name, final boolean miniHdfs) {
super(name);
this.miniHdfs = miniHdfs;
this.regionServers = 1;
}
@Override
public void setUp() throws Exception {
super.setUp();
this.cluster = new MiniHBaseCluster(this.conf, 1, this.miniHdfs);
this.cluster =
new MiniHBaseCluster(this.conf, this.regionServers, this.miniHdfs);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if (this.cluster != null) {

View File

@ -38,7 +38,7 @@ public class MiniHBaseCluster implements HConstants {
private HMaster master;
private Thread masterThread;
private HRegionServer[] regionServers;
private Thread[] regionThreads;
Thread[] regionThreads;
/**
* Starts a MiniHBaseCluster on top of a new MiniDFSCluster
@ -94,7 +94,7 @@ public class MiniHBaseCluster implements HConstants {
try {
try {
this.fs = FileSystem.get(conf);
this.parentdir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
fs.mkdirs(parentdir);
} catch(Throwable e) {
@ -146,13 +146,39 @@ public class MiniHBaseCluster implements HConstants {
}
/**
* Returns the rpc address actually used by the master server, because the
* supplied port is not necessarily the actual port used.
* @return Returns the rpc address actually used by the master server, because
* the supplied port is not necessarily the actual port used.
*/
public HServerAddress getHMasterAddress() {
return master.getMasterAddress();
}
/**
* Shut down the specified region server cleanly
*
* @param serverNumber
*/
public void stopRegionServer(int serverNumber) {
if(serverNumber >= regionServers.length) {
throw new ArrayIndexOutOfBoundsException(
"serverNumber > number of region servers");
}
this.regionServers[serverNumber].stop();
}
/**
* Cause a region server to exit without cleaning up
*
* @param serverNumber
*/
public void abortRegionServer(int serverNumber) {
if(serverNumber >= regionServers.length) {
throw new ArrayIndexOutOfBoundsException(
"serverNumber > number of region servers");
}
this.regionServers[serverNumber].abort();
}
/** Shut down the HBase cluster */
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");

View File

@ -0,0 +1,59 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
/** Tests region server failover when a region server exits cleanly */
public class TestCleanRegionServerExit extends HBaseClusterTestCase {
private HClient client;
/** Constructor */
public TestCleanRegionServerExit() {
super(2); // Start two region servers
client = new HClient(conf);
}
/** The test */
public void testCleanRegionServerExit() {
try {
// When the META table can be opened, the region servers are running
client.openTable(HConstants.META_TABLE_NAME);
} catch(IOException e) {
e.printStackTrace();
fail();
}
// Shut down a region server cleanly
this.cluster.stopRegionServer(0);
try {
this.cluster.regionThreads[0].join();
} catch(InterruptedException e) {
}
try {
Thread.sleep(60000); // Wait for cluster to adjust
} catch(InterruptedException e) {
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/** Test case for get */
public class TestGet extends HBaseTestCase {
private static final Text CONTENTS = new Text("contents:");
private static final Text ROW_KEY = new Text(HGlobals.rootRegionInfo.regionName);
@ -59,6 +60,10 @@ public class TestGet extends HBaseTestCase {
}
}
/**
* Constructor
* @throws IOException
*/
public void testGet() throws IOException {
MiniDFSCluster cluster = null;
@ -81,7 +86,7 @@ public class TestGet extends HBaseTestCase {
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
HRegion r = new HRegion(dir, log, fs, conf, info, null, null);
HRegion r = new HRegion(dir, log, fs, conf, info, null);
// Write information to the table
@ -126,7 +131,7 @@ public class TestGet extends HBaseTestCase {
r.close();
log.rollWriter();
r = new HRegion(dir, log, fs, conf, info, null, null);
r = new HRegion(dir, log, fs, conf, info, null);
// Read it back
@ -156,7 +161,7 @@ public class TestGet extends HBaseTestCase {
r.close();
log.rollWriter();
r = new HRegion(dir, log, fs, conf, info, null, null);
r = new HRegion(dir, log, fs, conf, info, null);
// Read it back
@ -165,6 +170,7 @@ public class TestGet extends HBaseTestCase {
// Close region once and for all
r.close();
log.closeAndDelete();
} catch(IOException e) {
e.printStackTrace();

View File

@ -15,6 +15,7 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.fs.FileSystem;
@ -24,13 +25,17 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Reader;
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
protected void setUp() throws Exception {
@Override
public void setUp() throws Exception {
super.setUp();
}
public void testAppend() throws Exception {
/** The test */
public void testAppend() {
try {
Path dir = getUnitTestdir(getName());
FileSystem fs = FileSystem.get(this.conf);
if (fs.exists(dir)) {
@ -63,18 +68,18 @@ public class TestHLog extends HBaseTestCase implements HConstants {
HLogEdit val = new HLogEdit();
for (int i = 0; i < COL_COUNT; i++) {
reader.next(key, val);
assertEquals(key.getRegionName(), regionName);
assertEquals(key.getTablename(), tableName);
assertEquals(key.getRow(), row);
assertEquals(val.getVal().get()[0], (byte)(i + '0'));
assertEquals(regionName, key.getRegionName());
assertEquals(tableName, key.getTablename());
assertEquals(row, key.getRow());
assertEquals((byte)(i + '0'), val.getVal().get()[0]);
System.out.println(key + " " + val);
}
while (reader.next(key, val)) {
// Assert only one more row... the meta flushed row.
assertEquals(key.getRegionName(), regionName);
assertEquals(key.getTablename(), tableName);
assertEquals(key.getRow(), HLog.METAROW);
assertEquals(val.getColumn(), HLog.METACOLUMN);
assertEquals(regionName, key.getRegionName());
assertEquals(tableName, key.getTablename());
assertEquals(HLog.METAROW, key.getRow());
assertEquals(HLog.METACOLUMN, val.getColumn());
assertEquals(0, val.getVal().compareTo(COMPLETE_CACHEFLUSH));
System.out.println(key + " " + val);
}
@ -89,9 +94,14 @@ public class TestHLog extends HBaseTestCase implements HConstants {
fs.delete(dir);
}
}
} catch(IOException e) {
e.printStackTrace();
fail();
}
}
protected void tearDown() throws Exception {
@Override
public void tearDown() throws Exception {
super.tearDown();
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HMemcache.Snapshot;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/** memcache test case */
public class TestHMemcache extends TestCase {
private HMemcache hmemcache;
@ -41,6 +42,10 @@ public class TestHMemcache extends TestCase {
private static final String COLUMN_FAMILY = "column";
/* (non-Javadoc)
* @see junit.framework.TestCase#setUp()
*/
@Override
protected void setUp() throws Exception {
super.setUp();
@ -55,6 +60,10 @@ public class TestHMemcache extends TestCase {
"org.apache.hadoop.fs.LocalFileSystem");
}
/* (non-Javadoc)
* @see junit.framework.TestCase#tearDown()
*/
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
@ -117,11 +126,14 @@ public class TestHMemcache extends TestCase {
return s;
}
/**
* Test memcache snapshots
* @throws IOException
*/
public void testSnapshotting() throws IOException {
final int snapshotCount = 5;
final Text tableName = new Text(getName());
HLog log = getLogfile();
try {
// Add some rows, run a snapshot. Do it a few times.
for (int i = 0; i < snapshotCount; i++) {
addRows(this.hmemcache);
@ -131,10 +143,7 @@ public class TestHMemcache extends TestCase {
// Clean up snapshot now we are done with it.
this.hmemcache.deleteSnapshot();
}
log.close();
} finally {
log.dir.getFileSystem(this.conf).delete(log.dir);
}
log.closeAndDelete();
}
private void isExpectedRow(final int rowIndex,
@ -157,7 +166,8 @@ public class TestHMemcache extends TestCase {
}
}
public void testGetFull() throws IOException {
/** Test getFull from memcache */
public void testGetFull() {
addRows(this.hmemcache);
for (int i = 0; i < ROW_COUNT; i++) {
HStoreKey hsk = new HStoreKey(getRowName(i));
@ -166,6 +176,10 @@ public class TestHMemcache extends TestCase {
}
}
/**
* Test memcache scanner
* @throws IOException
*/
public void testScanner() throws IOException {
addRows(this.hmemcache);
long timestamp = System.currentTimeMillis();

View File

@ -37,7 +37,7 @@ import org.apache.log4j.Logger;
* HRegions or in the HBaseMaster, so only basic testing is possible.
*/
public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener {
private Logger LOG = Logger.getLogger(this.getClass().getName());
Logger LOG = Logger.getLogger(this.getClass().getName());
/** Constructor */
public TestHRegion() {
@ -83,10 +83,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
private static FileSystem fs = null;
private static Path parentdir = null;
private static Path newlogdir = null;
private static Path oldlogfile = null;
private static HLog log = null;
private static HTableDescriptor desc = null;
private static HRegion region = null;
static HRegion region = null;
private static int numInserted = 0;
@ -99,14 +98,13 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
parentdir = new Path("/hbase");
fs.mkdirs(parentdir);
newlogdir = new Path(parentdir, "log");
oldlogfile = new Path(parentdir, "oldlogfile");
log = new HLog(fs, newlogdir, conf);
desc = new HTableDescriptor("test");
desc.addFamily(new HColumnDescriptor("contents:"));
desc.addFamily(new HColumnDescriptor("anchor:"));
region = new HRegion(parentdir, log, fs, conf,
new HRegionInfo(1, desc, null, null), null, oldlogfile);
new HRegionInfo(1, desc, null, null), null);
}
// Test basic functionality. Writes to contents:basic and anchor:anchornum-*
@ -208,6 +206,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
List<Thread>threads = new ArrayList<Thread>(threadCount);
for (int i = 0; i < threadCount; i++) {
threads.add(new Thread(Integer.toString(i)) {
@Override
public void run() {
long [] lockids = new long[lockCount];
// Get locks.
@ -822,7 +821,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
f.delete();
}
private void cleanup() throws IOException {
private void cleanup() {
// Shut down the mini cluster

View File

@ -0,0 +1,54 @@
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
/** Tests region server failover when a region server exits cleanly */
public class TestRegionServerAbort extends HBaseClusterTestCase {
private HClient client;
/** Constructor */
public TestRegionServerAbort() {
super(2); // Start two region servers
client = new HClient(conf);
}
/** The test */
public void testRegionServerAbort() {
try {
// When the META table can be opened, the region servers are running
client.openTable(HConstants.META_TABLE_NAME);
} catch(IOException e) {
e.printStackTrace();
fail();
}
// Force a region server to exit "ungracefully"
this.cluster.abortRegionServer(0);
try {
Thread.sleep(120000); // Wait for cluster to adjust
} catch(InterruptedException e) {
}
}
}

View File

@ -132,7 +132,9 @@ public class TestScanner extends HBaseTestCase {
validateRegionInfo(bytes);
}
/** The test! */
/** The test!
* @throws IOException
*/
public void testScanner() throws IOException {
MiniDFSCluster cluster = null;
FileSystem fs = null;
@ -152,7 +154,7 @@ public class TestScanner extends HBaseTestCase {
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
// Write information to the meta table
@ -175,7 +177,7 @@ public class TestScanner extends HBaseTestCase {
region.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
// Verify we can get the data back now that it is on disk.
@ -216,7 +218,7 @@ public class TestScanner extends HBaseTestCase {
region.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
// Validate again
@ -252,13 +254,18 @@ public class TestScanner extends HBaseTestCase {
region.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
// Validate again
scan(true, address.toString());
getRegionInfo();
// clean up
region.close();
log.closeAndDelete();
} catch(IOException e) {
e.printStackTrace();
throw e;

View File

@ -1,3 +1,18 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
@ -43,16 +58,16 @@ public class TestScanner2 extends HBaseClusterTestCase {
List<HRegion> newRegions = new ArrayList<HRegion>(2);
newRegions.add(HRegion.createHRegion(
new HRegionInfo(2L, desc, null, new Text("midway")),
homedir, this.conf, null, null));
homedir, this.conf, null));
newRegions.add(HRegion.createHRegion(
new HRegionInfo(3L, desc, new Text("midway"), null),
homedir, this.conf, null, null));
homedir, this.conf, null));
for (HRegion r: newRegions) {
HRegion.addRegionToMETA(client, HConstants.META_TABLE_NAME, r,
this.cluster.getHMasterAddress(), -1L);
}
regions = scan(client, HConstants.META_TABLE_NAME);
assertEquals("Should be two regions only", regions.size(), 2);
assertEquals("Should be two regions only", 2, regions.size());
}
private List<HRegionInfo> scan(final HClient client, final Text table)
@ -68,8 +83,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
HMaster.METACOLUMNS, new Text());
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
HStoreKey key = new HStoreKey();
LabelledData[] values = regionServer.next(scannerId, key);
KeyedData[] values = regionServer.next(scannerId);
if (values.length == 0) {
break;
}
@ -78,16 +92,15 @@ public class TestScanner2 extends HBaseClusterTestCase {
byte[] bytes = new byte[values[i].getData().getSize()];
System.arraycopy(values[i].getData().get(), 0, bytes, 0,
bytes.length);
results.put(values[i].getLabel(), bytes);
results.put(values[i].getKey().getColumn(), bytes);
}
HRegionInfo info = HRegion.getRegionInfo(results);
String serverName = HRegion.getServerName(results);
long startCode = HRegion.getStartCode(results);
LOG.info(Thread.currentThread().getName() + " scanner: " +
Long.valueOf(scannerId) + " row: " + key +
": regioninfo: {" + info.toString() + "}, server: " + serverName +
", startCode: " + startCode);
LOG.info(Thread.currentThread().getName() + " scanner: "
+ Long.valueOf(scannerId) + ": regioninfo: {" + info.toString()
+ "}, server: " + serverName + ", startCode: " + startCode);
regions.add(info);
}
} finally {