HBASE-22135 AsyncAdmin will not refresh master address
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
51dda380a6
commit
1c4e705592
|
@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.ChoreService;
|
|||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
|
@ -49,14 +48,11 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
|
||||
|
||||
/**
|
||||
|
@ -200,86 +196,31 @@ class AsyncConnectionImpl implements AsyncConnection {
|
|||
() -> createAdminServerStub(serverName));
|
||||
}
|
||||
|
||||
private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
|
||||
addListener(registry.getMasterAddress(), (sn, error) -> {
|
||||
if (sn == null) {
|
||||
String msg = "ZooKeeper available but no active master location found";
|
||||
LOG.info(msg);
|
||||
this.masterStubMakeFuture.getAndSet(null)
|
||||
.completeExceptionally(new MasterNotRunningException(msg));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
MasterService.Interface stub = createMasterStub(sn);
|
||||
HBaseRpcController controller = getRpcController();
|
||||
stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
|
||||
new RpcCallback<IsMasterRunningResponse>() {
|
||||
@Override
|
||||
public void run(IsMasterRunningResponse resp) {
|
||||
if (controller.failed() || resp == null ||
|
||||
(resp != null && !resp.getIsMasterRunning())) {
|
||||
masterStubMakeFuture.getAndSet(null).completeExceptionally(
|
||||
new MasterNotRunningException("Master connection is not running anymore"));
|
||||
} else {
|
||||
masterStub.set(stub);
|
||||
masterStubMakeFuture.set(null);
|
||||
future.complete(stub);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
this.masterStubMakeFuture.getAndSet(null)
|
||||
.completeExceptionally(new IOException("Failed to create async master stub", e));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
CompletableFuture<MasterService.Interface> getMasterStub() {
|
||||
MasterService.Interface masterStub = this.masterStub.get();
|
||||
|
||||
if (masterStub == null) {
|
||||
for (;;) {
|
||||
if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
|
||||
CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
|
||||
makeMasterStub(future);
|
||||
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
|
||||
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
|
||||
addListener(registry.getMasterAddress(), (addr, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(error);
|
||||
} else if (addr == null) {
|
||||
future.completeExceptionally(new MasterNotRunningException(
|
||||
"ZooKeeper available but no active master location found"));
|
||||
} else {
|
||||
CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
|
||||
if (future != null) {
|
||||
return future;
|
||||
LOG.debug("The fetched master address is {}", addr);
|
||||
try {
|
||||
future.complete(createMasterStub(addr));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
|
||||
CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
|
||||
HBaseRpcController controller = getRpcController();
|
||||
masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
|
||||
new RpcCallback<IsMasterRunningResponse>() {
|
||||
@Override
|
||||
public void run(IsMasterRunningResponse resp) {
|
||||
if (controller.failed() || resp == null ||
|
||||
(resp != null && !resp.getIsMasterRunning())) {
|
||||
makeMasterStub(future);
|
||||
} else {
|
||||
future.complete(masterStub);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}, stub -> true, "master stub");
|
||||
}
|
||||
|
||||
private HBaseRpcController getRpcController() {
|
||||
HBaseRpcController controller = this.rpcControllerFactory.newController();
|
||||
controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
|
||||
return controller;
|
||||
void clearMasterStubCache(MasterService.Interface stub) {
|
||||
masterStub.compareAndSet(stub, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.client;
|
|||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.util.Timer;
|
||||
|
@ -49,6 +51,14 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
|
|||
this.callable = callable;
|
||||
}
|
||||
|
||||
private void clearMasterStubCacheOnError(MasterService.Interface stub, Throwable error) {
|
||||
// ServerNotRunningYetException may because it is the backup master.
|
||||
if (ClientExceptionsUtil.isConnectionException(error) ||
|
||||
error instanceof ServerNotRunningYetException) {
|
||||
conn.clearMasterStubCache(stub);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doCall() {
|
||||
addListener(conn.getMasterStub(), (stub, error) -> {
|
||||
|
@ -60,8 +70,8 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
|
|||
resetCallTimeout();
|
||||
addListener(callable.call(controller, stub), (result, error2) -> {
|
||||
if (error2 != null) {
|
||||
onError(error2, () -> "Call to master failed", err -> {
|
||||
});
|
||||
onError(error2, () -> "Call to master failed",
|
||||
err -> clearMasterStubCacheOnError(stub, error2));
|
||||
return;
|
||||
}
|
||||
future.complete(result);
|
||||
|
|
|
@ -22,15 +22,12 @@ import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegi
|
|||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
|
||||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
|
||||
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The asynchronous locator for meta region.
|
||||
|
@ -38,8 +35,6 @@ import org.slf4j.LoggerFactory;
|
|||
@InterfaceAudience.Private
|
||||
class AsyncMetaRegionLocator {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaRegionLocator.class);
|
||||
|
||||
private final AsyncRegistry registry;
|
||||
|
||||
private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();
|
||||
|
@ -61,45 +56,8 @@ class AsyncMetaRegionLocator {
|
|||
* cached region locations and cause an infinite loop.
|
||||
*/
|
||||
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
|
||||
for (;;) {
|
||||
if (!reload) {
|
||||
RegionLocations locs = this.metaRegionLocations.get();
|
||||
if (isGood(locs, replicaId)) {
|
||||
return CompletableFuture.completedFuture(locs);
|
||||
}
|
||||
}
|
||||
LOG.trace("Meta region location cache is null, try fetching from registry.");
|
||||
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
|
||||
LOG.debug("Start fetching meta region location from registry.");
|
||||
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
|
||||
addListener(registry.getMetaRegionLocation(), (locs, error) -> {
|
||||
if (error != null) {
|
||||
LOG.debug("Failed to fetch meta region location from registry", error);
|
||||
metaRelocateFuture.getAndSet(null).completeExceptionally(error);
|
||||
return;
|
||||
}
|
||||
LOG.debug("The fetched meta region location is {}", locs);
|
||||
// Here we update cache before reset future, so it is possible that someone can get a
|
||||
// stale value. Consider this:
|
||||
// 1. update cache
|
||||
// 2. someone clear the cache and relocate again
|
||||
// 3. the metaRelocateFuture is not null so the old future is used.
|
||||
// 4. we clear metaRelocateFuture and complete the future in it with the value being
|
||||
// cleared in step 2.
|
||||
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
|
||||
// caller will retry again later, no correctness problems.
|
||||
this.metaRegionLocations.set(locs);
|
||||
metaRelocateFuture.set(null);
|
||||
future.complete(locs);
|
||||
});
|
||||
return future;
|
||||
} else {
|
||||
CompletableFuture<RegionLocations> future = metaRelocateFuture.get();
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
|
||||
registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location");
|
||||
}
|
||||
|
||||
private HRegionLocation getCacheLocation(HRegionLocation loc) {
|
||||
|
|
|
@ -32,7 +32,10 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
|
@ -617,4 +620,48 @@ public final class ConnectionUtils {
|
|||
return HConstants.NORMAL_QOS;
|
||||
}
|
||||
}
|
||||
|
||||
static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
|
||||
AtomicReference<CompletableFuture<T>> futureRef, boolean reload,
|
||||
Supplier<CompletableFuture<T>> fetch, Predicate<T> validator, String type) {
|
||||
for (;;) {
|
||||
if (!reload) {
|
||||
T value = cacheRef.get();
|
||||
if (value != null && validator.test(value)) {
|
||||
return CompletableFuture.completedFuture(value);
|
||||
}
|
||||
}
|
||||
LOG.trace("{} cache is null, try fetching from registry", type);
|
||||
if (futureRef.compareAndSet(null, new CompletableFuture<>())) {
|
||||
LOG.debug("Start fetching{} from registry", type);
|
||||
CompletableFuture<T> future = futureRef.get();
|
||||
addListener(fetch.get(), (value, error) -> {
|
||||
if (error != null) {
|
||||
LOG.debug("Failed to fetch {} from registry", type, error);
|
||||
futureRef.getAndSet(null).completeExceptionally(error);
|
||||
return;
|
||||
}
|
||||
LOG.debug("The fetched {} is {}", type, value);
|
||||
// Here we update cache before reset future, so it is possible that someone can get a
|
||||
// stale value. Consider this:
|
||||
// 1. update cacheRef
|
||||
// 2. someone clears the cache and relocates again
|
||||
// 3. the futureRef is not null so the old future is used.
|
||||
// 4. we clear futureRef and complete the future in it with the value being
|
||||
// cleared in step 2.
|
||||
// But we do not think it is a big deal as it rarely happens, and even if it happens, the
|
||||
// caller will retry again later, no correctness problems.
|
||||
cacheRef.set(value);
|
||||
futureRef.set(null);
|
||||
future.complete(value);
|
||||
});
|
||||
return future;
|
||||
} else {
|
||||
CompletableFuture<T> future = futureRef.get();
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
/**
|
||||
* Testcase for HBASE-22135.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncAdminMasterSwitch extends TestAsyncAdminBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestAsyncAdminMasterSwitch.class);
|
||||
|
||||
@Test
|
||||
public void testSwitch() throws IOException, InterruptedException {
|
||||
assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(),
|
||||
admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join()
|
||||
.getServersName().size());
|
||||
// stop the old master, and start a new one
|
||||
TEST_UTIL.getMiniHBaseCluster().startMaster();
|
||||
TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
|
||||
assertTrue(TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000));
|
||||
// make sure that we could still call master
|
||||
assertEquals(TEST_UTIL.getHBaseCluster().getRegionServerThreads().size(),
|
||||
admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.SERVERS_NAME)).join()
|
||||
.getServersName().size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue