HBASE-26214 Introduce a ConnectionRegistryEndpoint interface (#3613)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
91db10a8bc
commit
137c7dcd3d
|
@ -18,8 +18,8 @@
|
|||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentNavigableMap;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
|
@ -203,19 +203,19 @@ public class MetaRegionLocationCache extends ZKListener {
|
|||
* @return Optional list of HRegionLocations for meta replica(s), null if the cache is empty.
|
||||
*
|
||||
*/
|
||||
public Optional<List<HRegionLocation>> getMetaRegionLocations() {
|
||||
public List<HRegionLocation> getMetaRegionLocations() {
|
||||
ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
|
||||
cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
|
||||
if (snapshot.isEmpty()) {
|
||||
// This could be possible if the master has not successfully initialized yet or meta region
|
||||
// is stuck in some weird state.
|
||||
return Optional.empty();
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<HRegionLocation> result = new ArrayList<>();
|
||||
// Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
|
||||
// ArrayValueCollection does not implement toArray().
|
||||
snapshot.values().forEach(location -> result.add(location));
|
||||
return Optional.of(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,11 +18,9 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
|
@ -71,15 +69,13 @@ public final class ClusterConnectionFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link AsyncClusterConnection} instance for a region server.
|
||||
* Create a new {@link AsyncClusterConnection} instance to be used at server side where we have a
|
||||
* {@link ConnectionRegistryEndpoint}.
|
||||
*/
|
||||
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
|
||||
public static AsyncClusterConnection createAsyncClusterConnection(
|
||||
ConnectionRegistryEndpoint endpoint, Configuration conf, SocketAddress localAddress, User user)
|
||||
throws IOException {
|
||||
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
|
||||
Configuration conf = regionServer.getConfiguration();
|
||||
InetSocketAddress localAddress =
|
||||
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
|
||||
User user = regionServer.getUserProvider().getCurrent();
|
||||
ShortCircuitConnectionRegistry registry = new ShortCircuitConnectionRegistry(endpoint);
|
||||
return createAsyncClusterConnection(conf, registry, localAddress, user);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Define the necessary method for carrying {@code ClientMetaService}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ConnectionRegistryEndpoint {
|
||||
|
||||
/**
|
||||
* Get cluster id.
|
||||
*/
|
||||
String getClusterId();
|
||||
|
||||
/**
|
||||
* Get active master address.
|
||||
*/
|
||||
Optional<ServerName> getActiveMaster();
|
||||
|
||||
/**
|
||||
* Get backup masters address.
|
||||
*/
|
||||
List<ServerName> getBackupMasters();
|
||||
|
||||
/**
|
||||
* Get all the region servers address.
|
||||
*/
|
||||
List<ServerName> getRegionServers();
|
||||
|
||||
/**
|
||||
* Get the location of meta regions.
|
||||
*/
|
||||
List<HRegionLocation> getMetaLocations();
|
||||
}
|
|
@ -24,48 +24,42 @@ import java.util.concurrent.CompletableFuture;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Connection registry implementation for region server.
|
||||
* A {@link ConnectionRegistry} implementation used at server side, where we could use the
|
||||
* {@link ConnectionRegistryEndpoint} directly, without any rpcs.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RegionServerRegistry implements ConnectionRegistry {
|
||||
class ShortCircuitConnectionRegistry implements ConnectionRegistry {
|
||||
|
||||
private final HRegionServer regionServer;
|
||||
private final ConnectionRegistryEndpoint endpoint;
|
||||
|
||||
public RegionServerRegistry(HRegionServer regionServer) {
|
||||
this.regionServer = regionServer;
|
||||
public ShortCircuitConnectionRegistry(ConnectionRegistryEndpoint endpoint) {
|
||||
this.endpoint = endpoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
|
||||
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
|
||||
Optional<List<HRegionLocation>> locs =
|
||||
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
|
||||
if (locs.isPresent()) {
|
||||
List<HRegionLocation> list = locs.get();
|
||||
if (list.isEmpty()) {
|
||||
future.completeExceptionally(new IOException("no meta location available"));
|
||||
} else {
|
||||
future.complete(new RegionLocations(list));
|
||||
}
|
||||
} else {
|
||||
List<HRegionLocation> locs = endpoint.getMetaLocations();
|
||||
if (locs.isEmpty()) {
|
||||
future.completeExceptionally(new IOException("no meta location available"));
|
||||
} else {
|
||||
future.complete(new RegionLocations(locs));
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getClusterId() {
|
||||
return CompletableFuture.completedFuture(regionServer.getClusterId());
|
||||
return CompletableFuture.completedFuture(endpoint.getClusterId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ServerName> getActiveMaster() {
|
||||
CompletableFuture<ServerName> future = new CompletableFuture<>();
|
||||
Optional<ServerName> activeMaster = regionServer.getActiveMaster();
|
||||
Optional<ServerName> activeMaster = endpoint.getActiveMaster();
|
||||
if (activeMaster.isPresent()) {
|
||||
future.complete(activeMaster.get());
|
||||
} else {
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_
|
|||
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
|
||||
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.management.MemoryType;
|
||||
|
@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HealthCheckChore;
|
||||
import org.apache.hadoop.hbase.MetaRegionLocationCache;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
|
@ -91,6 +93,7 @@ import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
|||
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
|
@ -247,8 +250,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
|||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
||||
@SuppressWarnings({ "deprecation"})
|
||||
public class HRegionServer extends Thread implements
|
||||
RegionServerServices, LastSequenceId, ConfigurationObserver {
|
||||
public class HRegionServer extends Thread implements RegionServerServices, LastSequenceId,
|
||||
ConnectionRegistryEndpoint, ConfigurationObserver {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
|
||||
|
||||
/**
|
||||
|
@ -887,7 +891,11 @@ public class HRegionServer extends Thread implements
|
|||
*/
|
||||
protected final synchronized void setupClusterConnection() throws IOException {
|
||||
if (asyncClusterConnection == null) {
|
||||
asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(this);
|
||||
InetSocketAddress localAddress =
|
||||
new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0);
|
||||
User user = userProvider.getCurrent();
|
||||
asyncClusterConnection =
|
||||
ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3983,23 +3991,29 @@ public class HRegionServer extends Thread implements
|
|||
return this.retryPauseTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ServerName> getActiveMaster() {
|
||||
return Optional.ofNullable(masterAddressTracker.getMasterAddress());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getBackupMasters() {
|
||||
return masterAddressTracker.getBackupMasters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServerName> getRegionServers() {
|
||||
return regionServerAddressTracker.getRegionServers();
|
||||
}
|
||||
|
||||
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
||||
return this.metaRegionLocationCache;
|
||||
@Override
|
||||
public List<HRegionLocation> getMetaLocations() {
|
||||
return metaRegionLocationCache.getMetaRegionLocations();
|
||||
}
|
||||
|
||||
public UserProvider getUserProvider() {
|
||||
return userProvider;
|
||||
@RestrictedApi(explanation = "Should only be called in tests", link = "",
|
||||
allowedOnPath = ".*/src/test/.*")
|
||||
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
||||
return metaRegionLocationCache;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
|
@ -4119,10 +4118,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
|
||||
GetMetaRegionLocationsRequest request) throws ServiceException {
|
||||
GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder();
|
||||
Optional<List<HRegionLocation>> metaLocations =
|
||||
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
|
||||
metaLocations.ifPresent(hRegionLocations -> hRegionLocations
|
||||
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
|
||||
regionServer.getMetaLocations()
|
||||
.forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -112,8 +112,7 @@ public class TestMasterRegistry {
|
|||
public void testRegistryRPCs() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
final int size =
|
||||
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
|
||||
final int size = activeMaster.getMetaLocations().size();
|
||||
for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
|
@ -124,8 +123,7 @@ public class TestMasterRegistry {
|
|||
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
|
||||
List<HRegionLocation> metaLocations =
|
||||
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
|
||||
List<HRegionLocation> actualMetaLocations =
|
||||
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
|
||||
Collections.sort(metaLocations);
|
||||
Collections.sort(actualMetaLocations);
|
||||
assertEquals(actualMetaLocations, metaLocations);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -88,7 +89,7 @@ public class TestMetaRegionLocationCache {
|
|||
private void verifyCachedMetaLocations(HMaster master) throws Exception {
|
||||
// Wait until initial meta locations are loaded.
|
||||
int retries = 0;
|
||||
while (!master.getMetaRegionLocationCache().getMetaRegionLocations().isPresent()) {
|
||||
while (master.getMetaRegionLocationCache().getMetaRegionLocations().isEmpty()) {
|
||||
Thread.sleep(1000);
|
||||
if (++retries == 10) {
|
||||
break;
|
||||
|
@ -98,15 +99,14 @@ public class TestMetaRegionLocationCache {
|
|||
List<String> metaZnodes = zk.getMetaReplicaNodes();
|
||||
// Wait till all replicas available.
|
||||
retries = 0;
|
||||
while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
|
||||
while (master.getMetaRegionLocationCache().getMetaRegionLocations().size() != metaZnodes
|
||||
.size()) {
|
||||
Thread.sleep(1000);
|
||||
if (++retries == 10) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
List<HRegionLocation> metaHRLs =
|
||||
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
List<HRegionLocation> metaHRLs = master.getMetaRegionLocationCache().getMetaRegionLocations();
|
||||
assertFalse(metaHRLs.isEmpty());
|
||||
assertEquals(metaZnodes.size(), metaHRLs.size());
|
||||
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
|
||||
|
@ -167,7 +167,7 @@ public class TestMetaRegionLocationCache {
|
|||
try {
|
||||
MetaRegionLocationCache metaCache = new MetaRegionLocationCache(zkWatcher);
|
||||
// meta znodes do not exist at this point, cache should be empty.
|
||||
assertFalse(metaCache.getMetaRegionLocations().isPresent());
|
||||
assertTrue(metaCache.getMetaRegionLocations().isEmpty());
|
||||
// Set the meta locations for a random meta replicas, simulating an active hmaster meta
|
||||
// assignment.
|
||||
for (int i = 0; i < 3; i++) {
|
||||
|
@ -177,13 +177,12 @@ public class TestMetaRegionLocationCache {
|
|||
// Wait until the meta cache is populated.
|
||||
int iters = 0;
|
||||
while (iters++ < 10) {
|
||||
if (metaCache.getMetaRegionLocations().isPresent()
|
||||
&& metaCache.getMetaRegionLocations().get().size() == 3) {
|
||||
if (metaCache.getMetaRegionLocations().size() == 3) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations().get();
|
||||
List<HRegionLocation> metaLocations = metaCache.getMetaRegionLocations();
|
||||
assertEquals(3, metaLocations.size());
|
||||
for (HRegionLocation location : metaLocations) {
|
||||
assertEquals(sn, location.getServerName());
|
||||
|
|
|
@ -105,8 +105,7 @@ public class TestRpcConnectionRegistry {
|
|||
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
|
||||
List<HRegionLocation> metaLocations =
|
||||
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
|
||||
List<HRegionLocation> actualMetaLocations =
|
||||
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaLocations();
|
||||
Collections.sort(metaLocations);
|
||||
Collections.sort(actualMetaLocations);
|
||||
assertEquals(actualMetaLocations, metaLocations);
|
||||
|
|
|
@ -144,8 +144,8 @@ public class TestClientMetaServiceRPCs {
|
|||
*/
|
||||
@Test public void TestMetaLocations() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
List<HRegionLocation> metaLocations = TEST_UTIL.getMiniHBaseCluster().getMaster()
|
||||
.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
List<HRegionLocation> metaLocations =
|
||||
TEST_UTIL.getMiniHBaseCluster().getMaster().getMetaLocations();
|
||||
Collections.sort(metaLocations);
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
|
|
Loading…
Reference in New Issue