HBASE-677 Make HTable, HRegion, HRegionServer, HStore, and HColumnDescriptor subclassable
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@666395 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c3bb630ef2
commit
90719ebd62
|
@ -79,6 +79,8 @@ Hbase Change Log
|
|||
HBASE-632 HTable.getMetadata is very inefficient
|
||||
HBASE-671 New UI page displaying all regions in a table should be sorted
|
||||
HBASE-672 Sort regions in the regionserver UI
|
||||
HBASE-677 Make HTable, HRegion, HRegionServer, HStore, and HColumnDescriptor
|
||||
subclassable (Clint Morgan via Stack)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-47 Option to set TTL for columns in hbase
|
||||
|
|
|
@ -84,6 +84,9 @@ public interface HConstants {
|
|||
/** Parameter name for what region server interface to use. */
|
||||
static final String REGION_SERVER_CLASS = "hbase.regionserver.class";
|
||||
|
||||
/** Parameter name for what region server implementation to use. */
|
||||
static final String REGION_SERVER_IMPL= "hbase.regionserver.impl";
|
||||
|
||||
/** Default region server interface class name. */
|
||||
static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ public class LocalHBaseCluster implements HConstants {
|
|||
/** 'local:' */
|
||||
public static final String LOCAL_COLON = LOCAL + ":";
|
||||
private final HBaseConfiguration conf;
|
||||
private final Class<? extends HRegionServer> regionServerClass;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
@ -98,6 +99,7 @@ public class LocalHBaseCluster implements HConstants {
|
|||
// start/stop ports at different times during the life of the test.
|
||||
conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
|
||||
this.regionThreads = new ArrayList<RegionServerThread>();
|
||||
regionServerClass = (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
|
||||
for (int i = 0; i < noRegionServers; i++) {
|
||||
addRegionServer();
|
||||
}
|
||||
|
@ -112,7 +114,15 @@ public class LocalHBaseCluster implements HConstants {
|
|||
*/
|
||||
public RegionServerThread addRegionServer() throws IOException {
|
||||
synchronized (regionThreads) {
|
||||
RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
|
||||
HRegionServer server;
|
||||
try {
|
||||
server = regionServerClass.getConstructor(HBaseConfiguration.class).newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
IOException ioe = new IOException();
|
||||
ioe.initCause(e);
|
||||
throw ioe;
|
||||
}
|
||||
RegionServerThread t = new RegionServerThread(server,
|
||||
this.regionThreads.size());
|
||||
this.regionThreads.add(t);
|
||||
return t;
|
||||
|
|
|
@ -333,7 +333,7 @@ public class HTable {
|
|||
return this.tableName;
|
||||
}
|
||||
|
||||
protected HConnection getConnection() {
|
||||
public HConnection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
|
@ -1221,7 +1221,7 @@ public class HTable {
|
|||
* If there are multiple regions in a table, this scanner will iterate
|
||||
* through them all.
|
||||
*/
|
||||
private class ClientScanner implements Scanner {
|
||||
protected class ClientScanner implements Scanner {
|
||||
private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
|
||||
private byte[][] columns;
|
||||
private byte [] startRow;
|
||||
|
@ -1259,6 +1259,18 @@ public class HTable {
|
|||
}
|
||||
nextScanner();
|
||||
}
|
||||
|
||||
protected byte[][] getColumns() {
|
||||
return columns;
|
||||
}
|
||||
|
||||
protected long getTimestamp() {
|
||||
return scanTime;
|
||||
}
|
||||
|
||||
protected RowFilterInterface getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets a scanner for the next region.
|
||||
|
@ -1297,8 +1309,7 @@ public class HTable {
|
|||
}
|
||||
|
||||
try {
|
||||
callable = new ScannerCallable(getConnection(), getTableName(), columns,
|
||||
localStartKey, scanTime, filter);
|
||||
callable = getScannerCallable(localStartKey);
|
||||
// open a scanner on the region server starting at the
|
||||
// beginning of the region
|
||||
getConnection().getRegionServerWithRetries(callable);
|
||||
|
@ -1309,6 +1320,11 @@ public class HTable {
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected ScannerCallable getScannerCallable(byte [] localStartKey) {
|
||||
return new ScannerCallable(getConnection(), getTableName(), columns,
|
||||
localStartKey, scanTime, filter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param endKey
|
||||
|
|
|
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
|
||||
|
@ -38,7 +39,7 @@ public class ScannerCallable extends ServerCallable<RowResult> {
|
|||
private final long timestamp;
|
||||
private final RowFilterInterface filter;
|
||||
|
||||
ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
|
||||
protected ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns,
|
||||
byte [] startRow, long timestamp, RowFilterInterface filter) {
|
||||
super(connection, tableName, startRow);
|
||||
this.columns = columns;
|
||||
|
@ -65,15 +66,31 @@ public class ScannerCallable extends ServerCallable<RowResult> {
|
|||
scannerId = -1L;
|
||||
} else if (scannerId == -1L && !closed) {
|
||||
// open the scanner
|
||||
scannerId = server.openScanner(
|
||||
this.location.getRegionInfo().getRegionName(), columns, row,
|
||||
timestamp, filter);
|
||||
scannerId = openScanner();
|
||||
} else {
|
||||
return server.next(scannerId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
protected long openScanner() throws IOException {
|
||||
return server.openScanner(
|
||||
this.location.getRegionInfo().getRegionName(), columns, row,
|
||||
timestamp, filter);
|
||||
}
|
||||
|
||||
protected byte [][] getColumns() {
|
||||
return columns;
|
||||
}
|
||||
|
||||
protected long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
protected RowFilterInterface getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when the next invocation of call should close the scanner
|
||||
*/
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
|
@ -449,14 +450,16 @@ public class HRegion implements HConstants {
|
|||
// Load in all the HStores.
|
||||
long maxSeqId = -1;
|
||||
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
|
||||
HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs,
|
||||
oldLogFile, this.conf, reporter);
|
||||
HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
|
||||
stores.put(Bytes.mapKey(c.getName()), store);
|
||||
long storeSeqId = store.getMaxSequenceId();
|
||||
if (storeSeqId > maxSeqId) {
|
||||
maxSeqId = storeSeqId;
|
||||
}
|
||||
}
|
||||
|
||||
doReconstructionLog(oldLogFile, maxSeqId, reporter);
|
||||
|
||||
if (fs.exists(oldLogFile)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Deleting old log file: " + oldLogFile);
|
||||
|
@ -1542,12 +1545,27 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
// Do any reconstruction needed from the log
|
||||
@SuppressWarnings("unused")
|
||||
protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
|
||||
Progressable reporter)
|
||||
throws UnsupportedEncodingException, IOException {
|
||||
// Nothing to do (Replaying is done in HStores)
|
||||
}
|
||||
|
||||
protected HStore instantiateHStore(Path baseDir,
|
||||
HColumnDescriptor c, Path oldLogFile, Progressable reporter)
|
||||
throws IOException {
|
||||
return new HStore(baseDir, this.regionInfo, c, this.fs, oldLogFile,
|
||||
this.conf, reporter);
|
||||
}
|
||||
|
||||
/*
|
||||
* @param column
|
||||
* @return Store that goes with the family on passed <code>column</code>.
|
||||
* TODO: Make this lookup faster.
|
||||
*/
|
||||
private HStore getStore(final byte [] column) {
|
||||
protected HStore getStore(final byte [] column) {
|
||||
return this.stores.get(HStoreKey.getFamilyMapKey(column));
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.RegionHistorian;
|
|||
import org.apache.hadoop.hbase.RegionServerRunningException;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
|
@ -476,7 +477,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
* Run init. Sets up hlog and starts up all server threads.
|
||||
* @param c Extra configuration.
|
||||
*/
|
||||
private void init(final MapWritable c) throws IOException {
|
||||
protected void init(final MapWritable c) throws IOException {
|
||||
try {
|
||||
for (Map.Entry<Writable, Writable> e: c.entrySet()) {
|
||||
String key = e.getKey().toString();
|
||||
|
@ -860,15 +861,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
HRegion region = this.onlineRegions.get(mapKey);
|
||||
if (region == null) {
|
||||
try {
|
||||
region = new HRegion(HTableDescriptor.getTableDir(rootDir,
|
||||
regionInfo.getTableDesc().getName()),
|
||||
this.log, this.fs, conf, regionInfo, null, this.cacheFlusher,
|
||||
new Progressable() {
|
||||
public void progress() {
|
||||
addProcessingMessage(regionInfo);
|
||||
}
|
||||
}
|
||||
);
|
||||
region = instantiateRegion(regionInfo);
|
||||
// Startup a compaction early if one is needed.
|
||||
this.compactSplitThread.compactionRequested(region);
|
||||
} catch (IOException e) {
|
||||
|
@ -891,6 +884,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
reportOpen(regionInfo);
|
||||
}
|
||||
|
||||
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
|
||||
throws IOException {
|
||||
return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
|
||||
.getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
|
||||
this.cacheFlusher, new Progressable() {
|
||||
public void progress() {
|
||||
addProcessingMessage(regionInfo);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
|
||||
* This method is called while region is in the queue of regions to process
|
||||
|
@ -1172,16 +1176,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
requestCount.incrementAndGet();
|
||||
try {
|
||||
HRegion r = getRegion(regionName);
|
||||
long scannerId = -1L;
|
||||
InternalScanner s =
|
||||
r.getScanner(cols, firstRow, timestamp, filter);
|
||||
scannerId = rand.nextLong();
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
synchronized(scanners) {
|
||||
scanners.put(scannerName, s);
|
||||
}
|
||||
this.leases.
|
||||
createLease(scannerName, new ScannerListener(scannerName));
|
||||
long scannerId = addScanner(s);
|
||||
return scannerId;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
|
||||
|
@ -1191,6 +1188,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
|
||||
long scannerId = -1L;
|
||||
scannerId = rand.nextLong();
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
synchronized(scanners) {
|
||||
scanners.put(scannerName, s);
|
||||
}
|
||||
this.leases.
|
||||
createLease(scannerName, new ScannerListener(scannerName));
|
||||
return scannerId;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public void close(final long scannerId) throws IOException {
|
||||
checkOpen();
|
||||
|
@ -1409,7 +1418,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void checkOpen() throws IOException {
|
||||
protected void checkOpen() throws IOException {
|
||||
if (this.stopRequested.get() || this.abortRequested) {
|
||||
throw new IOException("Server not running");
|
||||
}
|
||||
|
@ -1490,7 +1499,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Return the leases.
|
||||
*/
|
||||
protected Leases getLeases() {
|
||||
return leases;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the rootDir.
|
||||
*/
|
||||
protected Path getRootDir() {
|
||||
return rootDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Return the fs.
|
||||
*/
|
||||
protected FileSystem getFileSystem() {
|
||||
return fs;
|
||||
}
|
||||
|
||||
//
|
||||
// Main program and support routines
|
||||
//
|
||||
|
@ -1567,6 +1597,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
* @param args
|
||||
*/
|
||||
public static void main(String [] args) {
|
||||
doMain(args, HRegionServer.class);
|
||||
Configuration conf = new HBaseConfiguration();
|
||||
@SuppressWarnings("unchecked")
|
||||
Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
|
||||
.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
|
||||
doMain(args, regionServerClass);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -167,7 +167,7 @@ public class HStore implements HConstants {
|
|||
* failed. Can be null.
|
||||
* @throws IOException
|
||||
*/
|
||||
HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
|
||||
protected HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
|
||||
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
|
||||
final Progressable reporter)
|
||||
throws IOException {
|
||||
|
@ -621,7 +621,7 @@ public class HStore implements HConstants {
|
|||
* @param key
|
||||
* @param value
|
||||
*/
|
||||
void add(HStoreKey key, byte[] value) {
|
||||
protected void add(HStoreKey key, byte[] value) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
this.memcache.add(key, value);
|
||||
|
@ -1845,7 +1845,7 @@ public class HStore implements HConstants {
|
|||
/**
|
||||
* Return a scanner for both the memcache and the HStore files
|
||||
*/
|
||||
InternalScanner getScanner(long timestamp, byte [][] targetCols,
|
||||
protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
|
||||
byte [] firstRow, RowFilterInterface filter)
|
||||
throws IOException {
|
||||
lock.readLock().lock();
|
||||
|
|
Loading…
Reference in New Issue