HBASE-22317 Support reading from meta replicas

This commit is contained in:
Duo Zhang 2019-04-28 09:39:48 +08:00 committed by Apache9
parent 962585d376
commit 4477dd5de8
9 changed files with 267 additions and 264 deletions

View File

@ -17,8 +17,10 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.NINES; import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.ZEROES; import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError; import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
@ -87,6 +89,8 @@ class AsyncNonMetaRegionLocator {
private final int locatePrefetchLimit; private final int locatePrefetchLimit;
private final boolean useMetaReplicas;
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>(); private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
private static final class LocateRequest { private static final class LocateRequest {
@ -193,6 +197,8 @@ class AsyncNonMetaRegionLocator {
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE); MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
this.locatePrefetchLimit = this.locatePrefetchLimit =
conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT); conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
this.useMetaReplicas =
conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
} }
private TableCache getTableCache(TableName tableName) { private TableCache getTableCache(TableName tableName) {
@ -425,10 +431,13 @@ class AsyncNonMetaRegionLocator {
} }
byte[] metaStopKey = byte[] metaStopKey =
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
conn.getTable(META_TABLE_NAME) Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
.scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit) .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
.setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() { .setReadType(ReadType.PREAD);
if (useMetaReplicas) {
scan.setConsistency(Consistency.TIMELINE);
}
conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
private boolean completeNormally = false; private boolean completeNormally = false;
@ -444,8 +453,8 @@ class AsyncNonMetaRegionLocator {
if (tableNotFound) { if (tableNotFound) {
complete(tableName, req, null, new TableNotFoundException(tableName)); complete(tableName, req, null, new TableNotFoundException(tableName));
} else if (!completeNormally) { } else if (!completeNormally) {
complete(tableName, req, null, new IOException("Unable to find region for '" + complete(tableName, req, null, new IOException(
Bytes.toStringBinary(req.row) + "' in " + tableName)); "Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
} }
} }

View File

@ -59,8 +59,8 @@ public abstract class AbstractTestRegionLocator {
UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.getAdmin().createTable(td, SPLIT_KEYS);
UTIL.waitTableAvailable(TABLE_NAME); UTIL.waitTableAvailable(TABLE_NAME);
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) { try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry, RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(),
REGION_REPLICATION); registry, REGION_REPLICATION);
} }
UTIL.getAdmin().balancerSwitch(false, true); UTIL.getAdmin().balancerSwitch(false, true);
} }

View File

@ -22,12 +22,14 @@ import static org.junit.Assert.assertNotNull;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
final class RegionReplicaTestHelper { final class RegionReplicaTestHelper {
@ -36,12 +38,10 @@ final class RegionReplicaTestHelper {
} }
// waits for all replicas to have region location // waits for all replicas to have region location
static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry, static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf,
int regionReplication) throws IOException { AsyncRegistry registry, int regionReplication) throws IOException {
TestZKAsyncRegistry.TEST_UTIL.waitFor( Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
TestZKAsyncRegistry.TEST_UTIL.getConfiguration() new ExplainingPredicate<IOException>() {
.getLong("hbase.client.sync.wait.timeout.msec", 60000),
200, true, new ExplainingPredicate<IOException>() {
@Override @Override
public String explainFailure() throws IOException { public String explainFailure() throws IOException {
return "Not all meta replicas get assigned"; return "Not all meta replicas get assigned";

View File

@ -54,7 +54,8 @@ public class TestAsyncAdminWithRegionReplicas extends TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TestAsyncAdminBase.setUpBeforeClass(); TestAsyncAdminBase.setUpBeforeClass();
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry, 3); RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3);
} }
} }

View File

@ -55,7 +55,8 @@ public class TestAsyncMetaRegionLocator {
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
TEST_UTIL.getAdmin().balancerSwitch(false, true); TEST_UTIL.getAdmin().balancerSwitch(false, true);
LOCATOR = new AsyncMetaRegionLocator(REGISTRY); LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
} }

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, LargeTests.class })
public class TestAsyncTableUseMetaReplicas {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableUseMetaReplicas.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("Replica");
private static byte[] FAMILY = Bytes.toBytes("Family");
private static byte[] QUALIFIER = Bytes.toBytes("Qual");
private static byte[] ROW = Bytes.toBytes("Row");
private static byte[] VALUE = Bytes.toBytes("Value");
private static volatile boolean FAIL_PRIMARY_SCAN = false;
public static final class FailPrimaryMetaScanCp implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
throws IOException {
RegionInfo region = c.getEnvironment().getRegionInfo();
if (FAIL_PRIMARY_SCAN && TableName.isMetaTableName(region.getTable()) &&
region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
throw new IOException("Inject error");
}
}
}
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setInt(HConstants.META_REPLICAS_NUM, 3);
conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
FailPrimaryMetaScanCp.class.getName());
UTIL.startMiniCluster(3);
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf)) {
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3);
}
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
}
UTIL.flush(TableName.META_TABLE_NAME);
// wait for the store file refresh so we can read the region location from secondary meta
// replicas
Thread.sleep(2000);
}
@AfterClass
public static void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@After
public void tearDownAfterTest() {
// make sure we do not mess up cleanup code.
FAIL_PRIMARY_SCAN = false;
}
private void testRead(boolean useMetaReplicas)
throws IOException, InterruptedException, ExecutionException {
FAIL_PRIMARY_SCAN = true;
Configuration conf = new Configuration(UTIL.getConfiguration());
conf.setBoolean(HConstants.USE_META_REPLICAS, useMetaReplicas);
conf.setLong(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, TimeUnit.SECONDS.toMicros(1));
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
Result result = FutureUtils.get(conn.getTableBuilder(TABLE_NAME)
.setOperationTimeout(3, TimeUnit.SECONDS).build().get(new Get(ROW)));
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
}
}
@Test(expected = RetriesExhaustedException.class)
public void testNotUseMetaReplicas()
throws IOException, InterruptedException, ExecutionException {
testRead(false);
}
@Test
public void testUseMetaReplicas() throws IOException, InterruptedException, ExecutionException {
testRead(true);
}
}

View File

@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -1072,4 +1071,22 @@ public class TestConnectionImplementation {
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }
@Test
public void testMetaLookupThreadPoolCreated() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.getAdmin().disableTable(tableName);
TEST_UTIL.getAdmin().deleteTable(tableName);
}
try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
byte[] row = Bytes.toBytes("test");
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
// check that metalookup pool would get created
c.relocateRegion(tableName, row);
ExecutorService ex = c.getCurrentMetaLookupPool();
assertNotNull(ex);
}
}
} }

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -17,22 +17,18 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.hbck.HbckTestingUtil.assertErrors; import static org.junit.Assert.assertEquals;
import static org.apache.hadoop.hbase.util.hbck.HbckTestingUtil.doFsck;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
@ -42,31 +38,23 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.HBaseFsckRepair;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker; import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -157,8 +145,8 @@ public class TestMetaWithReplicas {
@Test @Test
public void testMetaHTDReplicaCount() throws Exception { public void testMetaHTDReplicaCount() throws Exception {
assertTrue(TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME) assertEquals(3,
.getRegionReplication() == 3); TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME).getRegionReplication());
} }
@Test @Test
@ -216,12 +204,13 @@ public class TestMetaWithReplicas {
util.getAdmin().disableTable(TABLE); util.getAdmin().disableTable(TABLE);
util.getAdmin().deleteTable(TABLE); util.getAdmin().deleteTable(TABLE);
} }
byte[] row = Bytes.toBytes("test");
ServerName master = null; ServerName master = null;
try (Connection c = ConnectionFactory.createConnection(util.getConfiguration())) { try (Connection c = ConnectionFactory.createConnection(conf)) {
try (Table htable = util.createTable(TABLE, FAMILIES)) { try (Table htable = util.createTable(TABLE, FAMILIES)) {
util.getAdmin().flush(TableName.META_TABLE_NAME); util.getAdmin().flush(TableName.META_TABLE_NAME);
Thread.sleep(conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, Thread.sleep(
30000) * 6); conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 30000) * 6);
List<RegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE); List<RegionInfo> regions = MetaTableAccessor.getTableRegions(c, TABLE);
HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0)); HRegionLocation hrl = MetaTableAccessor.getRegionLocation(c, regions.get(0));
// Ensure that the primary server for test table is not the same one as the primary // Ensure that the primary server for test table is not the same one as the primary
@ -230,7 +219,7 @@ public class TestMetaWithReplicas {
// If the servers are the same, then move the test table's region out of the server // If the servers are the same, then move the test table's region out of the server
// to another random server // to another random server
if (hrl.getServerName().equals(primary)) { if (hrl.getServerName().equals(primary)) {
util.getAdmin().move(hrl.getRegionInfo().getEncodedNameAsBytes()); util.getAdmin().move(hrl.getRegion().getEncodedNameAsBytes());
// wait for the move to complete // wait for the move to complete
do { do {
Thread.sleep(10); Thread.sleep(10);
@ -253,12 +242,9 @@ public class TestMetaWithReplicas {
util.getHBaseClusterInterface().killRegionServer(primary); util.getHBaseClusterInterface().killRegionServer(primary);
util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000); util.getHBaseClusterInterface().waitForRegionServerToStop(primary, 60000);
} }
((ClusterConnection)c).clearRegionLocationCache(); c.clearRegionLocationCache();
} }
LOG.info("Running GETs"); LOG.info("Running GETs");
Get get = null;
Result r = null;
byte[] row = Bytes.toBytes("test");
try (Table htable = c.getTable(TABLE)) { try (Table htable = c.getTable(TABLE)) {
Put put = new Put(row); Put put = new Put(row);
put.addColumn(Bytes.toBytes("foo"), row, row); put.addColumn(Bytes.toBytes("foo"), row, row);
@ -266,8 +252,7 @@ public class TestMetaWithReplicas {
m.mutate(put); m.mutate(put);
m.flush(); m.flush();
// Try to do a get of the row that was just put // Try to do a get of the row that was just put
get = new Get(row); Result r = htable.get(new Get(row));
r = htable.get(get);
assertTrue(Arrays.equals(r.getRow(), row)); assertTrue(Arrays.equals(r.getRow(), row));
// now start back the killed servers and disable use of replicas. That would mean // now start back the killed servers and disable use of replicas. That would mean
// calls go to the primary // calls go to the primary
@ -276,151 +261,19 @@ public class TestMetaWithReplicas {
util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0); util.getHBaseClusterInterface().startRegionServer(primary.getHostname(), 0);
util.getHBaseClusterInterface().waitForActiveAndReadyMaster(); util.getHBaseClusterInterface().waitForActiveAndReadyMaster();
LOG.info("Master active!"); LOG.info("Master active!");
((ClusterConnection)c).clearRegionLocationCache(); c.clearRegionLocationCache();
}
} }
conf.setBoolean(HConstants.USE_META_REPLICAS, false); conf.setBoolean(HConstants.USE_META_REPLICAS, false);
LOG.info("Running GETs no replicas"); LOG.info("Running GETs no replicas");
try (Table htable = c.getTable(TABLE);) { try (Connection c = ConnectionFactory.createConnection(conf)) {
r = htable.get(get); try (Table htable = c.getTable(TABLE)) {
Result r = htable.get(new Get(row));
assertTrue(Arrays.equals(r.getRow(), row)); assertTrue(Arrays.equals(r.getRow(), row));
} }
} }
} }
@Test
public void testMetaLookupThreadPoolCreated() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.getAdmin().disableTable(tableName);
TEST_UTIL.getAdmin().deleteTable(tableName);
}
try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
byte[] row = Bytes.toBytes("test");
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
// check that metalookup pool would get created
c.relocateRegion(tableName, row);
ExecutorService ex = c.getCurrentMetaLookupPool();
assert(ex != null);
}
}
@Ignore @Test // Uses FSCK. Needs fixing after HBASE-14614.
public void testChangingReplicaCount() throws Exception {
// tests changing the replica count across master restarts
// reduce the replica count from 3 to 2
stopMasterAndValidateReplicaCount(3, 2);
// increase the replica count from 2 to 3
stopMasterAndValidateReplicaCount(2, 3);
}
private void stopMasterAndValidateReplicaCount(final int originalReplicaCount,
final int newReplicaCount)
throws Exception {
ServerName sn = TEST_UTIL.getHBaseClusterInterface().getClusterMetrics().getMasterName();
TEST_UTIL.getHBaseClusterInterface().stopMaster(sn);
TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(sn, 60000);
List<String> metaZnodes = TEST_UTIL.getZooKeeperWatcher().getMetaReplicaNodes();
assert(metaZnodes.size() == originalReplicaCount); //we should have what was configured before
TEST_UTIL.getHBaseClusterInterface().getConf().setInt(HConstants.META_REPLICAS_NUM,
newReplicaCount);
if (TEST_UTIL.getHBaseCluster().countServedRegions() < newReplicaCount) {
TEST_UTIL.getHBaseCluster().startRegionServer();
}
TEST_UTIL.getHBaseClusterInterface().startMaster(sn.getHostname(), 0);
TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
TEST_UTIL.waitFor(10000, predicateMetaHasReplicas(newReplicaCount));
// also check if hbck returns without errors
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM,
newReplicaCount);
HBaseFsck hbck = HbckTestingUtil.doFsck(TEST_UTIL.getConfiguration(), false);
HbckTestingUtil.assertNoErrors(hbck);
}
private Waiter.ExplainingPredicate<Exception> predicateMetaHasReplicas(
final int newReplicaCount) {
return new Waiter.ExplainingPredicate<Exception>() {
@Override
public String explainFailure() throws Exception {
return checkMetaLocationAndExplain(newReplicaCount);
}
@Override
public boolean evaluate() throws Exception {
return checkMetaLocationAndExplain(newReplicaCount) == null;
}
};
}
@Nullable
private String checkMetaLocationAndExplain(int originalReplicaCount)
throws KeeperException, IOException {
List<String> metaZnodes = TEST_UTIL.getZooKeeperWatcher().getMetaReplicaNodes();
if (metaZnodes.size() == originalReplicaCount) {
RegionLocations rl = ((ClusterConnection) TEST_UTIL.getConnection())
.locateRegion(TableName.META_TABLE_NAME,
HConstants.EMPTY_START_ROW, false, false);
for (HRegionLocation location : rl.getRegionLocations()) {
if (location == null) {
return "Null location found in " + rl.toString();
}
if (location.getRegionInfo() == null) {
return "Null regionInfo for location " + location;
}
if (location.getHostname() == null) {
return "Null hostName for location " + location;
}
}
return null; // OK
}
return "Replica count is not as expected " + originalReplicaCount + " <> " + metaZnodes.size()
+ "(" + metaZnodes.toString() + ")";
}
@Ignore @Test
public void testHBaseFsckWithMetaReplicas() throws Exception {
HBaseFsck hbck = HbckTestingUtil.doFsck(TEST_UTIL.getConfiguration(), false);
HbckTestingUtil.assertNoErrors(hbck);
}
@Ignore @Test // Disabled. Relies on FSCK which needs work for AMv2.
public void testHBaseFsckWithFewerMetaReplicas() throws Exception {
ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection(
TEST_UTIL.getConfiguration());
RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
false, false);
HBaseFsckRepair.closeRegionSilentlyAndWait(c,
rl.getRegionLocation(1).getServerName(), rl.getRegionLocation(1).getRegionInfo());
// check that problem exists
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN,ERROR_CODE.NO_META_REGION});
// fix the problem
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
// run hbck again to make sure we don't see any errors
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{});
}
@Ignore @Test // The close silently doesn't work any more since HBASE-14614. Fix.
public void testHBaseFsckWithFewerMetaReplicaZnodes() throws Exception {
ClusterConnection c = (ClusterConnection)ConnectionFactory.createConnection(
TEST_UTIL.getConfiguration());
RegionLocations rl = c.locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
false, false);
HBaseFsckRepair.closeRegionSilentlyAndWait(c,
rl.getRegionLocation(2).getServerName(), rl.getRegionLocation(2).getRegionInfo());
ZKWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
ZKUtil.deleteNode(zkw, zkw.getZNodePaths().getZNodeForReplica(2));
// check that problem exists
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN,ERROR_CODE.NO_META_REGION});
// fix the problem
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
// run hbck again to make sure we don't see any errors
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{});
}
@Test @Test
public void testAccessingUnknownTables() throws Exception { public void testAccessingUnknownTables() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
@ -457,14 +310,14 @@ public class TestMetaWithReplicas {
moveToServer = s; moveToServer = s;
} }
} }
assert(moveToServer != null); assertNotNull(moveToServer);
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(tableName, "f"); TEST_UTIL.createTable(tableName, "f");
assertTrue(TEST_UTIL.getAdmin().tableExists(tableName)); assertTrue(TEST_UTIL.getAdmin().tableExists(tableName));
TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), TEST_UTIL.getAdmin().move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
moveToServer); moveToServer);
int i = 0; int i = 0;
assert !moveToServer.equals(currentServer); assertNotEquals(currentServer, moveToServer);
LOG.info("CurrentServer=" + currentServer + ", moveToServer=" + moveToServer); LOG.info("CurrentServer=" + currentServer + ", moveToServer=" + moveToServer);
final int max = 10000; final int max = 10000;
do { do {
@ -473,7 +326,7 @@ public class TestMetaWithReplicas {
currentServer = ProtobufUtil.toServerName(data); currentServer = ProtobufUtil.toServerName(data);
i++; i++;
} while (!moveToServer.equals(currentServer) && i < max); //wait for 10 seconds overall } while (!moveToServer.equals(currentServer) && i < max); //wait for 10 seconds overall
assert(i != max); assertNotEquals(max, i);
TEST_UTIL.getAdmin().disableTable(tableName); TEST_UTIL.getAdmin().disableTable(tableName);
assertTrue(TEST_UTIL.getAdmin().isTableDisabled(tableName)); assertTrue(TEST_UTIL.getAdmin().isTableDisabled(tableName));
} }
@ -482,39 +335,19 @@ public class TestMetaWithReplicas {
public void testShutdownOfReplicaHolder() throws Exception { public void testShutdownOfReplicaHolder() throws Exception {
// checks that the when the server holding meta replica is shut down, the meta replica // checks that the when the server holding meta replica is shut down, the meta replica
// can be recovered // can be recovered
try (ClusterConnection conn = (ClusterConnection) try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { RegionLocator locator = conn.getRegionLocator(TableName.META_TABLE_NAME)) {
RegionLocations rl = conn. HRegionLocation hrl = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true).get(1);
locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
HRegionLocation hrl = rl.getRegionLocation(1);
ServerName oldServer = hrl.getServerName(); ServerName oldServer = hrl.getServerName();
TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer); TEST_UTIL.getHBaseClusterInterface().killRegionServer(oldServer);
int i = 0; int i = 0;
do { do {
LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up"); LOG.debug("Waiting for the replica " + hrl.getRegion() + " to come up");
Thread.sleep(10000); // wait for the detection/recovery Thread.sleep(10000); // wait for the detection/recovery
rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true); hrl = locator.getRegionLocations(HConstants.EMPTY_START_ROW, true).get(1);
hrl = rl.getRegionLocation(1);
i++; i++;
} while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3); } while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3);
assertTrue(i != 3); assertNotEquals(3, i);
} }
} }
@Ignore @Test // Disabled because fsck and this needs work for AMv2
public void testHBaseFsckWithExcessMetaReplicas() throws Exception {
// Create a meta replica (this will be the 4th one) and assign it
RegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
RegionInfoBuilder.FIRST_META_REGIONINFO, 3);
TEST_UTIL.assignRegion(h);
HBaseFsckRepair.waitUntilAssigned(TEST_UTIL.getAdmin(), h);
// check that problem exists
HBaseFsck hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{ERROR_CODE.UNKNOWN, ERROR_CODE.SHOULD_NOT_BE_DEPLOYED});
// fix the problem
hbck = doFsck(TEST_UTIL.getConfiguration(), true);
// run hbck again to make sure we don't see any errors
hbck = doFsck(TEST_UTIL.getConfiguration(), false);
assertErrors(hbck, new ERROR_CODE[]{});
}
} }

View File

@ -82,7 +82,8 @@ public class TestZKAsyncRegistry {
assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
REGISTRY.getMasterAddress().get()); REGISTRY.getMasterAddress().get());
assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); RegionReplicaTestHelper
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
assertEquals(3, locs.getRegionLocations().length); assertEquals(3, locs.getRegionLocations().length);
IntStream.range(0, 3).forEach(i -> { IntStream.range(0, 3).forEach(i -> {