HBASE-23230 Enforce member visibility in HRegionServer (#775)

* Clean up a bunch of private variable leakage into other
   classes. Reduces visibility as much as possible, providing getters
   where access remains necessary or making use of getters that
   already exist. There remains an insidious relationship between
   `HRegionServer` and `RSRpcServices`.
 * Rename `fs` to `dataFs`, `rootDir` as `dataRootDir` so as to
   distinguish from the new `walFs`, `walRootDir` (and make it easier
   to spot bugs).
 * Cleanup or delete a bunch of lack-luster javadoc comments.
 * Delete a handful of methods that are unused according to static
   analysis.
 * Reduces the warning count as reported by IntelliJ from 100 to 7.

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Nick Dimiduk 2019-11-07 15:27:02 -08:00 committed by GitHub
parent f58bd4a7ac
commit 12c19a6e51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 446 additions and 558 deletions

View File

@ -133,7 +133,7 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
<section>
<h2>Server Metrics</h2>
<& ServerMetricsTmpl; mWrap = regionServer.getRegionServerMetrics().getRegionServerWrapper();
<& ServerMetricsTmpl; mWrap = regionServer.getMetrics().getRegionServerWrapper();
mServerWrap = regionServer.getRpcServer().getMetrics().getHBaseServerWrapper();
bbAllocator = regionServer.getRpcServer().getByteBuffAllocator(); &>
</section>

View File

@ -89,8 +89,7 @@ public interface Server extends Abortable, Stoppable {
/**
* @return Return the FileSystem object used (can return null!).
*/
// TODO: On Master, return Master's. On RegionServer, return RegionServers. The FileSystems
// may differ. TODO.
// TODO: Distinguish between "dataFs" and "walFs".
default FileSystem getFileSystem() {
// This default is pretty dodgy!
Configuration c = getConfiguration();

View File

@ -522,8 +522,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
LOG.info("hbase.rootdir=" + getRootDir() +
", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
// Disable usage of meta replicas in the master
this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
@ -3474,7 +3474,7 @@ public class HMaster extends HRegionServer implements MasterServices {
*/
public void requestMobCompaction(TableName tableName,
List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException {
mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, allFiles);
mobCompactThread.requestMobCompaction(conf, getFileSystem(), tableName, columns, allFiles);
}
/**

View File

@ -394,9 +394,10 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
throws IOException {
protected RpcServerInterface createRpcServer(final Server server,
final RpcSchedulerFactory rpcSchedulerFactory, final InetSocketAddress bindAddress,
final String name) throws IOException {
final Configuration conf = regionServer.getConfiguration();
// RpcServer at HM by default enable ByteBufferPool iff HM having user table region in it
boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY,
LoadBalancer.isMasterCanHostUserRegions(conf));

View File

@ -53,29 +53,25 @@ import org.slf4j.LoggerFactory;
* sleep time which is invariant.
*/
@InterfaceAudience.Private
public class Leases extends HasThread {
private static final Logger LOG = LoggerFactory.getLogger(Leases.class.getName());
public static final int MIN_WAIT_TIME = 100;
private final Map<String, Lease> leases = new ConcurrentHashMap<>();
public class LeaseManager extends HasThread {
private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName());
private static final int MIN_WAIT_TIME = 100;
protected final int leaseCheckFrequency;
protected volatile boolean stopRequested = false;
private final Map<String, Lease> leases = new ConcurrentHashMap<>();
private final int leaseCheckFrequency;
private volatile boolean stopRequested = false;
/**
* Creates a lease monitor
* Creates a lease manager.
*
* @param leaseCheckFrequency - how often the lease should be checked
* (milliseconds)
* @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
*/
public Leases(final int leaseCheckFrequency) {
super("RegionServerLeases"); // thread name
public LeaseManager(final int leaseCheckFrequency) {
super("RegionServer.LeaseManager"); // thread name
this.leaseCheckFrequency = leaseCheckFrequency;
setDaemon(true);
}
/**
* @see Thread#run()
*/
@Override
public void run() {
long toWait = leaseCheckFrequency;
@ -93,9 +89,7 @@ public class Leases extends HasThread {
toWait = Math.max(MIN_WAIT_TIME, toWait);
Thread.sleep(toWait);
} catch (InterruptedException e) {
continue;
} catch (ConcurrentModificationException e) {
} catch (InterruptedException | ConcurrentModificationException e) {
continue;
} catch (Throwable e) {
LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e);
@ -156,7 +150,6 @@ public class Leases extends HasThread {
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations
* @return The lease created.
* @throws LeaseStillHeldException
*/
public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException {
@ -167,8 +160,6 @@ public class Leases extends HasThread {
/**
* Inserts lease. Resets expiration before insertion.
* @param lease
* @throws LeaseStillHeldException
*/
public void addLease(final Lease lease) throws LeaseStillHeldException {
if (this.stopRequested) {
@ -184,8 +175,7 @@ public class Leases extends HasThread {
/**
* Renew a lease
*
* @param leaseName name of lease
* @throws LeaseException
* @param leaseName name of the lease
*/
public void renewLease(final String leaseName) throws LeaseException {
if (this.stopRequested) {
@ -202,20 +192,17 @@ public class Leases extends HasThread {
/**
* Client explicitly cancels a lease.
*
* @param leaseName name of lease
* @throws org.apache.hadoop.hbase.regionserver.LeaseException
*/
public void cancelLease(final String leaseName) throws LeaseException {
removeLease(leaseName);
}
/**
* Remove named lease.
* Lease is removed from the map of leases.
* Lease can be reinserted using {@link #addLease(Lease)}
* Remove named lease. Lease is removed from the map of leases.
*
* @param leaseName name of lease
* @throws org.apache.hadoop.hbase.regionserver.LeaseException
* @return Removed lease
*/
Lease removeLease(final String leaseName) throws LeaseException {
@ -234,9 +221,6 @@ public class Leases extends HasThread {
public static class LeaseStillHeldException extends IOException {
private final String leaseName;
/**
* @param name
*/
public LeaseStillHeldException(final String name) {
this.leaseName = name;
}

View File

@ -711,7 +711,7 @@ class MemStoreFlusher implements FlushRequester {
try {
flushType = isAboveHighWaterMark();
while (flushType != FlushType.NORMAL && !server.isStopped()) {
server.cacheFlusher.setFlushType(flushType);
server.getMemStoreFlusher().setFlushType(flushType);
if (!blocked) {
startTime = EnvironmentEdgeManager.currentTime();
if (!server.getRegionServerAccounting().isOffheap()) {
@ -769,7 +769,7 @@ class MemStoreFlusher implements FlushRequester {
} else {
flushType = isAboveLowWaterMark();
if (flushType != FlushType.NORMAL) {
server.cacheFlusher.setFlushType(flushType);
server.getMemStoreFlusher().setFlushType(flushType);
wakeupFlushThread();
}
}

View File

@ -130,7 +130,7 @@ class MetricsRegionServerWrapperImpl
initBlockCache();
initMobFileCache();
this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD);
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
@ -266,10 +266,10 @@ class MetricsRegionServerWrapperImpl
@Override
public int getFlushQueueSize() {
//If there is no flusher there should be no queue.
if (this.regionServer.cacheFlusher == null) {
if (this.regionServer.getMemStoreFlusher() == null) {
return 0;
}
return this.regionServer.cacheFlusher.getFlushQueueSize();
return this.regionServer.getMemStoreFlusher().getFlushQueueSize();
}
@Override
@ -550,10 +550,10 @@ class MetricsRegionServerWrapperImpl
@Override
public long getUpdatesBlockedTime() {
if (this.regionServer.cacheFlusher == null) {
if (this.regionServer.getMemStoreFlusher() == null) {
return 0;
}
return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().sum();
return this.regionServer.getMemStoreFlusher().getUpdatesBlockedMsHighWater().sum();
}
@Override
@ -821,8 +821,8 @@ class MetricsRegionServerWrapperImpl
lastRan = currentTime;
WALProvider provider = regionServer.walFactory.getWALProvider();
WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider();
final WALProvider provider = regionServer.getWalFactory().getWALProvider();
final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) +
(metaProvider == null ? 0 : metaProvider.getNumLogFiles());
walFileSize = (provider == null ? 0 : provider.getLogFileSize()) +

View File

@ -48,7 +48,7 @@ public class MetricsTableWrapperAggregateImpl implements MetricsTableWrapperAggr
public MetricsTableWrapperAggregateImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
this.period = regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
this.period = regionServer.getConfiguration().getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD) + 1000;
this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
this.runnable = new TableMetricsWrapperRunnable();

View File

@ -134,11 +134,10 @@ public class RSDumpServlet extends StateDumpServlet {
out.println(hrs.compactSplitThread.dumpQueue());
}
if (hrs.cacheFlusher != null) {
if (hrs.getMemStoreFlusher() != null) {
// 2. Print out flush Queue
out.println("\nFlush Queue summary: "
+ hrs.cacheFlusher.toString());
out.println(hrs.cacheFlusher.dumpQueue());
out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString());
out.println(hrs.getMemStoreFlusher().dumpQueue());
}
}

View File

@ -117,8 +117,8 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.LeaseManager.Lease;
import org.apache.hadoop.hbase.regionserver.LeaseManager.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@ -399,7 +399,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// We're done. On way out re-add the above removed lease. The lease was temp removed for this
// Rpc call and we are at end of the call now. Time to add it back.
if (scanners.containsKey(scannerName)) {
if (lease != null) regionServer.leases.addLease(lease);
if (lease != null) {
regionServer.getLeaseManager().addLease(lease);
}
}
}
}
@ -615,7 +617,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
RowMutations rm = null;
int i = 0;
@ -706,8 +708,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.getCoprocessorHost().postAppend(append, r);
}
}
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateAppend(
if (regionServer.getMetrics() != null) {
regionServer.getMetrics().updateAppend(
region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
@ -716,11 +718,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Execute an increment mutation.
*
* @param region
* @param mutation
* @return the Result
* @throws IOException
*/
private Result increment(final HRegion region, final OperationQuota quota,
final MutationProto mutation, final CellScanner cells, long nonceGroup,
@ -758,8 +755,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.getCoprocessorHost().postIncrement(increment, r);
}
}
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateIncrement(
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateIncrement(
region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
@ -851,8 +849,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.get(get);
}
} finally {
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateGet(
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateGet(
region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
@ -1031,7 +1030,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
// HBASE-17924
@ -1089,15 +1088,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private void updateMutationMetrics(HRegion region, long starttime, boolean batchContainsPuts,
boolean batchContainsDelete) {
if (regionServer.metricsRegionServer != null) {
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
if (batchContainsPuts) {
regionServer.metricsRegionServer
.updatePutBatch(region.getTableDescriptor().getTableName(), after - starttime);
metricsRegionServer.updatePutBatch(
region.getTableDescriptor().getTableName(), after - starttime);
}
if (batchContainsDelete) {
regionServer.metricsRegionServer
.updateDeleteBatch(region.getTableDescriptor().getTableName(), after - starttime);
metricsRegionServer.updateDeleteBatch(
region.getTableDescriptor().getTableName(), after - starttime);
}
}
}
@ -1161,7 +1161,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
requestCount.increment();
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
return region.batchReplay(mutations.toArray(
new MutationReplay[mutations.size()]), replaySeqId);
@ -1202,16 +1202,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final LogDelegate ld;
public RSRpcServices(HRegionServer rs) throws IOException {
public RSRpcServices(final HRegionServer rs) throws IOException {
this(rs, DEFAULT_LOG_DELEGATE);
}
// Directly invoked only for testing
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
RSRpcServices(final HRegionServer rs, final LogDelegate ld) throws IOException {
final Configuration conf = rs.getConfiguration();
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
final RpcSchedulerFactory rpcSchedulerFactory;
try {
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
.getDeclaredConstructor().newInstance();
@ -1220,22 +1222,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new IllegalArgumentException(e);
}
// Server to handle client requests.
InetSocketAddress initialIsa;
InetSocketAddress bindAddress;
final InetSocketAddress initialIsa;
final InetSocketAddress bindAddress;
if(this instanceof MasterRpcServices) {
String hostname = getHostname(rs.conf, true);
int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
String hostname = getHostname(conf, true);
int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
// Creation of a HSA will force a resolve.
initialIsa = new InetSocketAddress(hostname, port);
bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
bindAddress = new InetSocketAddress(conf.get("hbase.master.ipc.address", hostname), port);
} else {
String hostname = getHostname(rs.conf, false);
int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
String hostname = getHostname(conf, false);
int port = conf.getInt(HConstants.REGIONSERVER_PORT,
HConstants.DEFAULT_REGIONSERVER_PORT);
// Creation of a HSA will force a resolve.
initialIsa = new InetSocketAddress(hostname, port);
bindAddress = new InetSocketAddress(
rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
bindAddress =
new InetSocketAddress(conf.get("hbase.regionserver.ipc.address", hostname), port);
}
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
@ -1243,26 +1245,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
priority = createPriority();
// Using Address means we don't get the IP too. Shorten it more even to just the host name
// w/o the domain.
String name = rs.getProcessName() + "/" +
final String name = rs.getProcessName() + "/" +
Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
// Set how many times to retry talking to another server over Connection.
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
rpcServer = createRpcServer(rs, rs.conf, rpcSchedulerFactory, bindAddress, name);
ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
scannerLeaseTimeoutPeriod = rs.conf.getInt(
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
maxScannerResultSize = rs.conf.getLong(
maxScannerResultSize = conf.getLong(
HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
rpcTimeout = rs.conf.getInt(
rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
minimumScanTimeLimitDelta = rs.conf.getLong(
minimumScanTimeLimitDelta = conf.getLong(
REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
InetSocketAddress address = rpcServer.getListenerAddress();
final InetSocketAddress address = rpcServer.getListenerAddress();
if (address == null) {
throw new IOException("Listener channel is closed");
}
@ -1275,9 +1277,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
.expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build();
}
protected RpcServerInterface createRpcServer(Server server, Configuration conf,
RpcSchedulerFactory rpcSchedulerFactory, InetSocketAddress bindAddress, String name)
throws IOException {
protected RpcServerInterface createRpcServer(
final Server server,
final RpcSchedulerFactory rpcSchedulerFactory,
final InetSocketAddress bindAddress,
final String name
) throws IOException {
final Configuration conf = server.getConfiguration();
boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
try {
return RpcServerFactory.createRpcServer(server, name, getServices(),
@ -1291,7 +1297,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
protected Class<?> getRpcSchedulerFactoryClass() {
return this.regionServer.conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
final Configuration conf = regionServer.getConfiguration();
return conf.getClass(REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
SimpleRpcSchedulerFactory.class);
}
@ -1410,8 +1417,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
HRegion r, boolean needCursor) throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
Lease lease = regionServer.getLeaseManager().createLease(
scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
RpcCallback closeCallback;
if (s instanceof RpcCallback) {
@ -1518,7 +1525,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (regionServer.isStopped()) {
throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
}
if (!regionServer.fsOk) {
if (!regionServer.isDataFileSystemOk()) {
throw new RegionServerStoppedException("File system not available");
}
if (!regionServer.isOnline()) {
@ -1757,12 +1764,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
Map<String, HRegion> onlineRegions = regionServer.onlineRegions;
Map<String, HRegion> onlineRegions = regionServer.getOnlineRegions();
List<RegionInfo> list = new ArrayList<>(onlineRegions.size());
for (HRegion region: onlineRegions.values()) {
list.add(region.getRegionInfo());
}
Collections.sort(list, RegionInfo.COMPARATOR);
list.sort(RegionInfo.COMPARATOR);
return ResponseConverter.buildGetOnlineRegionResponse(list);
} catch (IOException ie) {
throw new ServiceException(ie);
@ -2012,7 +2019,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException(ie);
}
// We are assigning meta, wait a little for regionserver to finish initialization.
int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
int timeout = regionServer.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
long endTime = System.currentTimeMillis() + timeout;
synchronized (regionServer.online) {
@ -2052,7 +2059,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
LOG.info("Open " + region.getRegionNameAsString());
final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
final Boolean previous = regionServer.getRegionsInTransitionInRS().putIfAbsent(
encodedNameBytes, Boolean.TRUE);
if (Boolean.FALSE.equals(previous)) {
@ -2063,7 +2070,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.abort(error);
throw new IOException(error);
}
regionServer.regionsInTransitionInRS.put(encodedNameBytes, Boolean.TRUE);
regionServer.getRegionsInTransitionInRS().put(encodedNameBytes, Boolean.TRUE);
}
if (Boolean.TRUE.equals(previous)) {
@ -2265,9 +2272,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateReplay(
EnvironmentEdgeManager.currentTime() - before);
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTime() - before);
}
}
}
@ -2294,7 +2301,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* Replicate WAL entries on the region server.
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
@ -2302,13 +2308,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
checkOpen();
if (regionServer.replicationSinkHandler != null) {
if (regionServer.getReplicationSinkService() != null) {
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
checkShouldRejectReplicationRequest(entries);
CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries();
regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
regionServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
request.getSourceHFileArchiveDirPath());
regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries();
@ -2334,7 +2340,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
checkOpen();
requestCount.increment();
regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
regionServer.walRoller.requestRollAll();
regionServer.getWalRoller().requestRollAll();
regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
return builder.build();
@ -2416,7 +2422,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// secure bulk load
Map<byte[], List<Path>> map =
regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
regionServer.getSecureBulkLoadManager().secureBulkLoadHFiles(region, request, clusterIds);
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(map != null);
if (map != null) {
@ -2436,9 +2442,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateBulkLoad(
EnvironmentEdgeManager.currentTime() - start);
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateBulkLoad(EnvironmentEdgeManager.currentTime() - start);
}
}
}
@ -2452,7 +2458,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegion region = getRegion(request.getRegion());
String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request);
String bulkToken = regionServer.getSecureBulkLoadManager().prepareBulkLoad(region, request);
PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder();
builder.setBulkToken(bulkToken);
return builder.build();
@ -2470,9 +2476,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
HRegion region = getRegion(request.getRegion());
regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request);
CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build();
return response;
regionServer.getSecureBulkLoadManager().cleanupBulkLoad(region, request);
return CleanupBulkLoadResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
@ -2609,11 +2614,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
MetricsRegionServer mrs = regionServer.metricsRegionServer;
if (mrs != null) {
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
TableDescriptor td = region != null? region.getTableDescriptor(): null;
if (td != null) {
mrs.updateGet(td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
metricsRegionServer.updateGet(
td.getTableName(), EnvironmentEdgeManager.currentTime() - before);
}
}
if (quota != null) {
@ -2916,7 +2922,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
MutateResponse.Builder builder = MutateResponse.newBuilder();
MutationProto mutation = request.getMutation();
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
regionServer.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
Result r = null;
@ -3023,22 +3029,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
quota.close();
}
// Update metrics
if (regionServer.metricsRegionServer != null && type != null) {
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null && type != null) {
long after = EnvironmentEdgeManager.currentTime();
switch (type) {
case DELETE:
if (request.hasCondition()) {
regionServer.metricsRegionServer.updateCheckAndDelete(after - before);
metricsRegionServer.updateCheckAndDelete(after - before);
} else {
regionServer.metricsRegionServer.updateDelete(
metricsRegionServer.updateDelete(
region == null ? null : region.getRegionInfo().getTable(), after - before);
}
break;
case PUT:
if (request.hasCondition()) {
regionServer.metricsRegionServer.updateCheckAndPut(after - before);
metricsRegionServer.updateCheckAndPut(after - before);
} else {
regionServer.metricsRegionServer.updatePut(
metricsRegionServer.updatePut(
region == null ? null : region.getRegionInfo().getTable(),after - before);
}
break;
@ -3096,7 +3103,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.warn("Getting exception closing " + scannerName, e);
} finally {
try {
regionServer.leases.cancelLease(scannerName);
regionServer.getLeaseManager().cancelLease(scannerName);
} catch (LeaseException e) {
LOG.warn("Getting exception closing " + scannerName, e);
}
@ -3158,9 +3165,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private void addScannerLeaseBack(Leases.Lease lease) {
private void addScannerLeaseBack(LeaseManager.Lease lease) {
try {
regionServer.leases.addLease(lease);
regionServer.getLeaseManager().addLease(lease);
} catch (LeaseStillHeldException e) {
// should not happen as the scanner id is unique.
throw new AssertionError(e);
@ -3376,10 +3383,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
long end = EnvironmentEdgeManager.currentTime();
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
region.getMetrics().updateScanTime(end - before);
if (regionServer.metricsRegionServer != null) {
regionServer.metricsRegionServer.updateScanSize(
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
metricsRegionServer.updateScanSize(
region.getTableDescriptor().getTableName(), responseCellSize);
regionServer.metricsRegionServer.updateScanTime(
metricsRegionServer.updateScanTime(
region.getTableDescriptor().getTableName(), end - before);
}
} finally {
@ -3418,9 +3426,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.debug(
"Server shutting down and client tried to access missing scanner " + scannerName);
}
if (regionServer.leases != null) {
final LeaseManager leaseManager = regionServer.getLeaseManager();
if (leaseManager != null) {
try {
regionServer.leases.cancelLease(scannerName);
leaseManager.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
@ -3454,11 +3463,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
HRegion region = rsh.r;
String scannerName = rsh.scannerName;
Leases.Lease lease;
LeaseManager.Lease lease;
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
lease = regionServer.leases.removeLease(scannerName);
lease = regionServer.getLeaseManager().removeLease(scannerName);
} catch (LeaseException e) {
throw new ServiceException(e);
}

View File

@ -195,7 +195,7 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
/**
* @return The RegionServer's "Leases" service
*/
Leases getLeases();
LeaseManager getLeaseManager();
/**
* @return hbase executor service

View File

@ -59,14 +59,11 @@ class SplitRequest implements Runnable {
}
private void doSplitting() {
server.metricsRegionServer.incrSplitRequest();
server.getMetrics().incrSplitRequest();
if (user != null && user.getUGI() != null) {
user.getUGI().doAs (new PrivilegedAction<Void>() {
@Override
public Void run() {
requestRegionSplit();
return null;
}
user.getUGI().doAs((PrivilegedAction<Void>) () -> {
requestRegionSplit();
return null;
});
} else {
requestRegionSplit();

View File

@ -101,7 +101,7 @@ public class SplitWALCallable implements RSProcedureCallable {
private void splitWal() throws IOException {
SplitLogWorker.TaskExecutor.Status status =
SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory);
SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.getWalFactory());
if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
throw new IOException("Split WAL " + walPath + " failed at server ");
}

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.LeaseManager;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
@ -229,7 +229,7 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
public Leases getLeases() {
public LeaseManager getLeaseManager() {
return null;
}

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.LeaseManager;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
@ -528,7 +528,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
}
@Override
public Leases getLeases() {
public LeaseManager getLeaseManager() {
return null;
}

View File

@ -113,7 +113,7 @@ public class TestMasterStatusServlet {
MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class);
Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper();
Mockito.doReturn(rms).when(master).getRegionServerMetrics();
Mockito.doReturn(rms).when(master).getMetrics();
// Mock admin
admin = Mockito.mock(Admin.class);

View File

@ -156,7 +156,7 @@ public class TestCleanupCompactedFileAfterFailover {
int walNum = rsServedTable.getWALs().size();
// Roll WAL
rsServedTable.walRoller.requestRollAll();
rsServedTable.getWalRoller().requestRollAll();
// Flush again
region.flush(true);
// The WAL which contains compaction event marker should be archived

View File

@ -487,7 +487,7 @@ public class TestPerColumnFamilyFlush {
assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize() < cfFlushSizeLowerBound);
table.put(createPut(1, 12345678));
// Make numRolledLogFiles greater than maxLogs
desiredRegionAndServer.getSecond().walRoller.requestRollAll();
desiredRegionAndServer.getSecond().getWalRoller().requestRollAll();
// Wait for some time till the flush caused by log rolling happens.
TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@ -60,12 +59,13 @@ public class TestPriorityRpc {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestPriorityRpc.class);
private Configuration conf;
private HRegionServer regionServer = null;
private PriorityFunction priority = null;
@Before
public void setup() {
Configuration conf = HBaseConfiguration.create();
conf = HBaseConfiguration.create();
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.getDataTestDir(this.getClass().getName());
@ -106,8 +106,8 @@ public class TestPriorityRpc {
.thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
assertEquals(
HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, createSomeUser()));
}
@Test
@ -120,8 +120,7 @@ public class TestPriorityRpc {
headerBuilder.setMethodName("foo");
RequestHeader header = headerBuilder.build();
PriorityFunction qosFunc = regionServer.rpcServices.getPriority();
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, createSomeUser()));
}
@Test
@ -141,12 +140,12 @@ public class TestPriorityRpc {
Mockito.when(mockRpc.getRegion(Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
// make isSystemTable return false
Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
Mockito.when(mockRegionInfo.getTable())
.thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
int qos = priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}));
assertTrue ("" + qos, qos == HConstants.NORMAL_QOS);
final int qos = priority.getPriority(header, scanRequest, createSomeUser());
assertEquals(Integer.toString(qos), qos, HConstants.NORMAL_QOS);
//build a scan request with scannerID
scanBuilder = ScanRequest.newBuilder();
@ -158,18 +157,26 @@ public class TestPriorityRpc {
Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.getTable()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
Mockito.when(mockRegionInfo.getTable())
.thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
// Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
assertEquals(
HConstants.SYSTEMTABLE_QOS,
priority.getPriority(header, scanRequest, createSomeUser()));
//the same as above but with non-meta region
// make isSystemTable return false
Mockito.when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
Mockito.when(mockRegionInfo.getTable())
.thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
assertEquals(
HConstants.NORMAL_QOS,
priority.getPriority(header, scanRequest, createSomeUser()));
}
private User createSomeUser() {
return User.createUserForTesting(conf, "someuser", new String[] { "somegroup" });
}
}

View File

@ -110,7 +110,7 @@ public class TestRSStatusServlet {
MetricsRegionServer rms = Mockito.mock(MetricsRegionServer.class);
Mockito.doReturn(new MetricsRegionServerWrapperStub()).when(rms).getRegionServerWrapper();
Mockito.doReturn(rms).when(rs).getRegionServerMetrics();
Mockito.doReturn(rms).when(rs).getMetrics();
MetricsHBaseServer ms = Mockito.mock(MetricsHBaseServer.class);
Mockito.doReturn(new MetricsHBaseServerWrapperStub()).when(ms).getHBaseServerWrapper();

View File

@ -91,8 +91,11 @@ public class TestRegionServerAbortTimeout {
@AfterClass
public static void tearDown() throws Exception {
// Wait the SCP of abort rs to finish
UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure && p.isFinished()).count() > 0);
UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster()
.getMaster()
.getProcedures()
.stream()
.anyMatch(p -> p instanceof ServerCrashProcedure && p.isFinished()));
UTIL.getAdmin().disableTable(TABLE_NAME);
UTIL.getAdmin().deleteTable(TABLE_NAME);
UTIL.shutdownMiniCluster();

View File

@ -123,11 +123,11 @@ public class TestRegionServerMetrics {
while (cluster.getLiveRegionServerThreads().isEmpty() &&
cluster.getRegionServer(0) == null &&
rs.getRegionServerMetrics() == null) {
rs.getMetrics() == null) {
Threads.sleep(100);
}
rs = cluster.getRegionServer(0);
metricsRegionServer = rs.getRegionServerMetrics();
metricsRegionServer = rs.getMetrics();
serverSource = metricsRegionServer.getMetricsSource();
}

View File

@ -122,7 +122,7 @@ public class TestRegionServerNoMaster {
while (true) {
sn = MetaTableLocator.getMetaRegionLocation(zkw);
if (sn != null && sn.equals(hrs.getServerName())
&& hrs.onlineRegions.containsKey(
&& hrs.getOnlineRegions().containsKey(
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
break;
}

View File

@ -121,13 +121,13 @@ public class TestSecureBulkLoadManager {
}
} ;
testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.secureBulkLoadManager.setFsCreatedListener(fsCreatedListener);
.getSecureBulkLoadManager().setFsCreatedListener(fsCreatedListener);
/// create table
testUtil.createTable(TABLE,FAMILY,Bytes.toByteArrays(SPLIT_ROWKEY));
/// prepare files
Path rootdir = testUtil.getMiniHBaseCluster().getRegionServerThreads().get(0)
.getRegionServer().getRootDir();
.getRegionServer().getDataRootDir();
Path dir1 = new Path(rootdir, "dir1");
prepareHFile(dir1, key1, value1);
Path dir2 = new Path(rootdir, "dir2");

View File

@ -125,13 +125,13 @@ public class TestSplitWalDataLoss {
Matchers.<Collection<HStore>> any());
// Find region key; don't pick up key for hbase:meta by mistake.
String key = null;
for (Map.Entry<String, HRegion> entry: rs.onlineRegions.entrySet()) {
for (Map.Entry<String, HRegion> entry: rs.getOnlineRegions().entrySet()) {
if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) {
key = entry.getKey();
break;
}
}
rs.onlineRegions.put(key, spiedRegion);
rs.getOnlineRegions().put(key, spiedRegion);
Connection conn = testUtil.getConnection();
try (Table table = conn.getTable(tableName)) {
@ -141,7 +141,7 @@ public class TestSplitWalDataLoss {
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
rs.getMemStoreFlusher().requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
synchronized (flushed) {
while (!flushed.booleanValue()) {
flushed.wait();

View File

@ -170,8 +170,11 @@ public class SerialReplicationTestBase {
@Override
public boolean evaluate() throws Exception {
return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
.map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
return UTIL.getMiniHBaseCluster()
.getLiveRegionServerThreads()
.stream()
.map(RegionServerThread::getRegionServer)
.allMatch(HRegionServer::walRollRequestFinished);
}
@Override

View File

@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -229,7 +229,7 @@ public abstract class TestReplicationSourceManager {
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.findAny()
.map(HRegionServer::getReplicationSourceService)
.map(RegionServerServices::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.get();
@ -546,7 +546,7 @@ public abstract class TestReplicationSourceManager {
}
return utility.getMiniHBaseCluster().getRegionServerThreads()
.stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
.map(HRegionServer::getReplicationSourceService)
.map(RegionServerServices::getReplicationSourceService)
.map(r -> (Replication)r)
.map(Replication::getReplicationManager)
.mapToLong(ReplicationSourceManager::getSizeOfLatestPath)