HBASE-22317 Support reading from meta replicas
This commit is contained in:
parent
aa9f36ae53
commit
940f374a07
|
@ -17,8 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
|
||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
|
||||
import static org.apache.hadoop.hbase.HConstants.NINES;
|
||||
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
|
||||
import static org.apache.hadoop.hbase.HConstants.ZEROES;
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
|
||||
|
@ -87,6 +89,8 @@ class AsyncNonMetaRegionLocator {
|
|||
|
||||
private final int locatePrefetchLimit;
|
||||
|
||||
private final boolean useMetaReplicas;
|
||||
|
||||
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
|
||||
|
||||
private static final class LocateRequest {
|
||||
|
@ -193,6 +197,8 @@ class AsyncNonMetaRegionLocator {
|
|||
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
|
||||
this.locatePrefetchLimit =
|
||||
conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
|
||||
this.useMetaReplicas =
|
||||
conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
|
||||
}
|
||||
|
||||
private TableCache getTableCache(TableName tableName) {
|
||||
|
@ -421,69 +427,72 @@ class AsyncNonMetaRegionLocator {
|
|||
}
|
||||
byte[] metaStopKey =
|
||||
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
|
||||
conn.getTable(META_TABLE_NAME)
|
||||
.scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
|
||||
.setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
|
||||
Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
|
||||
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
|
||||
.setReadType(ReadType.PREAD);
|
||||
if (useMetaReplicas) {
|
||||
scan.setConsistency(Consistency.TIMELINE);
|
||||
}
|
||||
conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
|
||||
|
||||
private boolean completeNormally = false;
|
||||
private boolean completeNormally = false;
|
||||
|
||||
private boolean tableNotFound = true;
|
||||
private boolean tableNotFound = true;
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
complete(tableName, req, null, error);
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
complete(tableName, req, null, error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
if (tableNotFound) {
|
||||
complete(tableName, req, null, new TableNotFoundException(tableName));
|
||||
} else if (!completeNormally) {
|
||||
complete(tableName, req, null, new IOException(
|
||||
"Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Result[] results, ScanController controller) {
|
||||
if (results.length == 0) {
|
||||
return;
|
||||
}
|
||||
tableNotFound = false;
|
||||
int i = 0;
|
||||
for (; i < results.length; i++) {
|
||||
if (onScanNext(tableName, req, results[i])) {
|
||||
completeNormally = true;
|
||||
controller.terminate();
|
||||
i++;
|
||||
break;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
if (tableNotFound) {
|
||||
complete(tableName, req, null, new TableNotFoundException(tableName));
|
||||
} else if (!completeNormally) {
|
||||
complete(tableName, req, null, new IOException("Unable to find region for '" +
|
||||
Bytes.toStringBinary(req.row) + "' in " + tableName));
|
||||
}
|
||||
// Add the remaining results into cache
|
||||
if (i < results.length) {
|
||||
TableCache tableCache = getTableCache(tableName);
|
||||
for (; i < results.length; i++) {
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
|
||||
if (locs == null) {
|
||||
continue;
|
||||
}
|
||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||
if (loc == null) {
|
||||
continue;
|
||||
}
|
||||
RegionInfo info = loc.getRegion();
|
||||
if (info == null || info.isOffline() || info.isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
RegionLocations addedLocs = addToCache(tableCache, locs);
|
||||
synchronized (tableCache) {
|
||||
tableCache.clearCompletedRequests(Optional.of(addedLocs));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(Result[] results, ScanController controller) {
|
||||
if (results.length == 0) {
|
||||
return;
|
||||
}
|
||||
tableNotFound = false;
|
||||
int i = 0;
|
||||
for (; i < results.length; i++) {
|
||||
if (onScanNext(tableName, req, results[i])) {
|
||||
completeNormally = true;
|
||||
controller.terminate();
|
||||
i++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Add the remaining results into cache
|
||||
if (i < results.length) {
|
||||
TableCache tableCache = getTableCache(tableName);
|
||||
for (; i < results.length; i++) {
|
||||
RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
|
||||
if (locs == null) {
|
||||
continue;
|
||||
}
|
||||
HRegionLocation loc = locs.getDefaultRegionLocation();
|
||||
if (loc == null) {
|
||||
continue;
|
||||
}
|
||||
RegionInfo info = loc.getRegion();
|
||||
if (info == null || info.isOffline() || info.isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
RegionLocations addedLocs = addToCache(tableCache, locs);
|
||||
synchronized (tableCache) {
|
||||
tableCache.clearCompletedRequests(Optional.of(addedLocs));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,
|
||||
|
|
|
@ -59,8 +59,8 @@ public abstract class AbstractTestRegionLocator {
|
|||
UTIL.getAdmin().createTable(td, SPLIT_KEYS);
|
||||
UTIL.waitTableAvailable(TABLE_NAME);
|
||||
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) {
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry,
|
||||
REGION_REPLICATION);
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(),
|
||||
registry, REGION_REPLICATION);
|
||||
}
|
||||
UTIL.getAdmin().balancerSwitch(false, true);
|
||||
}
|
||||
|
|
|
@ -22,12 +22,14 @@ import static org.junit.Assert.assertNotNull;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
|
||||
final class RegionReplicaTestHelper {
|
||||
|
@ -36,12 +38,10 @@ final class RegionReplicaTestHelper {
|
|||
}
|
||||
|
||||
// waits for all replicas to have region location
|
||||
static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
|
||||
int regionReplication) throws IOException {
|
||||
TestZKAsyncRegistry.TEST_UTIL.waitFor(
|
||||
TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
|
||||
.getLong("hbase.client.sync.wait.timeout.msec", 60000),
|
||||
200, true, new ExplainingPredicate<IOException>() {
|
||||
static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf,
|
||||
AsyncRegistry registry, int regionReplication) throws IOException {
|
||||
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
|
||||
new ExplainingPredicate<IOException>() {
|
||||
@Override
|
||||
public String explainFailure() throws IOException {
|
||||
return "Not all meta replicas get assigned";
|
||||
|
|
|
@ -54,7 +54,8 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
|
|||
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
|
||||
TestAsyncAdminBase.setUpBeforeClass();
|
||||
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry, 3);
|
||||
RegionReplicaTestHelper
|
||||
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,8 @@ public class TestAsyncMetaRegionLocator {
|
|||
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
|
||||
RegionReplicaTestHelper
|
||||
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
|
||||
TEST_UTIL.getAdmin().balancerSwitch(false, true);
|
||||
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ ClientTests.class, LargeTests.class })
|
||||
public class TestAsyncTableUseMetaReplicas {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncTableUseMetaReplicas.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("Replica");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("Family");
|
||||
|
||||
private static byte[] QUALIFIER = Bytes.toBytes("Qual");
|
||||
|
||||
private static byte[] ROW = Bytes.toBytes("Row");
|
||||
|
||||
private static byte[] VALUE = Bytes.toBytes("Value");
|
||||
|
||||
private static volatile boolean FAIL_PRIMARY_SCAN = false;
|
||||
|
||||
public static final class FailPrimaryMetaScanCp implements RegionObserver, RegionCoprocessor {
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
|
||||
throws IOException {
|
||||
RegionInfo region = c.getEnvironment().getRegionInfo();
|
||||
if (FAIL_PRIMARY_SCAN && TableName.isMetaTableName(region.getTable()) &&
|
||||
region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
|
||||
throw new IOException("Inject error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setInt(HConstants.META_REPLICAS_NUM, 3);
|
||||
conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
|
||||
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
FailPrimaryMetaScanCp.class.getName());
|
||||
UTIL.startMiniCluster(3);
|
||||
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf)) {
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3);
|
||||
}
|
||||
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
|
||||
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
|
||||
}
|
||||
UTIL.flush(TableName.META_TABLE_NAME);
|
||||
// wait for the store file refresh so we can read the region location from secondary meta
|
||||
// replicas
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAfterTest() {
|
||||
// make sure we do not mess up cleanup code.
|
||||
FAIL_PRIMARY_SCAN = false;
|
||||
}
|
||||
|
||||
private void testRead(boolean useMetaReplicas)
|
||||
throws IOException, InterruptedException, ExecutionException {
|
||||
FAIL_PRIMARY_SCAN = true;
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
conf.setBoolean(HConstants.USE_META_REPLICAS, useMetaReplicas);
|
||||
conf.setLong(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, TimeUnit.SECONDS.toMicros(1));
|
||||
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
|
||||
Result result = FutureUtils.get(conn.getTableBuilder(TABLE_NAME)
|
||||
.setOperationTimeout(3, TimeUnit.SECONDS).build().get(new Get(ROW)));
|
||||
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = RetriesExhaustedException.class)
|
||||
public void testNotUseMetaReplicas()
|
||||
throws IOException, InterruptedException, ExecutionException {
|
||||
testRead(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUseMetaReplicas() throws IOException, InterruptedException, ExecutionException {
|
||||
testRead(true);
|
||||
}
|
||||
}
|
|
@ -1071,4 +1071,22 @@ public class TestConnectionImplementation {
|
|||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaLookupThreadPoolCreated() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
|
||||
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
TEST_UTIL.getAdmin().deleteTable(tableName);
|
||||
}
|
||||
try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
|
||||
byte[] row = Bytes.toBytes("test");
|
||||
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
|
||||
// check that metalookup pool would get created
|
||||
c.relocateRegion(tableName, row);
|
||||
ExecutorService ex = c.getCurrentMetaLookupPool();
|
||||
assertNotNull(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,22 +17,18 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.util.hbck.HbckTestingUtil.assertErrors;
|
||||
import static org.apache.hadoop.hbase.util.hbck.HbckTestingUtil.doFsck;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
|
@ -42,31 +38,23 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
||||
import org.apache.hadoop.hbase.util.HBaseFsckRepair;
|
||||
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -157,8 +145,8 @@ public class TestMetaWithReplicas {
|
|||
|
||||
@Test
|
||||
public void testMetaHTDReplicaCount() throws Exception {
|
||||
assertTrue(TEST_UTIL.getAdmin().getTableDescriptor(TableName.META_TABLE_NAME)
|
||||
.getRegionReplication() == 3);
|
||||
assertEquals(3,
|
||||
TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME).getRegionReplication());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -216,12 +204,13 @@ public class TestMetaWithReplicas {
|
|||
util.getAdmin().disableTable(TABLE);
|
||||
util.getAdmin().deleteTable(TABLE);
|
||||
}
|
||||
byte[] row = Bytes.toBytes("test");
|
||||
ServerName master = null;
|
||||
try (Connection c = ConnectionFactory.createConnection(util.getConfiguration());) {
|
||||
try (Table htable = util.createTable(TABLE, FAMILIES);) {
|
||||
try (Connection c = ConnectionFactory.createConnection(conf)) {
|
||||
try (Table htable = util.createTable(TABLE, FAMILIES)) {
|
||||
util.getAdmin().flush(TableName.META_TABLE_NAME);
|
||||
Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
|
||||
30000) * 6);
|
||||
Thread.sleep(
|
||||
conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 6);
|
||||
List<RegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE);
|
||||
HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
|
||||
// Ensure that the primary server for test table is not the same one as the primary
|
||||
|
@ -230,7 +219,7 @@ public class TestMetaWithReplicas {
|
|||
// If the servers are the same, then move the test table's region out of the server
|
||||
// to another random server
|
||||
if (hrl.getServerName().equals(primary)) {
|
||||
util.getAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes());
|
||||
util.getAdmin().move(hrl.getRegion().getEncodedNameAsBytes());
|
||||
// wait for the move to complete
|
||||
do {
|
||||
Thread.sleep(10);
|
||||
|
@ -253,21 +242,17 @@ public class TestMetaWithReplicas {
|
|||
util.getHBaseClusterInterface().killRegionServer(primary);
|
||||
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
|
||||
}
|
||||
((ClusterConnection)c).clearRegionLocationCache();
|
||||
c.clearRegionLocationCache();
|
||||
}
|
||||
LOG.info("Running GETs");
|
||||
Get get = null;
|
||||
Result r = null;
|
||||
byte[] row = "test".getBytes();
|
||||
try (Table htable = c.getTable(TABLE);) {
|
||||
try (Table htable = c.getTable(TABLE)) {
|
||||
Put put = new Put(row);
|
||||
put.addColumn("foo".getBytes(), row, row);
|
||||
BufferedMutator m = c.getBufferedMutator(TABLE);
|
||||
m.mutate(put);
|
||||
m.flush();
|
||||
// Try to do a get of the row that was just put
|
||||
get = new Get(row);
|
||||
r = htable.get(get);
|
||||
Result r = htable.get(new Get(row));
|
||||
assertTrue(Arrays.equals(r.getRow(), row));
|
||||
// now start back the killed servers and disable use of replicas. That would mean
|
||||
// calls go to the primary
|
||||
|
@ -276,151 +261,19 @@ public class TestMetaWithReplicas {
|
|||
util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
|
||||
util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
|
||||
LOG.info("Master active!");
|
||||
((ClusterConnection)c).clearRegionLocationCache();
|
||||
c.clearRegionLocationCache();
|
||||
}
|
||||
conf.setBoolean(HConstants.USE_META_REPLICAS, false);
|
||||
LOG.info("Running GETs no replicas");
|
||||
try (Table htable = c.getTable(TABLE);) {
|
||||
r = htable.get(get);
|
||||
}
|
||||
conf.setBoolean(HConstants.USE_META_REPLICAS, false);
|
||||
LOG.info("Running GETs no replicas");
|
||||
try (Connection c = ConnectionFactory.createConnection(conf)) {
|
||||
try (Table htable = c.getTable(TABLE)) {
|
||||
Result r = htable.get(new Get(row));
|
||||
assertTrue(Arrays.equals(r.getRow(), row));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetaLookupThreadPoolCreated() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
|
||||
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
TEST_UTIL.getAdmin().deleteTable(tableName);
|
||||
}
|
||||
try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES);) {
|
||||
byte[] row = "test".getBytes();
|
||||
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
|
||||
// check that metalookup pool would get created
|
||||
c.relocateRegion(tableName, row);
|
||||
ExecutorService ex = c.getCurrentMetaLookupPool();
|
||||
assert(ex != null);
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore @Test // Uses FSCK. Needs fixing after HBASE-14614.
|
||||
public void testChangingReplicaCount() throws Exception {
|
||||
// tests changing the replica count across master restarts
|
||||
// reduce the replica count from 3 to 2
|
||||
stopMasterAndValidateReplicaCount(3, 2);
|
||||
// increase the replica count from 2 to 3
|
||||
stopMasterAndValidateReplicaCount(2, 3);
|
||||
}
|
||||
|
||||
private void stopMasterAndValidateReplicaCount(final int originalReplicaCount,
|
||||
final int newReplicaCount)
|
||||
throws Exception {
|
||||
ServerName sn = TEST_UTIL.getHBaseClusterInterface().getClusterMetrics().getMasterName();
|
||||
TEST_UTIL.getHBaseClusterInterface().stopMaster(sn);
|
||||
TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(sn, 60000);
|
||||
List<String> metaZnodes = TEST_UTIL.getZooKeeperWatcher().getMetaReplicaNodes();
|
||||
assert(metaZnodes.size() == originalReplicaCount); //we should have what was configured before
|
||||
TEST_UTIL.getHBaseClusterInterface().getConf().setInt(HConstants.META_REPLICAS_NUM,
|
||||
newReplicaCount);
|
||||
if (TEST_UTIL.getHBaseCluster().countServedRegions() < newReplicaCount) {
|
||||
TEST_UTIL.getHBaseCluster().startRegionServer();
|
||||
}
|
||||
TEST_UTIL.getHBaseClusterInterface().startMaster(sn.getHostname(), 0);
|
||||
TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
|
||||
TEST_UTIL.waitFor(10000, predicateMetaHasReplicas(newReplicaCount));
|
||||
// also check if hbck returns without errors
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM,
|
||||
newReplicaCount);
|
||||
HBaseFsck hbck = HbckTestingUtil.doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
HbckTestingUtil.assertNoErrors(hbck);
|
||||
}
|
||||
|
||||
private Waiter.ExplainingPredicate<Exception> predicateMetaHasReplicas(
|
||||
final int newReplicaCount) {
|
||||
return new Waiter.ExplainingPredicate<Exception>() {
|
||||
@Override
|
||||
public String explainFailure() throws Exception {
|
||||
return checkMetaLocationAndExplain(newReplicaCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return checkMetaLocationAndExplain(newReplicaCount) == null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private String checkMetaLocationAndExplain(int originalReplicaCount)
|
||||
throws KeeperException, IOException {
|
||||
List<String> metaZnodes = TEST_UTIL.getZooKeeperWatcher().getMetaReplicaNodes();
|
||||
if (metaZnodes.size() == originalReplicaCount) {
|
||||
RegionLocations rl = ((ClusterConnection) TEST_UTIL.getConnection())
|
||||
.locateRegion(TableName.META_TABLE_NAME,
|
||||
HConstants.EMPTY_START_ROW, false, false);
|
||||
for (HRegionLocation location : rl.getRegionLocations()) {
|
||||
if (location == null) {
|
||||
return "Null location found in " + rl.toString();
|
||||
}
|
||||
if (location.getRegionInfo() == null) {
|
||||
return "Null regionInfo for location " + location;
|
||||
}
|
||||
if (location.getHostname() == null) {
|
||||
return "Null hostName for location " + location;
|
||||
}
|
||||
}
|
||||
return null; // OK
|
||||
}
|
||||
return "Replica count is not as expected " + originalReplicaCount + " <> " + metaZnodes.size()
|
||||
+ "(" + metaZnodes.toString() + ")";
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
public void testHBaseFsckWithMetaReplicas() throws Exception {
|
||||
HBaseFsck hbck = HbckTestingUtil.doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
HbckTestingUtil.assertNoErrors(hbck);
|
||||
}
|
||||
|
||||
@Ignore @Test // Disabled. Relies on FSCK which needs work for AMv2.
|
||||
public void testHBaseFsckWithFewerMetaReplicas() throws Exception {
|
||||
ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection(
|
||||
TEST_UTIL.getConfiguration());
|
||||
RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
||||
false, false);
|
||||
HBaseFsckRepair.closeRegionSilentlyAndWait(c,
|
||||
rl.getRegionLocation(1).getServerName(), rl.getRegionLocation(1).getRegionInfo());
|
||||
// check that problem exists
|
||||
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN,ERROR_CODE.NO_META_REGION});
|
||||
// fix the problem
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
|
||||
// run hbck again to make sure we don't see any errors
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{});
|
||||
}
|
||||
|
||||
@Ignore @Test // The close silently doesn't work any more since HBASE-14614. Fix.
|
||||
public void testHBaseFsckWithFewerMetaReplicaZnodes() throws Exception {
|
||||
ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection(
|
||||
TEST_UTIL.getConfiguration());
|
||||
RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
|
||||
false, false);
|
||||
HBaseFsckRepair.closeRegionSilentlyAndWait(c,
|
||||
rl.getRegionLocation(2).getServerName(), rl.getRegionLocation(2).getRegionInfo());
|
||||
ZKWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
|
||||
ZKUtil.deleteNode(zkw, zkw.getZNodePaths().getZNodeForReplica(2));
|
||||
// check that problem exists
|
||||
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN,ERROR_CODE.NO_META_REGION});
|
||||
// fix the problem
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
|
||||
// run hbck again to make sure we don't see any errors
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccessingUnknownTables() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
|
@ -457,14 +310,14 @@ public class TestMetaWithReplicas {
|
|||
moveToServer = s;
|
||||
}
|
||||
}
|
||||
assert(moveToServer != null);
|
||||
assertNotNull(moveToServer);
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
TEST_UTIL.createTable(tableName, "f");
|
||||
assertTrue(TEST_UTIL.getAdmin().tableExists(tableName));
|
||||
TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
|
||||
moveToServer);
|
||||
int i = 0;
|
||||
assert !moveToServer.equals(currentServer);
|
||||
assertNotEquals(currentServer, moveToServer);
|
||||
LOG.info("CurrentServer=" + currentServer + ", moveToServer=" + moveToServer);
|
||||
final int max = 10000;
|
||||
do {
|
||||
|
@ -473,7 +326,7 @@ public class TestMetaWithReplicas {
|
|||
currentServer = ProtobufUtil.toServerName(data);
|
||||
i++;
|
||||
} while (!moveToServer.equals(currentServer) && i < max); //wait for 10 seconds overall
|
||||
assert(i != max);
|
||||
assertNotEquals(max, i);
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
assertTrue(TEST_UTIL.getAdmin().isTableDisabled(tableName));
|
||||
}
|
||||
|
@ -482,39 +335,19 @@ public class TestMetaWithReplicas {
|
|||
public void testShutdownOfReplicaHolder() throws Exception {
|
||||
// checks that the when the server holding meta replica is shut down, the meta replica
|
||||
// can be recovered
|
||||
try (ClusterConnection conn = (ClusterConnection)
|
||||
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());) {
|
||||
RegionLocations rl = conn.
|
||||
locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
HRegionLocation hrl = rl.getRegionLocation(1);
|
||||
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) {
|
||||
HRegionLocation hrl = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true).get(1);
|
||||
ServerName oldServer = hrl.getServerName();
|
||||
TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer);
|
||||
int i = 0;
|
||||
do {
|
||||
LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up");
|
||||
Thread.sleep(10000); //wait for the detection/recovery
|
||||
rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
hrl = rl.getRegionLocation(1);
|
||||
LOG.debug("Waiting for the replica " + hrl.getRegion() + " to come up");
|
||||
Thread.sleep(10000); // wait for the detection/recovery
|
||||
hrl = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true).get(1);
|
||||
i++;
|
||||
} while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3);
|
||||
assertTrue(i != 3);
|
||||
assertNotEquals(3, i);
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore @Test // Disabled because fsck and this needs work for AMv2
|
||||
public void testHBaseFsckWithExcessMetaReplicas() throws Exception {
|
||||
// Create a meta replica (this will be the 4th one) and assign it
|
||||
RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
|
||||
RegionInfoBuilder.FIRST_META_REGIONINFO, 3);
|
||||
TEST_UTIL.assignRegion(h);
|
||||
HBaseFsckRepair.waitUntilAssigned(TEST_UTIL.getAdmin(), h);
|
||||
// check that problem exists
|
||||
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN, ERROR_CODE.SHOULD_NOT_BE_DEPLOYED});
|
||||
// fix the problem
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
|
||||
// run hbck again to make sure we don't see any errors
|
||||
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
|
||||
assertErrors(hbck, new ERROR_CODE[]{});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,8 @@ public class TestZKAsyncRegistry {
|
|||
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
|
||||
REGISTRY.getMasterAddress().get());
|
||||
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
|
||||
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
|
||||
RegionReplicaTestHelper
|
||||
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
|
||||
RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
|
||||
assertEquals(3, locs.getRegionLocations().length);
|
||||
IntStream.range(0, 3).forEach(i -> {
|
||||
|
|
Loading…
Reference in New Issue