HBASE-5986 Clients can see holes in the META table when regions are being split
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1342099 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
495ecfbc16
commit
6db2eedc9b
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HTable;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
/**
|
||||
|
@ -134,7 +135,7 @@ public class MetaEditor {
|
|||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds a META row for the specified new region.
|
||||
* @param regionInfo region information
|
||||
|
@ -157,7 +158,7 @@ public class MetaEditor {
|
|||
List<HRegionInfo> regionInfos)
|
||||
throws IOException {
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
for (HRegionInfo regionInfo : regionInfos) {
|
||||
for (HRegionInfo regionInfo : regionInfos) {
|
||||
puts.add(makePutFromRegionInfo(regionInfo));
|
||||
}
|
||||
putsToMetaTable(catalogTracker, puts);
|
||||
|
@ -306,6 +307,18 @@ public class MetaEditor {
|
|||
return info;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the daughter regions by reading from the corresponding columns of the .META. table
|
||||
* Result. If the region is not a split parent region, it returns PairOfSameType(null, null).
|
||||
*/
|
||||
public static PairOfSameType<HRegionInfo> getDaughterRegions(Result data) throws IOException {
|
||||
HRegionInfo splitA = Writables.getHRegionInfoOrNull(
|
||||
data.getValue(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER));
|
||||
HRegionInfo splitB = Writables.getHRegionInfoOrNull(
|
||||
data.getValue(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER));
|
||||
return new PairOfSameType<HRegionInfo>(splitA, splitB);
|
||||
}
|
||||
|
||||
private static Put addRegionInfo(final Put p, final HRegionInfo hri)
|
||||
throws IOException {
|
||||
p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
|
|||
import org.apache.hadoop.hbase.client.AdminProtocol;
|
||||
import org.apache.hadoop.hbase.client.ClientProtocol;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
||||
import org.apache.hadoop.hbase.ipc.HMasterInterface;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
|
@ -400,7 +401,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
|||
++tries) {
|
||||
// Wait for new table to come on-line
|
||||
final AtomicInteger actualRegCount = new AtomicInteger(0);
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
@Override
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
HRegionInfo info = Writables.getHRegionInfoOrNull(
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|||
import org.apache.hadoop.hbase.client.AdminProtocol;
|
||||
import org.apache.hadoop.hbase.client.ClientProtocol;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
|
||||
|
@ -849,7 +850,7 @@ public class HConnectionManager {
|
|||
public boolean isTableAvailable(final byte[] tableName) throws IOException {
|
||||
final AtomicBoolean available = new AtomicBoolean(true);
|
||||
final AtomicInteger regionCount = new AtomicInteger(0);
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
@Override
|
||||
public boolean processRow(Result row) throws IOException {
|
||||
byte[] value = row.getValue(HConstants.CATALOG_FAMILY,
|
||||
|
@ -971,7 +972,7 @@ public class HConnectionManager {
|
|||
final byte[] row) {
|
||||
// Implement a new visitor for MetaScanner, and use it to walk through
|
||||
// the .META.
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
public boolean processRow(Result result) throws IOException {
|
||||
try {
|
||||
byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||
|
@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
|
|||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
|
@ -458,28 +456,15 @@ public class HTable implements HTableInterface {
|
|||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
|
||||
final List<byte[]> startKeyList = new ArrayList<byte[]>();
|
||||
final List<byte[]> endKeyList = new ArrayList<byte[]>();
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
byte [] bytes = rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.REGIONINFO_QUALIFIER);
|
||||
if (bytes == null) {
|
||||
LOG.warn("Null " + HConstants.REGIONINFO_QUALIFIER_STR +
|
||||
" cell in " + rowResult);
|
||||
return true;
|
||||
}
|
||||
HRegionInfo info = Writables.getHRegionInfo(bytes);
|
||||
if (Bytes.equals(info.getTableName(), getTableName())) {
|
||||
if (!(info.isOffline() || info.isSplit())) {
|
||||
startKeyList.add(info.getStartKey());
|
||||
endKeyList.add(info.getEndKey());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
MetaScanner.metaScan(configuration, visitor, this.tableName);
|
||||
NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
|
||||
final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
|
||||
final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
|
||||
|
||||
for (HRegionInfo region : regions.keySet()) {
|
||||
startKeyList.add(region.getStartKey());
|
||||
endKeyList.add(region.getEndKey());
|
||||
}
|
||||
|
||||
return new Pair<byte [][], byte [][]>(
|
||||
startKeyList.toArray(new byte[startKeyList.size()][]),
|
||||
endKeyList.toArray(new byte[endKeyList.size()][]));
|
||||
|
@ -496,32 +481,18 @@ public class HTable implements HTableInterface {
|
|||
final Map<HRegionInfo, HServerAddress> regionMap =
|
||||
new TreeMap<HRegionInfo, HServerAddress>();
|
||||
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
HRegionInfo info = Writables.getHRegionInfo(
|
||||
rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.REGIONINFO_QUALIFIER));
|
||||
final Map<HRegionInfo, ServerName> regionLocations = getRegionLocations();
|
||||
|
||||
if (!(Bytes.equals(info.getTableName(), getTableName()))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HServerAddress server = new HServerAddress();
|
||||
byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SERVER_QUALIFIER);
|
||||
if (value != null && value.length > 0) {
|
||||
String hostAndPort = Bytes.toString(value);
|
||||
server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(hostAndPort));
|
||||
}
|
||||
|
||||
if (!(info.isOffline() || info.isSplit())) {
|
||||
regionMap.put(new UnmodifyableHRegionInfo(info), server);
|
||||
}
|
||||
return true;
|
||||
for (Map.Entry<HRegionInfo, ServerName> entry : regionLocations.entrySet()) {
|
||||
HServerAddress server = new HServerAddress();
|
||||
ServerName serverName = entry.getValue();
|
||||
if (serverName != null && serverName.getHostAndPort() != null) {
|
||||
server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(
|
||||
serverName.getHostAndPort()));
|
||||
}
|
||||
regionMap.put(entry.getKey(), server);
|
||||
}
|
||||
|
||||
};
|
||||
MetaScanner.metaScan(configuration, visitor, tableName);
|
||||
return regionMap;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,16 +20,17 @@
|
|||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -43,12 +44,15 @@ import org.apache.hadoop.hbase.util.Writables;
|
|||
* Scanner class that contains the <code>.META.</code> table scanning logic
|
||||
* and uses a Retryable scanner. Provided visitors will be called
|
||||
* for each row.
|
||||
*
|
||||
*
|
||||
* Although public visibility, this is not a public-facing API and may evolve in
|
||||
* minor releases.
|
||||
*
|
||||
* <p> Note that during concurrent region splits, the scanner might not see
|
||||
* META changes across rows (for parent and daughter entries) consistently.
|
||||
* see HBASE-5986, and {@link BlockingMetaScannerVisitor} for details. </p>
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@InterfaceAudience.Private
|
||||
public class MetaScanner {
|
||||
private static final Log LOG = LogFactory.getLog(MetaScanner.class);
|
||||
/**
|
||||
|
@ -110,7 +114,7 @@ public class MetaScanner {
|
|||
* <code>rowLimit</code> of rows.
|
||||
*
|
||||
* @param configuration HBase configuration.
|
||||
* @param visitor Visitor object.
|
||||
* @param visitor Visitor object. Closes the visitor before returning.
|
||||
* @param tableName User table name in meta table to start scan at. Pass
|
||||
* null if not interested in a particular table.
|
||||
* @param row Name of the row at the user table. The scan will start from
|
||||
|
@ -124,14 +128,18 @@ public class MetaScanner {
|
|||
final MetaScannerVisitor visitor, final byte[] tableName,
|
||||
final byte[] row, final int rowLimit, final byte[] metaTableName)
|
||||
throws IOException {
|
||||
HConnectionManager.execute(new HConnectable<Void>(configuration) {
|
||||
@Override
|
||||
public Void connect(HConnection connection) throws IOException {
|
||||
metaScan(conf, connection, visitor, tableName, row, rowLimit,
|
||||
metaTableName);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
try {
|
||||
HConnectionManager.execute(new HConnectable<Void>(configuration) {
|
||||
@Override
|
||||
public Void connect(HConnection connection) throws IOException {
|
||||
metaScan(conf, connection, visitor, tableName, row, rowLimit,
|
||||
metaTableName);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
visitor.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static void metaScan(Configuration configuration, HConnection connection,
|
||||
|
@ -165,7 +173,7 @@ public class MetaScanner {
|
|||
Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
|
||||
}
|
||||
HRegionInfo regionInfo = Writables.getHRegionInfo(value);
|
||||
|
||||
|
||||
byte[] rowBefore = regionInfo.getStartKey();
|
||||
startRow = HRegionInfo.createRegionName(tableName, rowBefore,
|
||||
HConstants.ZEROES, false);
|
||||
|
@ -253,9 +261,9 @@ public class MetaScanner {
|
|||
public static List<HRegionInfo> listAllRegions(Configuration conf, final boolean offlined)
|
||||
throws IOException {
|
||||
final List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new BlockingMetaScannerVisitor(conf) {
|
||||
@Override
|
||||
public boolean processRow(Result result) throws IOException {
|
||||
public boolean processRowInternal(Result result) throws IOException {
|
||||
if (result == null || result.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
@ -284,19 +292,16 @@ public class MetaScanner {
|
|||
* @return Map of all user-space regions to servers
|
||||
* @throws IOException
|
||||
*/
|
||||
public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf, final byte [] tablename, final boolean offlined)
|
||||
throws IOException {
|
||||
public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf,
|
||||
final byte [] tablename, final boolean offlined) throws IOException {
|
||||
final NavigableMap<HRegionInfo, ServerName> regions =
|
||||
new TreeMap<HRegionInfo, ServerName>();
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new TableMetaScannerVisitor(conf, tablename) {
|
||||
@Override
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
public boolean processRowInternal(Result rowResult) throws IOException {
|
||||
HRegionInfo info = Writables.getHRegionInfo(
|
||||
rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.REGIONINFO_QUALIFIER));
|
||||
if (!(Bytes.equals(info.getTableName(), tablename))) {
|
||||
return false;
|
||||
}
|
||||
byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SERVER_QUALIFIER);
|
||||
String hostAndPort = null;
|
||||
|
@ -324,7 +329,7 @@ public class MetaScanner {
|
|||
/**
|
||||
* Visitor class called to process each row of the .META. table
|
||||
*/
|
||||
public interface MetaScannerVisitor {
|
||||
public interface MetaScannerVisitor extends Closeable {
|
||||
/**
|
||||
* Visitor method that accepts a RowResult and the meta region location.
|
||||
* Implementations can return false to stop the region's loop if it becomes
|
||||
|
@ -336,4 +341,153 @@ public class MetaScanner {
|
|||
*/
|
||||
public boolean processRow(Result rowResult) throws IOException;
|
||||
}
|
||||
|
||||
public static abstract class MetaScannerVisitorBase implements MetaScannerVisitor {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A MetaScannerVisitor that provides a consistent view of the table's
|
||||
* META entries during concurrent splits (see HBASE-5986 for details). This class
|
||||
* does not guarantee ordered traversal of meta entries, and can block until the
|
||||
* META entries for daughters are available during splits.
|
||||
*/
|
||||
public static abstract class BlockingMetaScannerVisitor
|
||||
extends MetaScannerVisitorBase {
|
||||
|
||||
private static final int DEFAULT_BLOCKING_TIMEOUT = 10000;
|
||||
private Configuration conf;
|
||||
private TreeSet<byte[]> daughterRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
private int blockingTimeout;
|
||||
private HTable metaTable;
|
||||
|
||||
public BlockingMetaScannerVisitor(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.blockingTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||
DEFAULT_BLOCKING_TIMEOUT);
|
||||
}
|
||||
|
||||
public abstract boolean processRowInternal(Result rowResult) throws IOException;
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (metaTable != null) {
|
||||
metaTable.close();
|
||||
metaTable = null;
|
||||
}
|
||||
}
|
||||
|
||||
public HTable getMetaTable() throws IOException {
|
||||
if (metaTable == null) {
|
||||
metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
}
|
||||
return metaTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processRow(Result rowResult) throws IOException {
|
||||
HRegionInfo info = Writables.getHRegionInfoOrNull(
|
||||
rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.REGIONINFO_QUALIFIER));
|
||||
if (info == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (daughterRegions.remove(info.getRegionName())) {
|
||||
return true; //we have already processed this row
|
||||
}
|
||||
|
||||
if (info.isSplitParent()) {
|
||||
/* we have found a parent region which was split. We have to ensure that it's daughters are
|
||||
* seen by this scanner as well, so we block until they are added to the META table. Even
|
||||
* though we are waiting for META entries, ACID semantics in HBase indicates that this
|
||||
* scanner might not see the new rows. So we manually query the daughter rows */
|
||||
HRegionInfo splitA = Writables.getHRegionInfo(rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SPLITA_QUALIFIER));
|
||||
HRegionInfo splitB = Writables.getHRegionInfo(rowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||
HConstants.SPLITB_QUALIFIER));
|
||||
|
||||
HTable metaTable = getMetaTable();
|
||||
long start = System.currentTimeMillis();
|
||||
Result resultA = getRegionResultBlocking(metaTable, blockingTimeout,
|
||||
splitA.getRegionName());
|
||||
if (resultA != null) {
|
||||
processRow(resultA);
|
||||
daughterRegions.add(splitA.getRegionName());
|
||||
} else {
|
||||
throw new RegionOfflineException("Split daughter region " +
|
||||
splitA.getRegionNameAsString() + " cannot be found in META.");
|
||||
}
|
||||
long rem = blockingTimeout - (System.currentTimeMillis() - start);
|
||||
|
||||
Result resultB = getRegionResultBlocking(metaTable, rem,
|
||||
splitB.getRegionName());
|
||||
if (resultB != null) {
|
||||
processRow(resultB);
|
||||
daughterRegions.add(splitB.getRegionName());
|
||||
} else {
|
||||
throw new RegionOfflineException("Split daughter region " +
|
||||
splitB.getRegionNameAsString() + " cannot be found in META.");
|
||||
}
|
||||
}
|
||||
|
||||
return processRowInternal(rowResult);
|
||||
}
|
||||
|
||||
private Result getRegionResultBlocking(HTable metaTable, long timeout, byte[] regionName)
|
||||
throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("blocking until region is in META: " + Bytes.toStringBinary(regionName));
|
||||
}
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
Get get = new Get(regionName);
|
||||
Result result = metaTable.get(get);
|
||||
HRegionInfo info = Writables.getHRegionInfoOrNull(
|
||||
result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
|
||||
if (info != null) {
|
||||
return result;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A MetaScannerVisitor for a table. Provides a consistent view of the table's
|
||||
* META entries during concurrent splits (see HBASE-5986 for details). This class
|
||||
* does not guarantee ordered traversal of meta entries, and can block until the
|
||||
* META entries for daughters are available during splits.
|
||||
*/
|
||||
public static abstract class TableMetaScannerVisitor extends BlockingMetaScannerVisitor {
|
||||
private byte[] tableName;
|
||||
|
||||
public TableMetaScannerVisitor(Configuration conf, byte[] tableName) {
|
||||
super(conf);
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean processRow(Result rowResult) throws IOException {
|
||||
HRegionInfo info = Writables.getHRegionInfoOrNull(
|
||||
rowResult.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
|
||||
if (info == null) {
|
||||
return true;
|
||||
}
|
||||
if (!(Bytes.equals(info.getTableName(), tableName))) {
|
||||
return false;
|
||||
}
|
||||
return super.processRow(rowResult);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
|
|||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
|
@ -1424,7 +1425,7 @@ Server {
|
|||
new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
|
||||
|
||||
MetaScannerVisitor visitor =
|
||||
new MetaScannerVisitor() {
|
||||
new MetaScannerVisitorBase() {
|
||||
@Override
|
||||
public boolean processRow(Result data) throws IOException {
|
||||
if (data == null || data.size() <= 0) {
|
||||
|
|
|
@ -20,20 +20,20 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver.metrics;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.metrics.util.MBeanUtil;
|
||||
import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
|
||||
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.metrics.MetricsMBeanBase;
|
||||
import org.apache.hadoop.metrics.util.MBeanUtil;
|
||||
import org.apache.hadoop.metrics.util.MetricsRegistry;
|
||||
|
||||
/**
|
||||
* Exports dynamic region server metric recorded in
|
||||
* {@link RegionServerDynamicMetrics} as an MBean
|
||||
* for JMX monitoring.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerDynamicStatistics extends MetricsDynamicMBeanBase {
|
||||
public class RegionServerDynamicStatistics extends MetricsMBeanBase {
|
||||
private final ObjectName mbeanName;
|
||||
|
||||
public RegionServerDynamicStatistics(MetricsRegistry registry) {
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
|||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
|
@ -2185,7 +2186,7 @@ public class HBaseFsck {
|
|||
return false;
|
||||
}
|
||||
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
|
||||
int countRecord = 1;
|
||||
|
||||
// comparator to sort KeyValues with latest modtime
|
||||
|
|
|
@ -17,34 +17,62 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Chore;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestEndToEndSplitTransaction {
|
||||
private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAllTests() throws Exception {
|
||||
|
@ -56,7 +84,7 @@ public class TestEndToEndSplitTransaction {
|
|||
public static void afterAllTests() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMasterOpsWhileSplitting() throws Exception {
|
||||
byte[] tableName = Bytes.toBytes("TestSplit");
|
||||
|
@ -138,6 +166,338 @@ public class TestEndToEndSplitTransaction {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that the client sees meta table changes as atomic during splits
|
||||
*/
|
||||
@Test
|
||||
public void testFromClientSideWhileSplitting() throws Throwable {
|
||||
LOG.info("Starting testFromClientSideWhileSplitting");
|
||||
final byte[] TABLENAME = Bytes.toBytes("testFromClientSideWhileSplitting");
|
||||
final byte[] FAMILY = Bytes.toBytes("family");
|
||||
|
||||
//SplitTransaction will update the meta table by offlining the parent region, and adding info
|
||||
//for daughters.
|
||||
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||
|
||||
Stoppable stopper = new SimpleStoppable();
|
||||
RegionSplitter regionSplitter = new RegionSplitter(table);
|
||||
RegionChecker regionChecker = new RegionChecker(conf, stopper, TABLENAME);
|
||||
|
||||
regionChecker.start();
|
||||
regionSplitter.start();
|
||||
|
||||
//wait until the splitter is finished
|
||||
regionSplitter.join();
|
||||
stopper.stop(null);
|
||||
|
||||
if (regionChecker.ex != null) {
|
||||
throw regionChecker.ex;
|
||||
}
|
||||
|
||||
if (regionSplitter.ex != null) {
|
||||
throw regionSplitter.ex;
|
||||
}
|
||||
|
||||
//one final check
|
||||
regionChecker.verify();
|
||||
}
|
||||
|
||||
private static class SimpleStoppable implements Stoppable {
|
||||
volatile boolean stopped = false;
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
this.stopped = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
return stopped;
|
||||
}
|
||||
}
|
||||
|
||||
static class RegionSplitter extends Thread {
|
||||
Throwable ex;
|
||||
HTable table;
|
||||
byte[] tableName, family;
|
||||
HBaseAdmin admin;
|
||||
HTable metaTable;
|
||||
HRegionServer rs;
|
||||
|
||||
RegionSplitter(HTable table) throws IOException {
|
||||
this.table = table;
|
||||
this.tableName = table.getTableName();
|
||||
this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
|
||||
admin = TEST_UTIL.getHBaseAdmin();
|
||||
rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||
metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
Random random = new Random();
|
||||
for (int i=0; i< 5; i++) {
|
||||
NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, tableName, false);
|
||||
if (regions.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
int regionIndex = random.nextInt(regions.size());
|
||||
|
||||
//pick a random region and split it into two
|
||||
HRegionInfo region = Iterators.get(regions.keySet().iterator(), regionIndex);
|
||||
|
||||
//pick the mid split point
|
||||
int start = 0, end = Integer.MAX_VALUE;
|
||||
if (region.getStartKey().length > 0) {
|
||||
start = Bytes.toInt(region.getStartKey());
|
||||
}
|
||||
if (region.getEndKey().length > 0) {
|
||||
end = Bytes.toInt(region.getEndKey());
|
||||
}
|
||||
int mid = start + ((end - start) / 2);
|
||||
byte[] splitPoint = Bytes.toBytes(mid);
|
||||
|
||||
//put some rows to the regions
|
||||
addData(start);
|
||||
addData(mid);
|
||||
|
||||
flushAndBlockUntilDone(region.getRegionName());
|
||||
compactAndBlockUntilDone(region.getRegionName());
|
||||
|
||||
log("Initiating region split for:" + region.getRegionNameAsString());
|
||||
try {
|
||||
admin.split(region.getRegionName(), splitPoint);
|
||||
//wait until the split is complete
|
||||
blockUntilRegionSplit(50000, region.getRegionName(), true);
|
||||
|
||||
} catch (NotServingRegionException ex) {
|
||||
//ignore
|
||||
}
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
this.ex = ex;
|
||||
} finally {
|
||||
if (metaTable != null) {
|
||||
IOUtils.closeQuietly(metaTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void addData(int start) throws IOException {
|
||||
for (int i=start; i< start + 100; i++) {
|
||||
Put put = new Put(Bytes.toBytes(i));
|
||||
|
||||
put.add(family, family, Bytes.toBytes(i));
|
||||
table.put(put);
|
||||
}
|
||||
table.flushCommits();
|
||||
}
|
||||
|
||||
void flushAndBlockUntilDone(byte[] regionName) throws IOException, InterruptedException {
|
||||
log("flushing region: " + Bytes.toStringBinary(regionName));
|
||||
admin.flush(regionName);
|
||||
log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
|
||||
Threads.sleepWithoutInterrupt(500);
|
||||
while (rs.cacheFlusher.getFlushQueueSize() > 0) {
|
||||
Threads.sleep(50);
|
||||
}
|
||||
}
|
||||
|
||||
void compactAndBlockUntilDone(byte[] regionName) throws IOException,
|
||||
InterruptedException {
|
||||
log("Compacting region: " + Bytes.toStringBinary(regionName));
|
||||
admin.majorCompact(regionName);
|
||||
log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
|
||||
Threads.sleepWithoutInterrupt(500);
|
||||
while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
|
||||
Threads.sleep(50);
|
||||
}
|
||||
}
|
||||
|
||||
/** bloks until the region split is complete in META and region server opens the daughters */
|
||||
void blockUntilRegionSplit(long timeout, final byte[] regionName, boolean waitForDaughters)
|
||||
throws IOException, InterruptedException {
|
||||
long start = System.currentTimeMillis();
|
||||
log("blocking until region is split:" + Bytes.toStringBinary(regionName));
|
||||
HRegionInfo daughterA = null, daughterB = null;
|
||||
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
Result result = getRegionRow(regionName);
|
||||
if (result == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
HRegionInfo region = MetaEditor.getHRegionInfo(result);
|
||||
if(region.isSplitParent()) {
|
||||
log("found parent region: " + region.toString());
|
||||
PairOfSameType<HRegionInfo> pair = MetaEditor.getDaughterRegions(result);
|
||||
daughterA = pair.getFirst();
|
||||
daughterB = pair.getSecond();
|
||||
break;
|
||||
}
|
||||
sleep(100);
|
||||
}
|
||||
|
||||
//if we are here, this means the region split is complete or timed out
|
||||
if (waitForDaughters) {
|
||||
long rem = timeout - (System.currentTimeMillis() - start);
|
||||
blockUntilRegionIsInMeta(rem, daughterA.getRegionName());
|
||||
|
||||
rem = timeout - (System.currentTimeMillis() - start);
|
||||
blockUntilRegionIsInMeta(rem, daughterB.getRegionName());
|
||||
|
||||
rem = timeout - (System.currentTimeMillis() - start);
|
||||
blockUntilRegionIsOpenedByRS(rem, daughterA.getRegionName());
|
||||
|
||||
rem = timeout - (System.currentTimeMillis() - start);
|
||||
blockUntilRegionIsOpenedByRS(rem, daughterB.getRegionName());
|
||||
}
|
||||
}
|
||||
|
||||
Result getRegionRow(byte[] regionName) throws IOException {
|
||||
Get get = new Get(regionName);
|
||||
return metaTable.get(get);
|
||||
}
|
||||
|
||||
void blockUntilRegionIsInMeta(long timeout, byte[] regionName)
|
||||
throws IOException, InterruptedException {
|
||||
log("blocking until region is in META: " + Bytes.toStringBinary(regionName));
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
Result result = getRegionRow(regionName);
|
||||
if (result != null) {
|
||||
HRegionInfo info = MetaEditor.getHRegionInfo(result);
|
||||
if (info != null && !info.isOffline()) {
|
||||
log("found region in META: " + Bytes.toStringBinary(regionName));
|
||||
break;
|
||||
}
|
||||
}
|
||||
sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
void blockUntilRegionIsOpenedByRS(long timeout, byte[] regionName)
|
||||
throws IOException, InterruptedException {
|
||||
log("blocking until region is opened by region server: " + Bytes.toStringBinary(regionName));
|
||||
long start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
List<HRegion> regions = rs.getOnlineRegions(tableName);
|
||||
for (HRegion region : regions) {
|
||||
if (Bytes.compareTo(region.getRegionName(), regionName) == 0) {
|
||||
log("found region open in RS: " + Bytes.toStringBinary(regionName));
|
||||
return;
|
||||
}
|
||||
}
|
||||
sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks regions using MetaScanner, MetaReader and HTable methods
|
||||
*/
|
||||
static class RegionChecker extends Chore {
|
||||
Configuration conf;
|
||||
byte[] tableName;
|
||||
Throwable ex;
|
||||
|
||||
RegionChecker(Configuration conf, Stoppable stopper, byte[] tableName) {
|
||||
super("RegionChecker", 10, stopper);
|
||||
this.conf = conf;
|
||||
this.tableName = tableName;
|
||||
this.setDaemon(true);
|
||||
}
|
||||
|
||||
/** verify region boundaries obtained from MetaScanner */
|
||||
void verifyRegionsUsingMetaScanner() throws Exception {
|
||||
|
||||
//MetaScanner.allTableRegions()
|
||||
NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(conf, tableName,
|
||||
false);
|
||||
verifyTableRegions(regions.keySet());
|
||||
|
||||
//MetaScanner.listAllRegions()
|
||||
List<HRegionInfo> regionList = MetaScanner.listAllRegions(conf, false);
|
||||
verifyTableRegions(Sets.newTreeSet(regionList));
|
||||
}
|
||||
|
||||
/** verify region boundaries obtained from HTable.getStartEndKeys() */
|
||||
void verifyRegionsUsingHTable() throws IOException {
|
||||
HTable table = null;
|
||||
try {
|
||||
//HTable.getStartEndKeys()
|
||||
table = new HTable(conf, tableName);
|
||||
Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
|
||||
verifyStartEndKeys(keys);
|
||||
|
||||
//HTable.getRegionsInfo()
|
||||
Map<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
|
||||
verifyTableRegions(regions.keySet());
|
||||
} finally {
|
||||
IOUtils.closeQuietly(table);
|
||||
}
|
||||
}
|
||||
|
||||
void verify() throws Exception {
|
||||
verifyRegionsUsingMetaScanner();
|
||||
verifyRegionsUsingHTable();
|
||||
}
|
||||
|
||||
void verifyTableRegions(Set<HRegionInfo> regions) {
|
||||
log("Verifying " + regions.size() + " regions");
|
||||
|
||||
byte[][] startKeys = new byte[regions.size()][];
|
||||
byte[][] endKeys = new byte[regions.size()][];
|
||||
|
||||
int i=0;
|
||||
for (HRegionInfo region : regions) {
|
||||
startKeys[i] = region.getStartKey();
|
||||
endKeys[i] = region.getEndKey();
|
||||
i++;
|
||||
}
|
||||
|
||||
Pair<byte[][], byte[][]> keys = new Pair<byte[][], byte[][]>(startKeys, endKeys);
|
||||
verifyStartEndKeys(keys);
|
||||
}
|
||||
|
||||
void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
|
||||
byte[][] startKeys = keys.getFirst();
|
||||
byte[][] endKeys = keys.getSecond();
|
||||
assertEquals(startKeys.length, endKeys.length);
|
||||
assertTrue("Found 0 regions for the table", startKeys.length > 0);
|
||||
|
||||
assertArrayEquals("Start key for the first region is not byte[0]",
|
||||
HConstants.EMPTY_START_ROW, startKeys[0]);
|
||||
byte[] prevEndKey = HConstants.EMPTY_START_ROW;
|
||||
|
||||
// ensure that we do not have any gaps
|
||||
for (int i=0; i<startKeys.length; i++) {
|
||||
assertArrayEquals(
|
||||
"Hole in .META. is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
|
||||
+ " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
|
||||
startKeys[i]);
|
||||
prevEndKey = endKeys[i];
|
||||
}
|
||||
assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
|
||||
endKeys[endKeys.length - 1]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
verify();
|
||||
} catch (Throwable ex) {
|
||||
this.ex = ex;
|
||||
stopper.stop("caught exception");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void log(String msg) {
|
||||
LOG.info(msg);
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
|
|
Loading…
Reference in New Issue