diff --git a/CHANGES.txt b/CHANGES.txt index 3594fa427a4..735a8f347c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -79,6 +79,8 @@ Hbase Change Log HBASE-632 HTable.getMetadata is very inefficient HBASE-671 New UI page displaying all regions in a table should be sorted HBASE-672 Sort regions in the regionserver UI + HBASE-677 Make HTable, HRegion, HRegionServer, HStore, and HColumnDescriptor + subclassable (Clint Morgan via Stack) NEW FEATURES HBASE-47 Option to set TTL for columns in hbase diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 2502720279a..cee9610c1d3 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -84,6 +84,9 @@ public interface HConstants { /** Parameter name for what region server interface to use. */ static final String REGION_SERVER_CLASS = "hbase.regionserver.class"; + /** Parameter name for what region server implementation to use. */ + static final String REGION_SERVER_IMPL= "hbase.regionserver.impl"; + /** Default region server interface class name. */ static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName(); diff --git a/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 7794c64034a..cee20d7a703 100644 --- a/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -65,6 +65,7 @@ public class LocalHBaseCluster implements HConstants { /** 'local:' */ public static final String LOCAL_COLON = LOCAL + ":"; private final HBaseConfiguration conf; + private final Class regionServerClass; /** * Constructor. @@ -98,6 +99,7 @@ public class LocalHBaseCluster implements HConstants { // start/stop ports at different times during the life of the test. conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0"); this.regionThreads = new ArrayList(); + regionServerClass = (Class) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); for (int i = 0; i < noRegionServers; i++) { addRegionServer(); } @@ -112,7 +114,15 @@ public class LocalHBaseCluster implements HConstants { */ public RegionServerThread addRegionServer() throws IOException { synchronized (regionThreads) { - RegionServerThread t = new RegionServerThread(new HRegionServer(conf), + HRegionServer server; + try { + server = regionServerClass.getConstructor(HBaseConfiguration.class).newInstance(conf); + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + RegionServerThread t = new RegionServerThread(server, this.regionThreads.size()); this.regionThreads.add(t); return t; diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index 313e6808c58..a6501edab2d 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -333,7 +333,7 @@ public class HTable { return this.tableName; } - protected HConnection getConnection() { + public HConnection getConnection() { return this.connection; } @@ -1221,7 +1221,7 @@ public class HTable { * If there are multiple regions in a table, this scanner will iterate * through them all. */ - private class ClientScanner implements Scanner { + protected class ClientScanner implements Scanner { private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); private byte[][] columns; private byte [] startRow; @@ -1259,6 +1259,18 @@ public class HTable { } nextScanner(); } + + protected byte[][] getColumns() { + return columns; + } + + protected long getTimestamp() { + return scanTime; + } + + protected RowFilterInterface getFilter() { + return filter; + } /* * Gets a scanner for the next region. @@ -1297,8 +1309,7 @@ public class HTable { } try { - callable = new ScannerCallable(getConnection(), getTableName(), columns, - localStartKey, scanTime, filter); + callable = getScannerCallable(localStartKey); // open a scanner on the region server starting at the // beginning of the region getConnection().getRegionServerWithRetries(callable); @@ -1309,6 +1320,11 @@ public class HTable { } return true; } + + protected ScannerCallable getScannerCallable(byte [] localStartKey) { + return new ScannerCallable(getConnection(), getTableName(), columns, + localStartKey, scanTime, filter); + } /** * @param endKey diff --git a/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java index f0927d7011d..01ec6c1b4bc 100644 --- a/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.RowResult; @@ -38,7 +39,7 @@ public class ScannerCallable extends ServerCallable { private final long timestamp; private final RowFilterInterface filter; - ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns, + protected ScannerCallable (HConnection connection, byte [] tableName, byte [][] columns, byte [] startRow, long timestamp, RowFilterInterface filter) { super(connection, tableName, startRow); this.columns = columns; @@ -65,15 +66,31 @@ public class ScannerCallable extends ServerCallable { scannerId = -1L; } else if (scannerId == -1L && !closed) { // open the scanner - scannerId = server.openScanner( - this.location.getRegionInfo().getRegionName(), columns, row, - timestamp, filter); + scannerId = openScanner(); } else { return server.next(scannerId); } return null; } + protected long openScanner() throws IOException { + return server.openScanner( + this.location.getRegionInfo().getRegionName(), columns, row, + timestamp, filter); + } + + protected byte [][] getColumns() { + return columns; + } + + protected long getTimestamp() { + return timestamp; + } + + protected RowFilterInterface getFilter() { + return filter; + } + /** * Call this when the next invocation of call should close the scanner */ diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 144446f5d32..51ddd92f6f6 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -449,14 +450,16 @@ public class HRegion implements HConstants { // Load in all the HStores. long maxSeqId = -1; for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { - HStore store = new HStore(this.basedir, this.regionInfo, c, this.fs, - oldLogFile, this.conf, reporter); + HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter); stores.put(Bytes.mapKey(c.getName()), store); long storeSeqId = store.getMaxSequenceId(); if (storeSeqId > maxSeqId) { maxSeqId = storeSeqId; } } + + doReconstructionLog(oldLogFile, maxSeqId, reporter); + if (fs.exists(oldLogFile)) { if (LOG.isDebugEnabled()) { LOG.debug("Deleting old log file: " + oldLogFile); @@ -1542,12 +1545,27 @@ public class HRegion implements HConstants { } } + // Do any reconstruction needed from the log + @SuppressWarnings("unused") + protected void doReconstructionLog(Path oldLogFile, long maxSeqId, + Progressable reporter) + throws UnsupportedEncodingException, IOException { + // Nothing to do (Replaying is done in HStores) + } + + protected HStore instantiateHStore(Path baseDir, + HColumnDescriptor c, Path oldLogFile, Progressable reporter) + throws IOException { + return new HStore(baseDir, this.regionInfo, c, this.fs, oldLogFile, + this.conf, reporter); + } + /* * @param column * @return Store that goes with the family on passed column. * TODO: Make this lookup faster. */ - private HStore getStore(final byte [] column) { + protected HStore getStore(final byte [] column) { return this.stores.get(HStoreKey.getFamilyMapKey(column)); } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bde556040d3..d1ec2f8a6e5 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.RegionHistorian; import org.apache.hadoop.hbase.RegionServerRunningException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.Cell; @@ -476,7 +477,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * Run init. Sets up hlog and starts up all server threads. * @param c Extra configuration. */ - private void init(final MapWritable c) throws IOException { + protected void init(final MapWritable c) throws IOException { try { for (Map.Entry e: c.entrySet()) { String key = e.getKey().toString(); @@ -860,15 +861,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { HRegion region = this.onlineRegions.get(mapKey); if (region == null) { try { - region = new HRegion(HTableDescriptor.getTableDir(rootDir, - regionInfo.getTableDesc().getName()), - this.log, this.fs, conf, regionInfo, null, this.cacheFlusher, - new Progressable() { - public void progress() { - addProcessingMessage(regionInfo); - } - } - ); + region = instantiateRegion(regionInfo); // Startup a compaction early if one is needed. this.compactSplitThread.compactionRequested(region); } catch (IOException e) { @@ -891,6 +884,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { reportOpen(regionInfo); } + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo + .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null, + this.cacheFlusher, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + } + /* * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue. * This method is called while region is in the queue of regions to process @@ -1172,16 +1176,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { requestCount.incrementAndGet(); try { HRegion r = getRegion(regionName); - long scannerId = -1L; InternalScanner s = r.getScanner(cols, firstRow, timestamp, filter); - scannerId = rand.nextLong(); - String scannerName = String.valueOf(scannerId); - synchronized(scanners) { - scanners.put(scannerName, s); - } - this.leases. - createLease(scannerName, new ScannerListener(scannerName)); + long scannerId = addScanner(s); return scannerId; } catch (IOException e) { LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", @@ -1191,6 +1188,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } } + protected long addScanner(InternalScanner s) throws LeaseStillHeldException { + long scannerId = -1L; + scannerId = rand.nextLong(); + String scannerName = String.valueOf(scannerId); + synchronized(scanners) { + scanners.put(scannerName, s); + } + this.leases. + createLease(scannerName, new ScannerListener(scannerName)); + return scannerId; + } + /** {@inheritDoc} */ public void close(final long scannerId) throws IOException { checkOpen(); @@ -1409,7 +1418,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * * @throws IOException */ - private void checkOpen() throws IOException { + protected void checkOpen() throws IOException { if (this.stopRequested.get() || this.abortRequested) { throw new IOException("Server not running"); } @@ -1490,7 +1499,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } return total; } - + + /** + * @return Return the leases. + */ + protected Leases getLeases() { + return leases; + } + + /** + * @return Return the rootDir. + */ + protected Path getRootDir() { + return rootDir; + } + + /** + * @return Return the fs. + */ + protected FileSystem getFileSystem() { + return fs; + } + // // Main program and support routines // @@ -1567,6 +1597,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * @param args */ public static void main(String [] args) { - doMain(args, HRegionServer.class); + Configuration conf = new HBaseConfiguration(); + @SuppressWarnings("unchecked") + Class regionServerClass = (Class) conf + .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); + doMain(args, regionServerClass); } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index 90113dd0bb8..029aa17c8cb 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -167,7 +167,7 @@ public class HStore implements HConstants { * failed. Can be null. * @throws IOException */ - HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, + protected HStore(Path basedir, HRegionInfo info, HColumnDescriptor family, FileSystem fs, Path reconstructionLog, HBaseConfiguration conf, final Progressable reporter) throws IOException { @@ -621,7 +621,7 @@ public class HStore implements HConstants { * @param key * @param value */ - void add(HStoreKey key, byte[] value) { + protected void add(HStoreKey key, byte[] value) { lock.readLock().lock(); try { this.memcache.add(key, value); @@ -1845,7 +1845,7 @@ public class HStore implements HConstants { /** * Return a scanner for both the memcache and the HStore files */ - InternalScanner getScanner(long timestamp, byte [][] targetCols, + protected InternalScanner getScanner(long timestamp, byte [][] targetCols, byte [] firstRow, RowFilterInterface filter) throws IOException { lock.readLock().lock();