HBASE-22261 Make use of ClusterStatusListener for async client
This commit is contained in:
parent
65fb40d7ee
commit
393c5b9502
|
@ -17,6 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
|
||||
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
|
||||
import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
|
||||
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
|
||||
|
@ -112,6 +116,8 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
|
||||
private final Optional<MetricsConnection> metrics;
|
||||
|
||||
private final ClusterStatusListener clusterStatusListener;
|
||||
|
||||
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
|
||||
User user) {
|
||||
this.conf = conf;
|
||||
|
@ -140,6 +146,31 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
}
|
||||
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
|
||||
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
|
||||
ClusterStatusListener listener = null;
|
||||
if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
|
||||
// TODO: this maybe a blocking operation, better to create it outside the constructor and pass
|
||||
// it in, just like clusterId. Not a big problem for now as the default value is false.
|
||||
Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
|
||||
STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
|
||||
if (listenerClass == null) {
|
||||
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
|
||||
} else {
|
||||
try {
|
||||
listener = new ClusterStatusListener(
|
||||
new ClusterStatusListener.DeadServerHandler() {
|
||||
@Override
|
||||
public void newDead(ServerName sn) {
|
||||
locator.clearCache(sn);
|
||||
rpcClient.cancelConnections(sn);
|
||||
}
|
||||
}, conf, listenerClass);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.clusterStatusListener = listener;
|
||||
}
|
||||
|
||||
private void spawnRenewalChore(final UserGroupInformation user) {
|
||||
|
@ -159,6 +190,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
if (closed) {
|
||||
return;
|
||||
}
|
||||
IOUtils.closeQuietly(clusterStatusListener);
|
||||
IOUtils.closeQuietly(rpcClient);
|
||||
IOUtils.closeQuietly(registry);
|
||||
if (authService != null) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -113,4 +114,23 @@ class AsyncMetaRegionLocator {
|
|||
void clearCache() {
|
||||
metaRegionLocations.set(null);
|
||||
}
|
||||
|
||||
void clearCache(ServerName serverName) {
|
||||
for (;;) {
|
||||
RegionLocations locs = metaRegionLocations.get();
|
||||
if (locs == null) {
|
||||
return;
|
||||
}
|
||||
RegionLocations newLocs = locs.removeByServer(serverName);
|
||||
if (locs == newLocs) {
|
||||
return;
|
||||
}
|
||||
if (newLocs.isEmpty()) {
|
||||
newLocs = null;
|
||||
}
|
||||
if (metaRegionLocations.compareAndSet(locs, newLocs)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
|
@ -613,6 +614,24 @@ class AsyncNonMetaRegionLocator {
|
|||
cache.clear();
|
||||
}
|
||||
|
||||
void clearCache(ServerName serverName) {
|
||||
for (TableCache tableCache : cache.values()) {
|
||||
for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) {
|
||||
byte[] regionName = entry.getKey();
|
||||
RegionLocations locs = entry.getValue();
|
||||
RegionLocations newLocs = locs.removeByServer(serverName);
|
||||
if (locs == newLocs) {
|
||||
continue;
|
||||
}
|
||||
if (newLocs.isEmpty()) {
|
||||
tableCache.cache.remove(regionName, locs);
|
||||
} else {
|
||||
tableCache.cache.replace(regionName, locs, newLocs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// only used for testing whether we have cached the location for a region.
|
||||
@VisibleForTesting
|
||||
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -33,6 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
|
||||
|
||||
|
@ -46,11 +48,14 @@ class AsyncRegionLocator {
|
|||
|
||||
private final HashedWheelTimer retryTimer;
|
||||
|
||||
private final AsyncConnectionImpl conn;
|
||||
|
||||
private final AsyncMetaRegionLocator metaRegionLocator;
|
||||
|
||||
private final AsyncNonMetaRegionLocator nonMetaRegionLocator;
|
||||
|
||||
AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
|
||||
this.conn = conn;
|
||||
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
|
||||
this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
|
||||
this.retryTimer = retryTimer;
|
||||
|
@ -150,9 +155,7 @@ class AsyncRegionLocator {
|
|||
}
|
||||
|
||||
void clearCache(TableName tableName) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Clear meta cache for " + tableName);
|
||||
}
|
||||
LOG.debug("Clear meta cache for {}", tableName);
|
||||
if (tableName.equals(META_TABLE_NAME)) {
|
||||
metaRegionLocator.clearCache();
|
||||
} else {
|
||||
|
@ -160,8 +163,20 @@ class AsyncRegionLocator {
|
|||
}
|
||||
}
|
||||
|
||||
void clearCache(ServerName serverName) {
|
||||
LOG.debug("Clear meta cache for {}", serverName);
|
||||
metaRegionLocator.clearCache(serverName);
|
||||
nonMetaRegionLocator.clearCache(serverName);
|
||||
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
|
||||
}
|
||||
|
||||
void clearCache() {
|
||||
metaRegionLocator.clearCache();
|
||||
nonMetaRegionLocator.clearCache();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
|
||||
return nonMetaRegionLocator;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncTableRSCrashPublish {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static AsyncConnectionImpl CONN;
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("Publish");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("family");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
|
||||
UTIL.startMiniCluster(2);
|
||||
UTIL.createTable(TABLE_NAME, FAMILY);
|
||||
UTIL.waitTableAvailable(TABLE_NAME);
|
||||
CONN =
|
||||
(AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
Closeables.close(CONN, true);
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
AsyncNonMetaRegionLocator locator = CONN.getLocator().getNonMetaRegionLocator();
|
||||
CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
|
||||
ServerName serverName = locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
|
||||
.getDefaultRegionLocation().getServerName();
|
||||
UTIL.getMiniHBaseCluster().stopRegionServer(serverName);
|
||||
UTIL.waitFor(60000,
|
||||
() -> locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) == null);
|
||||
CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
|
||||
assertNotEquals(serverName,
|
||||
locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
|
||||
.getDefaultRegionLocation().getServerName());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue