Revert "HBASE-26306 Backport "HBASE-26220 Use P2P communicate between region servers to sync the list for bootstrap node" to branch-2 (#3727)"
This reverts commit 10584d70d2.
See HBASE-26937 for discussion.
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Conflicts:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
This commit is contained in:
parent
9019e6d202
commit
abb1bf2617
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||||
|
@ -325,17 +324,6 @@ public interface ClusterConnection extends Connection {
|
||||||
*/
|
*/
|
||||||
boolean hasCellBlockSupport();
|
boolean hasCellBlockSupport();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get live region servers from masters.
|
|
||||||
*/
|
|
||||||
List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, int count)
|
|
||||||
throws IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the bootstrap node list of another region server.
|
|
||||||
*/
|
|
||||||
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the {@link User} associated with this connection. May be {@code null}.
|
* Get the {@link User} associated with this connection. May be {@code null}.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -50,8 +50,6 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Supplier;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.AuthUtil;
|
import org.apache.hadoop.hbase.AuthUtil;
|
||||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||||
|
@ -109,9 +107,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.Get
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
|
@ -146,9 +141,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaSta
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
|
||||||
|
@ -2216,36 +2208,4 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, int count)
|
|
||||||
throws IOException {
|
|
||||||
RegionServerStatusService.BlockingInterface stub = RegionServerStatusService.newBlockingStub(
|
|
||||||
rpcClient.createBlockingRpcChannel(masterAddrTracker.get(), user, rpcTimeout));
|
|
||||||
GetLiveRegionServersResponse resp;
|
|
||||||
try {
|
|
||||||
resp = stub.getLiveRegionServers(null,
|
|
||||||
GetLiveRegionServersRequest.newBuilder().setCount(count).build());
|
|
||||||
} catch (ServiceException e) {
|
|
||||||
Throwable t = ConnectionUtils.translateException(e);
|
|
||||||
Throwables.propagateIfPossible(t, IOException.class);
|
|
||||||
throw new IOException(t);
|
|
||||||
}
|
|
||||||
return resp.getServerList().stream().map(ProtobufUtil::toServerName)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException {
|
|
||||||
BootstrapNodeService.BlockingInterface stub = BootstrapNodeService
|
|
||||||
.newBlockingStub(rpcClient.createBlockingRpcChannel(regionServer, user, rpcTimeout));
|
|
||||||
GetAllBootstrapNodesResponse resp;
|
|
||||||
try {
|
|
||||||
resp = stub.getAllBootstrapNodes(null, GetAllBootstrapNodesRequest.getDefaultInstance());
|
|
||||||
} catch (ServiceException e) {
|
|
||||||
Throwable t = ConnectionUtils.translateException(e);
|
|
||||||
Throwables.propagateIfPossible(t, IOException.class);
|
|
||||||
throw new IOException(t);
|
|
||||||
}
|
|
||||||
return resp.getNodeList().stream().map(ProtobufUtil::toServerName).collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,41 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
syntax = "proto2";
|
|
||||||
|
|
||||||
// The protos for exchange bootstrap nodes between region servers.
|
|
||||||
package hbase.pb;
|
|
||||||
|
|
||||||
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
|
|
||||||
option java_outer_classname = "BootstrapNodeProtos";
|
|
||||||
option java_generic_services = true;
|
|
||||||
option java_generate_equals_and_hash = true;
|
|
||||||
option optimize_for = SPEED;
|
|
||||||
|
|
||||||
import "HBase.proto";
|
|
||||||
|
|
||||||
message GetAllBootstrapNodesRequest {
|
|
||||||
}
|
|
||||||
|
|
||||||
message GetAllBootstrapNodesResponse {
|
|
||||||
repeated ServerName node = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
service BootstrapNodeService {
|
|
||||||
rpc GetAllBootstrapNodes(GetAllBootstrapNodesRequest)
|
|
||||||
returns(GetAllBootstrapNodesResponse);
|
|
||||||
}
|
|
|
@ -175,15 +175,6 @@ message FileArchiveNotificationRequest {
|
||||||
message FileArchiveNotificationResponse {
|
message FileArchiveNotificationResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetLiveRegionServersRequest {
|
|
||||||
required uint32 count = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message GetLiveRegionServersResponse {
|
|
||||||
repeated ServerName server = 1;
|
|
||||||
required uint32 total = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
service RegionServerStatusService {
|
service RegionServerStatusService {
|
||||||
/** Called when a region server first starts. */
|
/** Called when a region server first starts. */
|
||||||
rpc RegionServerStartup(RegionServerStartupRequest)
|
rpc RegionServerStartup(RegionServerStartupRequest)
|
||||||
|
@ -226,8 +217,4 @@ service RegionServerStatusService {
|
||||||
/** Reports files that were moved to the archive directory for space quotas */
|
/** Reports files that were moved to the archive directory for space quotas */
|
||||||
rpc ReportFileArchival(FileArchiveNotificationRequest)
|
rpc ReportFileArchival(FileArchiveNotificationRequest)
|
||||||
returns(FileArchiveNotificationResponse);
|
returns(FileArchiveNotificationResponse);
|
||||||
|
|
||||||
/** Get some live region servers to be used as seed for bootstrap nodes */
|
|
||||||
rpc GetLiveRegionServers(GetLiveRegionServersRequest)
|
|
||||||
returns(GetLiveRegionServersResponse);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2891,7 +2891,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<ServerName> getBootstrapNodes() {
|
public Iterator<ServerName> getRegionServers() {
|
||||||
return regionServerTracker.getRegionServers().iterator();
|
return regionServerTracker.getRegionServers().iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -123,6 +122,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -330,8 +331,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
|
||||||
|
@ -3076,15 +3075,4 @@ public class MasterRpcServices extends RSRpcServices implements
|
||||||
.addAllBalancerRejection(balancerRejections).build();
|
.addAllBalancerRejection(balancerRejections).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetLiveRegionServersResponse getLiveRegionServers(RpcController controller,
|
|
||||||
GetLiveRegionServersRequest request) throws ServiceException {
|
|
||||||
List<ServerName> regionServers = new ArrayList<>(master.getLiveRegionServers());
|
|
||||||
Collections.shuffle(regionServers, ThreadLocalRandom.current());
|
|
||||||
GetLiveRegionServersResponse.Builder builder =
|
|
||||||
GetLiveRegionServersResponse.newBuilder().setTotal(regionServers.size());
|
|
||||||
regionServers.stream().limit(request.getCount()).map(ProtobufUtil::toServerName)
|
|
||||||
.forEach(builder::addServer);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,216 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
|
||||||
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
|
|
||||||
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
|
|
||||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Manage the bootstrap node list at region server side.
|
|
||||||
* <p/>
|
|
||||||
* It will request master first to get the initial set of bootstrap nodes(a sub set of live region
|
|
||||||
* servers), and then it will exchange the bootstrap nodes with other bootstrap nodes. In most
|
|
||||||
* cases, if the cluster is stable, we do not need to request master again until we reach the
|
|
||||||
* request master interval. And if the current number of bootstrap nodes is not enough, we will
|
|
||||||
* request master soon.
|
|
||||||
* <p/>
|
|
||||||
* The algorithm is very simple, as we will always fallback to request master. THe trick here is
|
|
||||||
* that, if we can not get enough bootstrap nodes from master, then the cluster will be small, so it
|
|
||||||
* will not put too much pressure on master if we always request master. And for large clusters, we
|
|
||||||
* will soon get enough bootstrap nodes and stop requesting master.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class BootstrapNodeManager {
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(BootstrapNodeManager.class);
|
|
||||||
|
|
||||||
public static final String REQUEST_MASTER_INTERVAL_SECS =
|
|
||||||
"hbase.server.bootstrap.request_master_interval.secs";
|
|
||||||
|
|
||||||
// default request every 10 minutes
|
|
||||||
public static final long DEFAULT_REQUEST_MASTER_INTERVAL_SECS = TimeUnit.MINUTES.toSeconds(10);
|
|
||||||
|
|
||||||
public static final String REQUEST_MASTER_MIN_INTERVAL_SECS =
|
|
||||||
"hbase.server.bootstrap.request_master_min_interval.secs";
|
|
||||||
|
|
||||||
// default 30 seconds
|
|
||||||
public static final long DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS = 30;
|
|
||||||
|
|
||||||
public static final String REQUEST_REGIONSERVER_INTERVAL_SECS =
|
|
||||||
"hbase.server.bootstrap.request_regionserver_interval.secs";
|
|
||||||
|
|
||||||
// default request every 30 seconds
|
|
||||||
public static final long DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS = 30;
|
|
||||||
|
|
||||||
private static final float JITTER = 0.2f;
|
|
||||||
|
|
||||||
private volatile List<ServerName> nodes = Collections.emptyList();
|
|
||||||
|
|
||||||
private final ClusterConnection conn;
|
|
||||||
|
|
||||||
private final MasterAddressTracker masterAddrTracker;
|
|
||||||
|
|
||||||
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
|
|
||||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(getClass().getSimpleName()).build());
|
|
||||||
|
|
||||||
private final long requestMasterIntervalSecs;
|
|
||||||
|
|
||||||
private final long requestMasterMinIntervalSecs;
|
|
||||||
|
|
||||||
private final long requestRegionServerIntervalSecs;
|
|
||||||
|
|
||||||
private final int maxNodeCount;
|
|
||||||
|
|
||||||
private final RetryCounterFactory retryCounterFactory;
|
|
||||||
|
|
||||||
private RetryCounter retryCounter;
|
|
||||||
|
|
||||||
private long lastRequestMasterTime;
|
|
||||||
|
|
||||||
public BootstrapNodeManager(ClusterConnection conn, MasterAddressTracker masterAddrTracker) {
|
|
||||||
this.conn = conn;
|
|
||||||
this.masterAddrTracker = masterAddrTracker;
|
|
||||||
Configuration conf = conn.getConfiguration();
|
|
||||||
requestMasterIntervalSecs =
|
|
||||||
conf.getLong(REQUEST_MASTER_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_INTERVAL_SECS);
|
|
||||||
requestMasterMinIntervalSecs =
|
|
||||||
conf.getLong(REQUEST_MASTER_MIN_INTERVAL_SECS, DEFAULT_REQUEST_MASTER_MIN_INTERVAL_SECS);
|
|
||||||
requestRegionServerIntervalSecs =
|
|
||||||
conf.getLong(REQUEST_REGIONSERVER_INTERVAL_SECS, DEFAULT_REQUEST_REGIONSERVER_INTERVAL_SECS);
|
|
||||||
maxNodeCount = conf.getInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT,
|
|
||||||
RSRpcServices.DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
|
|
||||||
retryCounterFactory = new RetryCounterFactory(
|
|
||||||
new RetryConfig().setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()).setJitter(JITTER)
|
|
||||||
.setSleepInterval(requestMasterMinIntervalSecs).setMaxSleepTime(requestMasterIntervalSecs)
|
|
||||||
.setTimeUnit(TimeUnit.SECONDS));
|
|
||||||
executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
private long getDelay(long delay) {
|
|
||||||
long jitterDelay = (long) (delay * ThreadLocalRandom.current().nextFloat() * JITTER);
|
|
||||||
return delay + jitterDelay;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void getFromMaster() {
|
|
||||||
List<ServerName> liveRegionServers;
|
|
||||||
try {
|
|
||||||
// get 2 times number of node
|
|
||||||
liveRegionServers =
|
|
||||||
conn.getLiveRegionServers(() -> masterAddrTracker.getMasterAddress(), maxNodeCount * 2);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("failed to get live region servers from master", e);
|
|
||||||
if (retryCounter == null) {
|
|
||||||
retryCounter = retryCounterFactory.create();
|
|
||||||
}
|
|
||||||
executor.schedule(this::getFromMaster, retryCounter.getBackoffTimeAndIncrementAttempts(),
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
retryCounter = null;
|
|
||||||
lastRequestMasterTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
this.nodes = Collections.unmodifiableList(liveRegionServers);
|
|
||||||
if (liveRegionServers.size() < maxNodeCount) {
|
|
||||||
// If the number of live region servers is small, it means the cluster is small, so requesting
|
|
||||||
// master with a higher frequency will not be a big problem, so here we will always request
|
|
||||||
// master to get the live region servers as bootstrap nodes.
|
|
||||||
executor.schedule(this::getFromMaster, getDelay(requestMasterMinIntervalSecs),
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// schedule tasks to exchange the bootstrap nodes with other region servers.
|
|
||||||
executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
// this method is also used to test whether a given region server is still alive.
|
|
||||||
private void getFromRegionServer() {
|
|
||||||
if (EnvironmentEdgeManager.currentTime() - lastRequestMasterTime >= TimeUnit.SECONDS
|
|
||||||
.toMillis(requestMasterIntervalSecs)) {
|
|
||||||
// schedule a get from master task immediately if haven't request master for more than
|
|
||||||
// requestMasterIntervalSecs
|
|
||||||
executor.execute(this::getFromMaster);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<ServerName> currentList = this.nodes;
|
|
||||||
ServerName peer = currentList.get(ThreadLocalRandom.current().nextInt(currentList.size()));
|
|
||||||
List<ServerName> otherList;
|
|
||||||
try {
|
|
||||||
otherList = conn.getAllBootstrapNodes(peer);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("failed to request region server {}", peer, e);
|
|
||||||
// remove this region server from the list since it can not respond successfully
|
|
||||||
List<ServerName> newList = currentList.stream().filter(sn -> sn != peer)
|
|
||||||
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
|
|
||||||
this.nodes = newList;
|
|
||||||
if (newList.size() < maxNodeCount) {
|
|
||||||
// schedule a get from master task immediately
|
|
||||||
executor.execute(this::getFromMaster);
|
|
||||||
} else {
|
|
||||||
executor.schedule(this::getFromRegionServer, getDelay(requestRegionServerIntervalSecs),
|
|
||||||
TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// randomly select new live region server list
|
|
||||||
Set<ServerName> newRegionServers = new HashSet<ServerName>(currentList);
|
|
||||||
newRegionServers.addAll(otherList);
|
|
||||||
List<ServerName> newList = new ArrayList<ServerName>(newRegionServers);
|
|
||||||
Collections.shuffle(newList, ThreadLocalRandom.current());
|
|
||||||
int expectedListSize = maxNodeCount * 2;
|
|
||||||
if (newList.size() <= expectedListSize) {
|
|
||||||
this.nodes = Collections.unmodifiableList(newList);
|
|
||||||
} else {
|
|
||||||
this.nodes =
|
|
||||||
Collections.unmodifiableList(new ArrayList<>(newList.subList(0, expectedListSize)));
|
|
||||||
}
|
|
||||||
// schedule a new get from region server task
|
|
||||||
executor.schedule(this::getFromRegionServer, requestRegionServerIntervalSecs, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void stop() {
|
|
||||||
executor.shutdownNow();
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<ServerName> getBootstrapNodes() {
|
|
||||||
return nodes;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -184,6 +184,7 @@ import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
|
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
|
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
|
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
|
||||||
|
@ -434,6 +435,10 @@ public class HRegionServer extends Thread implements
|
||||||
* entries. Used for serving ClientMetaService.
|
* entries. Used for serving ClientMetaService.
|
||||||
*/
|
*/
|
||||||
private final MetaRegionLocationCache metaRegionLocationCache;
|
private final MetaRegionLocationCache metaRegionLocationCache;
|
||||||
|
/**
|
||||||
|
* Cache for all the region servers in the cluster. Used for serving ClientMetaService.
|
||||||
|
*/
|
||||||
|
private final RegionServerAddressTracker regionServerAddressTracker;
|
||||||
|
|
||||||
// Cluster Status Tracker
|
// Cluster Status Tracker
|
||||||
protected final ClusterStatusTracker clusterStatusTracker;
|
protected final ClusterStatusTracker clusterStatusTracker;
|
||||||
|
@ -567,8 +572,6 @@ public class HRegionServer extends Thread implements
|
||||||
*/
|
*/
|
||||||
private NamedQueueRecorder namedQueueRecorder = null;
|
private NamedQueueRecorder namedQueueRecorder = null;
|
||||||
|
|
||||||
private BootstrapNodeManager bootstrapNodeManager;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* True if this RegionServer is coming up in a cluster where there is no Master;
|
* True if this RegionServer is coming up in a cluster where there is no Master;
|
||||||
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
|
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
|
||||||
|
@ -690,6 +693,12 @@ public class HRegionServer extends Thread implements
|
||||||
}
|
}
|
||||||
this.rpcServices.start(zooKeeper);
|
this.rpcServices.start(zooKeeper);
|
||||||
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
|
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
|
||||||
|
if (!(this instanceof HMaster)) {
|
||||||
|
// do not create this field for HMaster, we have another region server tracker for HMaster.
|
||||||
|
this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
|
||||||
|
} else {
|
||||||
|
this.regionServerAddressTracker = null;
|
||||||
|
}
|
||||||
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
|
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
|
||||||
// and executor being created and takes a different startup route. Lots of overlap between HRS
|
// and executor being created and takes a different startup route. Lots of overlap between HRS
|
||||||
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
|
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
|
||||||
|
@ -924,9 +933,6 @@ public class HRegionServer extends Thread implements
|
||||||
try {
|
try {
|
||||||
initializeZooKeeper();
|
initializeZooKeeper();
|
||||||
setupClusterConnection();
|
setupClusterConnection();
|
||||||
if (!(this instanceof HMaster)) {
|
|
||||||
bootstrapNodeManager = new BootstrapNodeManager(clusterConnection, masterAddressTracker);
|
|
||||||
}
|
|
||||||
// Setup RPC client for master communication
|
// Setup RPC client for master communication
|
||||||
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
|
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
|
||||||
this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
|
this.rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
|
||||||
|
@ -2762,9 +2768,7 @@ public class HRegionServer extends Thread implements
|
||||||
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
|
// TODO: cancel will not cleanup the chores, so we need make sure we do not miss any
|
||||||
choreService.shutdown();
|
choreService.shutdown();
|
||||||
}
|
}
|
||||||
if (bootstrapNodeManager != null) {
|
|
||||||
bootstrapNodeManager.stop();
|
|
||||||
}
|
|
||||||
if (this.cacheFlusher != null) {
|
if (this.cacheFlusher != null) {
|
||||||
this.cacheFlusher.join();
|
this.cacheFlusher.join();
|
||||||
}
|
}
|
||||||
|
@ -4063,8 +4067,8 @@ public class HRegionServer extends Thread implements
|
||||||
return masterAddressTracker.getBackupMasters();
|
return masterAddressTracker.getBackupMasters();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Iterator<ServerName> getBootstrapNodes() {
|
public Iterator<ServerName> getRegionServers() {
|
||||||
return bootstrapNodeManager.getBootstrapNodes().iterator();
|
return regionServerAddressTracker.getRegionServers().iterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
public MetaRegionLocationCache getMetaRegionLocationCache() {
|
||||||
|
|
|
@ -219,9 +219,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
|
||||||
|
@ -284,9 +281,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface,
|
public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
ClientService.BlockingInterface, ClientMetaService.BlockingInterface,
|
AdminService.BlockingInterface, ClientService.BlockingInterface,
|
||||||
BootstrapNodeService.BlockingInterface, PriorityFunction, ConfigurationObserver {
|
ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
|
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
|
||||||
|
|
||||||
/** RPC scheduler to use for the region server. */
|
/** RPC scheduler to use for the region server. */
|
||||||
|
@ -4112,20 +4109,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
||||||
int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
|
int maxNodeCount = regionServer.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT,
|
||||||
DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
|
DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT);
|
||||||
ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
|
ReservoirSample<ServerName> sample = new ReservoirSample<>(maxNodeCount);
|
||||||
sample.add(regionServer.getBootstrapNodes());
|
sample.add(regionServer.getRegionServers());
|
||||||
|
|
||||||
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
|
GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder();
|
||||||
sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
|
sample.getSamplingResult().stream().map(ProtobufUtil::toServerName)
|
||||||
.forEach(builder::addServerName);
|
.forEach(builder::addServerName);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetAllBootstrapNodesResponse getAllBootstrapNodes(RpcController controller,
|
|
||||||
GetAllBootstrapNodesRequest request) throws ServiceException {
|
|
||||||
GetAllBootstrapNodesResponse.Builder builder = GetAllBootstrapNodesResponse.newBuilder();
|
|
||||||
regionServer.getBootstrapNodes()
|
|
||||||
.forEachRemaining(server -> builder.addNode(ProtobufUtil.toServerName(server)));
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -62,7 +61,6 @@ public class TestRpcConnectionRegistry {
|
||||||
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.INITIAL_REFRESH_DELAY_SECS, 1);
|
||||||
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
|
||||||
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
|
UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
|
||||||
UTIL.getConfiguration().setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
|
|
||||||
UTIL.startMiniCluster(3);
|
UTIL.startMiniCluster(3);
|
||||||
HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
|
HBaseTestingUtility.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,168 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.hasItems;
|
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyInt;
|
|
||||||
import static org.mockito.Mockito.atLeast;
|
|
||||||
import static org.mockito.Mockito.atLeastOnce;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
|
||||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
@Category({ RegionServerTests.class, SmallTests.class })
|
|
||||||
public class TestBootstrapNodeManager {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestBootstrapNodeManager.class);
|
|
||||||
|
|
||||||
private Configuration conf;
|
|
||||||
|
|
||||||
private ClusterConnection conn;
|
|
||||||
|
|
||||||
private MasterAddressTracker tracker;
|
|
||||||
|
|
||||||
private BootstrapNodeManager manager;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() {
|
|
||||||
conf = HBaseConfiguration.create();
|
|
||||||
conf.setLong(BootstrapNodeManager.REQUEST_MASTER_INTERVAL_SECS, 5);
|
|
||||||
conf.setLong(BootstrapNodeManager.REQUEST_MASTER_MIN_INTERVAL_SECS, 1);
|
|
||||||
conf.setLong(BootstrapNodeManager.REQUEST_REGIONSERVER_INTERVAL_SECS, 1);
|
|
||||||
conf.setInt(RSRpcServices.CLIENT_BOOTSTRAP_NODE_LIMIT, 2);
|
|
||||||
conn = mock(ClusterConnection.class);
|
|
||||||
when(conn.getConfiguration()).thenReturn(conf);
|
|
||||||
tracker = mock(MasterAddressTracker.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() {
|
|
||||||
if (manager != null) {
|
|
||||||
manager.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertListEquals(List<ServerName> expected, List<ServerName> actual) {
|
|
||||||
assertEquals(expected.size(), expected.size());
|
|
||||||
assertThat(actual, hasItems(expected.toArray(new ServerName[0])));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNormal() throws Exception {
|
|
||||||
List<ServerName> regionServers =
|
|
||||||
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
|
|
||||||
when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
|
|
||||||
when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers);
|
|
||||||
manager = new BootstrapNodeManager(conn, tracker);
|
|
||||||
Thread.sleep(3000);
|
|
||||||
verify(conn, times(1)).getLiveRegionServers(any(), anyInt());
|
|
||||||
verify(conn, atLeastOnce()).getAllBootstrapNodes(any());
|
|
||||||
assertListEquals(regionServers, manager.getBootstrapNodes());
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we do not return enough region servers, we will always get from master
|
|
||||||
@Test
|
|
||||||
public void testOnlyMaster() throws Exception {
|
|
||||||
List<ServerName> regionServers =
|
|
||||||
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()));
|
|
||||||
when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
|
|
||||||
when(conn.getAllBootstrapNodes(any())).thenReturn(regionServers);
|
|
||||||
manager = new BootstrapNodeManager(conn, tracker);
|
|
||||||
Thread.sleep(3000);
|
|
||||||
verify(conn, atLeast(2)).getLiveRegionServers(any(), anyInt());
|
|
||||||
verify(conn, never()).getAllBootstrapNodes(any());
|
|
||||||
assertListEquals(regionServers, manager.getBootstrapNodes());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRegionServerError() throws Exception {
|
|
||||||
List<ServerName> regionServers =
|
|
||||||
Arrays.asList(ServerName.valueOf("server1", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server2", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server3", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server4", 12345, EnvironmentEdgeManager.currentTime()));
|
|
||||||
List<ServerName> newRegionServers =
|
|
||||||
Arrays.asList(ServerName.valueOf("server5", 12345, EnvironmentEdgeManager.currentTime()),
|
|
||||||
ServerName.valueOf("server6", 12345, EnvironmentEdgeManager.currentTime()));
|
|
||||||
when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(regionServers);
|
|
||||||
when(conn.getAllBootstrapNodes(any())).thenAnswer(invocation -> {
|
|
||||||
if (invocation.getArgument(0, ServerName.class).getHostname().equals("server4")) {
|
|
||||||
throw new IOException("Inject error");
|
|
||||||
} else {
|
|
||||||
return regionServers.subList(0, 3);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
manager = new BootstrapNodeManager(conn, tracker);
|
|
||||||
// we should remove server4 from the list
|
|
||||||
Waiter.waitFor(conf, 30000, () -> manager.getBootstrapNodes().size() == 3);
|
|
||||||
assertListEquals(regionServers.subList(0, 3), manager.getBootstrapNodes());
|
|
||||||
when(conn.getLiveRegionServers(any(), anyInt())).thenReturn(newRegionServers);
|
|
||||||
doAnswer(invocation -> {
|
|
||||||
String hostname = invocation.getArgument(0, ServerName.class).getHostname();
|
|
||||||
switch (hostname) {
|
|
||||||
case "server1":
|
|
||||||
return regionServers.subList(0, 1);
|
|
||||||
case "server2":
|
|
||||||
case "server3":
|
|
||||||
throw new IOException("Inject error");
|
|
||||||
default:
|
|
||||||
return newRegionServers;
|
|
||||||
}
|
|
||||||
}).when(conn).getAllBootstrapNodes(any());
|
|
||||||
// we should remove server2, server3 from the list and then get the new list from master again
|
|
||||||
Waiter.waitFor(conf, 30000, () -> {
|
|
||||||
List<ServerName> bootstrapNodes = manager.getBootstrapNodes();
|
|
||||||
if (bootstrapNodes.size() != 2) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
String hostname = bootstrapNodes.get(0).getHostname();
|
|
||||||
return hostname.equals("server5") || hostname.equals("server6");
|
|
||||||
});
|
|
||||||
assertListEquals(newRegionServers, manager.getBootstrapNodes());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for tracking the region servers for a cluster.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class RegionServerAddressTracker extends ZKListener {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);
|
||||||
|
|
||||||
|
private volatile List<ServerName> regionServers = Collections.emptyList();
|
||||||
|
|
||||||
|
private final Abortable abortable;
|
||||||
|
|
||||||
|
public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
|
||||||
|
super(watcher);
|
||||||
|
this.abortable = abortable;
|
||||||
|
watcher.registerListener(this);
|
||||||
|
loadRegionServerList();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadRegionServerList() {
|
||||||
|
List<String> names;
|
||||||
|
try {
|
||||||
|
names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
LOG.error("failed to list region servers", e);
|
||||||
|
abortable.abort("failed to list region servers", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (CollectionUtils.isEmpty(names)) {
|
||||||
|
regionServers = Collections.emptyList();
|
||||||
|
} else {
|
||||||
|
regionServers = names.stream().map(ServerName::parseServerName)
|
||||||
|
.collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void nodeChildrenChanged(String path) {
|
||||||
|
if (path.equals(watcher.getZNodePaths().rsZNode)) {
|
||||||
|
loadRegionServerList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ServerName> getRegionServers() {
|
||||||
|
return regionServers;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.hasItems;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ZKTests;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
|
@Category({ ZKTests.class, MediumTests.class })
|
||||||
|
public class TestRegionServerAddressTracker {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);
|
||||||
|
|
||||||
|
private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility();
|
||||||
|
|
||||||
|
private ZKWatcher zk;
|
||||||
|
|
||||||
|
private RegionServerAddressTracker tracker;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.startMiniZKCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniZKCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
|
||||||
|
TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName());
|
||||||
|
zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null);
|
||||||
|
ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
|
||||||
|
tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
Closeables.close(zk, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws KeeperException {
|
||||||
|
ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
|
||||||
|
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
|
||||||
|
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
|
||||||
|
assertEquals(rs1, tracker.getRegionServers().get(0));
|
||||||
|
|
||||||
|
ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
|
||||||
|
ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
|
||||||
|
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
|
||||||
|
assertThat(tracker.getRegionServers(), hasItems(rs1, rs2));
|
||||||
|
|
||||||
|
ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString()));
|
||||||
|
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
|
||||||
|
assertEquals(rs2, tracker.getRegionServers().get(0));
|
||||||
|
|
||||||
|
ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString()));
|
||||||
|
TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class WarnOnlyAbortable implements Abortable {
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue