HBASE-626 Use Visitor pattern in MetaRegion to reduce code clones in HTable and HConnectionManager
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@658337 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b365b0bf61
commit
fc80b823c4
|
@ -36,6 +36,8 @@ Hbase Change Log
|
|||
HBASE-522 Where new Text(string) might be used in client side method calls,
|
||||
add an overload that takes string (Done as part of HBASE-82)
|
||||
HBASE-570 Remove HQL unit test (Done as part of HBASE-82 commit).
|
||||
HBASE-626 Use Visitor pattern in MetaRegion to reduce code clones in HTable
|
||||
and HConnectionManager (Jean-Daniel Cryans via Stack)
|
||||
|
||||
|
||||
Release 0.1.2 - 05/13/2008
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
|
|||
import org.apache.hadoop.hbase.NoServerForRegionException;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
import org.apache.hadoop.hbase.ipc.HMasterInterface;
|
||||
|
@ -157,16 +158,19 @@ public class HConnectionManager implements HConstants {
|
|||
this.masterChecked = false;
|
||||
this.servers = new ConcurrentHashMap<String, HRegionInterface>();
|
||||
}
|
||||
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HMasterInterface getMaster() throws MasterNotRunningException {
|
||||
HServerAddress masterLocation = null;
|
||||
synchronized (this.masterLock) {
|
||||
for (int tries = 0; !this.closed &&
|
||||
!this.masterChecked && this.master == null && tries < numRetries;
|
||||
tries++) {
|
||||
String m = this.conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS);
|
||||
masterLocation = new HServerAddress(m);
|
||||
for (int tries = 0;
|
||||
!this.closed &&
|
||||
!this.masterChecked && this.master == null &&
|
||||
tries < numRetries;
|
||||
tries++) {
|
||||
|
||||
masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS,
|
||||
DEFAULT_MASTER_ADDRESS));
|
||||
try {
|
||||
HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
|
||||
HMasterInterface.class, HMasterInterface.versionID,
|
||||
|
@ -178,7 +182,7 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
if(tries == numRetries - 1) {
|
||||
// This was our last chance - don't bother sleeping
|
||||
break;
|
||||
}
|
||||
|
@ -197,8 +201,11 @@ public class HConnectionManager implements HConstants {
|
|||
this.masterChecked = true;
|
||||
}
|
||||
if (this.master == null) {
|
||||
throw new MasterNotRunningException(masterLocation == null? "":
|
||||
masterLocation.toString());
|
||||
if (masterLocation == null) {
|
||||
throw new MasterNotRunningException();
|
||||
} else {
|
||||
throw new MasterNotRunningException(masterLocation.toString());
|
||||
}
|
||||
}
|
||||
return this.master;
|
||||
}
|
||||
|
@ -257,40 +264,23 @@ public class HConnectionManager implements HConstants {
|
|||
|
||||
/** {@inheritDoc} */
|
||||
public HTableDescriptor[] listTables() throws IOException {
|
||||
HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
|
||||
byte [] startRow = EMPTY_START_ROW;
|
||||
final HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
|
||||
|
||||
// scan over the each meta region
|
||||
do {
|
||||
ScannerCallable callable = new ScannerCallable(this, META_TABLE_NAME,
|
||||
COL_REGIONINFO_ARRAY, startRow, LATEST_TIMESTAMP, null);
|
||||
try {
|
||||
// open scanner
|
||||
getRegionServerWithRetries(callable);
|
||||
// iterate through the scanner, accumulating unique table names
|
||||
while (true) {
|
||||
RowResult values = getRegionServerWithRetries(callable);
|
||||
if (values == null || values.size() == 0) {
|
||||
break;
|
||||
}
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
|
||||
HRegionInfo info =
|
||||
Writables.getHRegionInfo(values.get(COL_REGIONINFO));
|
||||
public boolean processRow(RowResult rowResult,
|
||||
HRegionLocation metaLocation, HRegionInfo info) throws IOException {
|
||||
|
||||
// Only examine the rows where the startKey is zero length
|
||||
if (info.getStartKey().length == 0) {
|
||||
uniqueTables.add(info.getTableDesc());
|
||||
}
|
||||
// Only examine the rows where the startKey is zero length
|
||||
if (info.getStartKey().length == 0) {
|
||||
uniqueTables.add(info.getTableDesc());
|
||||
}
|
||||
// advance the startRow to the end key of the current region
|
||||
startRow = callable.getHRegionInfo().getEndKey();
|
||||
} finally {
|
||||
// close scanner
|
||||
callable.setClose();
|
||||
getRegionServerWithRetries(callable);
|
||||
return true;
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, LAST_ROW) != 0);
|
||||
|
||||
|
||||
};
|
||||
MetaScanner.metaScan(conf, visitor);
|
||||
|
||||
return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
|
||||
}
|
||||
|
||||
|
@ -321,11 +311,12 @@ public class HConnectionManager implements HConstants {
|
|||
// This block guards against two threads trying to find the root
|
||||
// region at the same time. One will go do the find while the
|
||||
// second waits. The second thread will not do find.
|
||||
|
||||
if (!useCache || rootRegionLocation == null) {
|
||||
return locateRootRegion();
|
||||
}
|
||||
return rootRegionLocation;
|
||||
}
|
||||
}
|
||||
} else if (Bytes.equals(tableName, META_TABLE_NAME)) {
|
||||
synchronized (metaRegionLock) {
|
||||
// This block guards against two threads trying to load the meta
|
||||
|
@ -652,29 +643,30 @@ public class HConnectionManager implements HConstants {
|
|||
*/
|
||||
private HRegionLocation locateRootRegion()
|
||||
throws IOException {
|
||||
|
||||
getMaster();
|
||||
|
||||
HServerAddress rootRegionAddress = null;
|
||||
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int localTimeouts = 0;
|
||||
// Ask the master which server has the root region
|
||||
|
||||
// ask the master which server has the root region
|
||||
while (rootRegionAddress == null && localTimeouts < numRetries) {
|
||||
rootRegionAddress = master.findRootRegion();
|
||||
if (rootRegionAddress == null) {
|
||||
// Increment and then only sleep if retries left.
|
||||
if (++localTimeouts < numRetries) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping " + pause + "ms. Waiting for root "
|
||||
+ "region. Attempt " + tries + " of " + numRetries);
|
||||
}
|
||||
Thread.sleep(pause);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding root region.");
|
||||
}
|
||||
} catch (InterruptedException iex) {
|
||||
// continue
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping. Waiting for root region.");
|
||||
}
|
||||
Thread.sleep(pause);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding root region.");
|
||||
}
|
||||
} catch (InterruptedException iex) {
|
||||
// continue
|
||||
}
|
||||
localTimeouts++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -35,15 +35,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.filter.StopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/**
|
||||
|
@ -57,6 +56,7 @@ public class HTable implements HConstants {
|
|||
protected final long pause;
|
||||
protected final int numRetries;
|
||||
protected Random rand;
|
||||
protected HBaseConfiguration configuration;
|
||||
|
||||
protected volatile boolean tableDoesNotExist;
|
||||
|
||||
|
@ -96,6 +96,7 @@ public class HTable implements HConstants {
|
|||
public HTable(HBaseConfiguration conf, final byte [] tableName)
|
||||
throws IOException {
|
||||
this.connection = HConnectionManager.getConnection(conf);
|
||||
this.configuration = conf;
|
||||
this.tableName = tableName;
|
||||
this.pause = conf.getLong("hbase.client.pause", 10 * 1000);
|
||||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
|
@ -142,145 +143,64 @@ public class HTable implements HConstants {
|
|||
|
||||
/**
|
||||
* Gets the starting row key for every region in the currently open table
|
||||
*
|
||||
* @return Array of region starting row keys
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("null")
|
||||
public byte [][] getStartKeys() throws IOException {
|
||||
List<byte []> keyList = new ArrayList<byte []>();
|
||||
public byte[][] getStartKeys() throws IOException {
|
||||
final List<byte[]> keyList = new ArrayList<byte[]>();
|
||||
|
||||
long scannerId = -1L;
|
||||
byte [] startRow =
|
||||
HRegionInfo.createRegionName(this.tableName, null, NINES);
|
||||
HRegionLocation metaLocation = null;
|
||||
HRegionInterface server;
|
||||
|
||||
// scan over the each meta region
|
||||
do {
|
||||
try{
|
||||
// turn the start row into a location
|
||||
metaLocation =
|
||||
connection.locateRegion(META_TABLE_NAME, startRow);
|
||||
|
||||
// connect to the server hosting the .META. region
|
||||
server =
|
||||
connection.getHRegionConnection(metaLocation.getServerAddress());
|
||||
|
||||
// open a scanner over the meta region
|
||||
scannerId = server.openScanner(
|
||||
metaLocation.getRegionInfo().getRegionName(),
|
||||
new byte[][]{COL_REGIONINFO}, tableName, LATEST_TIMESTAMP,
|
||||
null);
|
||||
|
||||
// iterate through the scanner, accumulating unique region names
|
||||
while (true) {
|
||||
RowResult values = server.next(scannerId);
|
||||
if (values == null || values.size() == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info = (HRegionInfo) Writables.getWritable(
|
||||
values.get(COL_REGIONINFO).getValue(), info);
|
||||
|
||||
if (!Bytes.equals(info.getTableDesc().getName(), this.tableName)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (info.isOffline() || info.isSplit()) {
|
||||
continue;
|
||||
}
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
public boolean processRow(RowResult rowResult,
|
||||
HRegionLocation metaLocation, HRegionInfo info)
|
||||
throws IOException {
|
||||
if (!(Bytes.equals(info.getTableDesc().getName(), tableName))) {
|
||||
return false;
|
||||
}
|
||||
if (!(info.isOffline() || info.isSplit())) {
|
||||
keyList.add(info.getStartKey());
|
||||
}
|
||||
|
||||
// close that remote scanner
|
||||
server.close(scannerId);
|
||||
|
||||
// advance the startRow to the end key of the current region
|
||||
startRow = metaLocation.getRegionInfo().getEndKey();
|
||||
} catch (IOException e) {
|
||||
// need retry logic?
|
||||
throw e;
|
||||
return true;
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, EMPTY_START_ROW) != 0);
|
||||
|
||||
return keyList.toArray(new byte [keyList.size()][]);
|
||||
|
||||
};
|
||||
MetaScanner.metaScan(configuration, visitor, this.tableName);
|
||||
return keyList.toArray(new byte[keyList.size()][]);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get all the regions and their address for this table
|
||||
*
|
||||
* @return A map of HRegionInfo with it's server address
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("null")
|
||||
public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
|
||||
// TODO This code is a near exact copy of getStartKeys. To be refactored HBASE-626
|
||||
HashMap<HRegionInfo, HServerAddress> regionMap = new HashMap<HRegionInfo, HServerAddress>();
|
||||
|
||||
long scannerId = -1L;
|
||||
byte [] startRow =
|
||||
HRegionInfo.createRegionName(this.tableName, null, NINES);
|
||||
HRegionLocation metaLocation = null;
|
||||
HRegionInterface server;
|
||||
|
||||
// scan over the each meta region
|
||||
do {
|
||||
try{
|
||||
// turn the start row into a location
|
||||
metaLocation =
|
||||
connection.locateRegion(META_TABLE_NAME, startRow);
|
||||
final HashMap<HRegionInfo, HServerAddress> regionMap =
|
||||
new HashMap<HRegionInfo, HServerAddress>();
|
||||
|
||||
// connect to the server hosting the .META. region
|
||||
server =
|
||||
connection.getHRegionConnection(metaLocation.getServerAddress());
|
||||
|
||||
// open a scanner over the meta region
|
||||
scannerId = server.openScanner(
|
||||
metaLocation.getRegionInfo().getRegionName(),
|
||||
new byte [][]{COL_REGIONINFO}, tableName, LATEST_TIMESTAMP,
|
||||
null);
|
||||
|
||||
// iterate through the scanner, accumulating regions and their regionserver
|
||||
while (true) {
|
||||
RowResult values = server.next(scannerId);
|
||||
if (values == null || values.size() == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info = (HRegionInfo) Writables.getWritable(
|
||||
values.get(COL_REGIONINFO).getValue(), info);
|
||||
|
||||
if (!Bytes.equals(info.getTableDesc().getName(), this.tableName)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (info.isOffline() || info.isSplit()) {
|
||||
continue;
|
||||
}
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
public boolean processRow(RowResult rowResult,
|
||||
HRegionLocation metaLocation, HRegionInfo info)
|
||||
throws IOException {
|
||||
if (!(Bytes.equals(info.getTableDesc().getName(), tableName))) {
|
||||
return false;
|
||||
}
|
||||
if (!(info.isOffline() || info.isSplit())) {
|
||||
regionMap.put(info, metaLocation.getServerAddress());
|
||||
}
|
||||
|
||||
// close that remote scanner
|
||||
server.close(scannerId);
|
||||
|
||||
// advance the startRow to the end key of the current region
|
||||
startRow = metaLocation.getRegionInfo().getEndKey();
|
||||
|
||||
// turn the start row into a location
|
||||
metaLocation =
|
||||
connection.locateRegion(META_TABLE_NAME, startRow);
|
||||
} catch (IOException e) {
|
||||
// need retry logic?
|
||||
throw e;
|
||||
return true;
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, EMPTY_START_ROW) != 0);
|
||||
|
||||
|
||||
};
|
||||
MetaScanner.metaScan(configuration, visitor, tableName);
|
||||
return regionMap;
|
||||
}
|
||||
/**
|
||||
|
||||
/**
|
||||
* Get a single value for the specified row and column
|
||||
*
|
||||
*
|
||||
* @param row row key
|
||||
* @param column column name
|
||||
* @return value for specified row/column
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
* Scanner class that contains the <code>.META.</code> table scanning logic
|
||||
* and uses a Retryable scanner. Provided visitors will be called
|
||||
* for each row.
|
||||
*/
|
||||
class MetaScanner implements HConstants {
|
||||
|
||||
/**
|
||||
* Scans the meta table and calls a visitor on each RowResult and uses a empty
|
||||
* start row value as table name.
|
||||
*
|
||||
* @param configuration
|
||||
* @param visitor A custom visitor
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void metaScan(HBaseConfiguration configuration,
|
||||
MetaScannerVisitor visitor)
|
||||
throws IOException {
|
||||
metaScan(configuration, visitor, EMPTY_START_ROW);
|
||||
}
|
||||
|
||||
/**
|
||||
* Scans the meta table and calls a visitor on each RowResult. Uses a table
|
||||
* name to locate meta regions.
|
||||
*
|
||||
* @param configuration
|
||||
* @param visitor
|
||||
* @param tableName
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void metaScan(HBaseConfiguration configuration,
|
||||
MetaScannerVisitor visitor, byte[] tableName)
|
||||
throws IOException {
|
||||
HConnection connection = HConnectionManager.getConnection(configuration);
|
||||
HRegionLocation metaLocation = null;
|
||||
boolean toContinue = true;
|
||||
byte [] startRow = Bytes.equals(tableName, EMPTY_START_ROW)? tableName:
|
||||
HRegionInfo.createRegionName(tableName, null, NINES);
|
||||
|
||||
// Scan over the each meta region
|
||||
do {
|
||||
ScannerCallable callable = new ScannerCallable(connection,
|
||||
META_TABLE_NAME, COL_REGIONINFO_ARRAY, tableName, LATEST_TIMESTAMP,
|
||||
null);
|
||||
try {
|
||||
// Open scanner
|
||||
connection.getRegionServerWithRetries(callable);
|
||||
metaLocation = connection.locateRegion(META_TABLE_NAME, startRow);
|
||||
while (toContinue) {
|
||||
RowResult rowResult = connection.getRegionServerWithRetries(callable);
|
||||
if (rowResult == null || rowResult.size() == 0) {
|
||||
break;
|
||||
}
|
||||
HRegionInfo info = Writables.
|
||||
getHRegionInfo(rowResult.get(COL_REGIONINFO));
|
||||
toContinue = visitor.processRow(rowResult, metaLocation, info);
|
||||
}
|
||||
// Advance the startRow to the end key of the current region
|
||||
startRow = callable.getHRegionInfo().getEndKey();
|
||||
} finally {
|
||||
// Close scanner
|
||||
callable.setClose();
|
||||
connection.getRegionServerWithRetries(callable);
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, LAST_ROW) != 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Visitor class called to process each row of the .META. table
|
||||
*/
|
||||
protected interface MetaScannerVisitor {
|
||||
|
||||
/**
|
||||
* Visitor method that accepts a RowResult and the meta region location.
|
||||
* Implementations can return false to stop the region's loop if it becomes
|
||||
* unnecessary for some reason.
|
||||
*
|
||||
* @param rowResult
|
||||
* @param metaLocation
|
||||
* @return A boolean to know if it should continue to loop in the region
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean processRow(RowResult rowResult,
|
||||
HRegionLocation metaLocation, HRegionInfo info) throws IOException;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue