HBASE-16648 [JDK8] Use computeIfAbsent instead of get and putIfAbsent

This commit is contained in:
zhangduo 2016-12-01 12:12:17 +08:00
parent fb789b340c
commit 540ede376b
17 changed files with 190 additions and 366 deletions

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
@ -645,23 +647,10 @@ class AsyncProcess {
protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) { protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
tasksInProgress.incrementAndGet(); tasksInProgress.incrementAndGet();
AtomicInteger serverCnt = taskCounterPerServer.get(sn); computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
if (serverCnt == null) {
taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
serverCnt = taskCounterPerServer.get(sn);
}
serverCnt.incrementAndGet();
for (byte[] regBytes : regions) { for (byte[] regBytes : regions) {
AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes); computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet();
if (regionCnt == null) {
regionCnt = new AtomicInteger();
AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
if (oldCnt != null) {
regionCnt = oldCnt;
}
}
regionCnt.incrementAndGet();
} }
} }

View File

@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -921,11 +923,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
// Map keyed by service name + regionserver to service stub implementation // Map keyed by service name + regionserver to service stub implementation
private final ConcurrentHashMap<String, Object> stubs = private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<String, Object>();
new ConcurrentHashMap<String, Object>();
// Map of locks used creating service stubs per regionserver.
private final ConcurrentHashMap<String, String> connectionLock =
new ConcurrentHashMap<String, String>();
/** /**
* State of the MasterService connection/setup. * State of the MasterService connection/setup.
@ -1008,7 +1006,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
long result; long result;
ServerErrors errorStats = errorsByServer.get(server); ServerErrors errorStats = errorsByServer.get(server);
if (errorStats != null) { if (errorStats != null) {
result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount()); result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
} else { } else {
result = 0; // yes, if the server is not in our list we don't wait before retrying. result = 0; // yes, if the server is not in our list we don't wait before retrying.
} }
@ -1017,19 +1015,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
/** /**
* Reports that there was an error on the server to do whatever bean-counting necessary. * Reports that there was an error on the server to do whatever bean-counting necessary.
*
* @param server The server in question. * @param server The server in question.
*/ */
void reportServerError(ServerName server) { void reportServerError(ServerName server) {
ServerErrors errors = errorsByServer.get(server); computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
if (errors != null) {
errors.addError();
} else {
errors = errorsByServer.putIfAbsent(server, new ServerErrors());
if (errors != null){
errors.addError();
}
}
} }
long getStartTrackingTime() { long getStartTrackingTime() {
@ -1053,32 +1042,26 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
/** /**
* Makes a client-side stub for master services. Sub-class to specialize. * Class to make a MasterServiceStubMaker stub.
* Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code
* when setting up the MasterMonitorService and MasterAdminService.
*/ */
abstract class StubMaker { private final class MasterServiceStubMaker {
/**
* Returns the name of the service stub being created. private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
*/ throws IOException {
protected abstract String getServiceName(); try {
stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
} catch (ServiceException e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
/** /**
* Make stub and cache it internal so can be used later doing the isMasterRunning call. * Create a stub. Try once only. It is not typed because there is no common type to protobuf
*/ * services nor their interfaces. Let the caller do appropriate casting.
protected abstract Object makeStub(final BlockingRpcChannel channel);
/**
* Once setup, check it works by doing isMasterRunning check.
*/
protected abstract void isMasterRunning() throws IOException;
/**
* Create a stub. Try once only. It is not typed because there is no common type to
* protobuf services nor their interfaces. Let the caller do appropriate casting.
* @return A stub for master services. * @return A stub for master services.
*/ */
private Object makeStubNoRetries() throws IOException, KeeperException { private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
throws IOException, KeeperException {
ZooKeeperKeepAliveConnection zkw; ZooKeeperKeepAliveConnection zkw;
try { try {
zkw = getKeepAliveZooKeeperWatcher(); zkw = getKeepAliveZooKeeperWatcher();
@ -1098,18 +1081,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
throw new MasterNotRunningException(sn + " is dead."); throw new MasterNotRunningException(sn + " is dead.");
} }
// Use the security info interface name as our stub key // Use the security info interface name as our stub key
String key = getStubKey(getServiceName(), sn, hostnamesCanChange); String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn,
connectionLock.putIfAbsent(key, key); hostnamesCanChange);
Object stub = null; MasterProtos.MasterService.BlockingInterface stub =
synchronized (connectionLock.get(key)) { (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
stub = stubs.get(key); BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
if (stub == null) { return MasterProtos.MasterService.newBlockingStub(channel);
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); });
stub = makeStub(channel); isMasterRunning(stub);
isMasterRunning();
stubs.put(key, stub);
}
}
return stub; return stub;
} finally { } finally {
zkw.close(); zkw.close();
@ -1121,9 +1100,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
* @return A stub to do <code>intf</code> against the master * @return A stub to do <code>intf</code> against the master
* @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
*/ */
Object makeStub() throws IOException { MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
// The lock must be at the beginning to prevent multiple master creations // The lock must be at the beginning to prevent multiple master creations
// (and leaks) in a multithread context // (and leaks) in a multithread context
synchronized (masterAndZKLock) { synchronized (masterAndZKLock) {
Exception exceptionCaught = null; Exception exceptionCaught = null;
if (!closed) { if (!closed) {
@ -1142,80 +1121,33 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} }
} }
/**
* Class to make a MasterServiceStubMaker stub.
*/
class MasterServiceStubMaker extends StubMaker {
private MasterProtos.MasterService.BlockingInterface stub;
@Override
protected String getServiceName() {
return MasterProtos.MasterService.getDescriptor().getName();
}
@Override
MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
return (MasterProtos.MasterService.BlockingInterface)super.makeStub();
}
@Override
protected Object makeStub(BlockingRpcChannel channel) {
this.stub = MasterProtos.MasterService.newBlockingStub(channel);
return this.stub;
}
@Override
protected void isMasterRunning() throws IOException {
try {
this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
}
}
@Override @Override
public AdminProtos.AdminService.BlockingInterface getAdmin(final ServerName serverName) public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
throws IOException { throws IOException {
if (isDeadServer(serverName)) { if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead."); throw new RegionServerStoppedException(serverName + " is dead.");
} }
String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
this.hostnamesCanChange); this.hostnamesCanChange);
this.connectionLock.putIfAbsent(key, key); return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
AdminProtos.AdminService.BlockingInterface stub; BlockingRpcChannel channel =
synchronized (this.connectionLock.get(key)) {
stub = (AdminProtos.AdminService.BlockingInterface)this.stubs.get(key);
if (stub == null) {
BlockingRpcChannel channel =
this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
stub = AdminProtos.AdminService.newBlockingStub(channel); return AdminProtos.AdminService.newBlockingStub(channel);
this.stubs.put(key, stub); });
}
}
return stub;
} }
@Override @Override
public BlockingInterface getClient(final ServerName sn) public BlockingInterface getClient(ServerName serverName) throws IOException {
throws IOException { if (isDeadServer(serverName)) {
if (isDeadServer(sn)) { throw new RegionServerStoppedException(serverName + " is dead.");
throw new RegionServerStoppedException(sn + " is dead.");
} }
String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), sn, String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
this.hostnamesCanChange); serverName, this.hostnamesCanChange);
this.connectionLock.putIfAbsent(key, key); return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
ClientProtos.ClientService.BlockingInterface stub = null; BlockingRpcChannel channel =
synchronized (this.connectionLock.get(key)) { this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
stub = (ClientProtos.ClientService.BlockingInterface)this.stubs.get(key); return ClientProtos.ClientService.newBlockingStub(channel);
if (stub == null) { });
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
stub = ClientProtos.ClientService.newBlockingStub(channel);
// In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
// Just fail on first actual call rather than in here on setup.
this.stubs.put(key, stub);
}
}
return stub;
} }
private ZooKeeperKeepAliveConnection keepAliveZookeeper; private ZooKeeperKeepAliveConnection keepAliveZookeeper;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
@ -49,8 +51,7 @@ public class MetaCache {
* Map of table to table {@link HRegionLocation}s. * Map of table to table {@link HRegionLocation}s.
*/ */
private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>> private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], RegionLocations>>
cachedRegionLocations = cachedRegionLocations = new CopyOnWriteArrayMap<>();
new CopyOnWriteArrayMap<>();
// The presence of a server in the map implies it's likely that there is an // The presence of a server in the map implies it's likely that there is an
// entry in cachedRegionLocations that map to this server; but the absence // entry in cachedRegionLocations that map to this server; but the absence
@ -191,21 +192,11 @@ public class MetaCache {
* @param tableName * @param tableName
* @return Map of cached locations for passed <code>tableName</code> * @return Map of cached locations for passed <code>tableName</code>
*/ */
private ConcurrentNavigableMap<byte[], RegionLocations> private ConcurrentNavigableMap<byte[], RegionLocations> getTableLocations(
getTableLocations(final TableName tableName) { final TableName tableName) {
// find the map of cached locations for this table // find the map of cached locations for this table
ConcurrentNavigableMap<byte[], RegionLocations> result; return computeIfAbsent(cachedRegionLocations, tableName,
result = this.cachedRegionLocations.get(tableName); () -> new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR));
// if tableLocations for this table isn't built yet, make one
if (result == null) {
result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR);
ConcurrentNavigableMap<byte[], RegionLocations> old =
this.cachedRegionLocations.putIfAbsent(tableName, result);
if (old != null) {
return old;
}
}
return result;
} }
/** /**

View File

@ -17,30 +17,32 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting; import static com.codahale.metrics.MetricRegistry.name;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import com.codahale.metrics.Counter; import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.JmxReporter; import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge; import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.codahale.metrics.MetricRegistry.name;
/** /**
* This class is for maintaining the various connection statistics and publishing them through * This class is for maintaining the various connection statistics and publishing them through
* the metrics interfaces. * the metrics interfaces.
@ -207,32 +209,15 @@ public class MetricsConnection implements StatisticTrackable {
} }
@Override @Override
public void updateRegionStats(ServerName serverName, byte[] regionName, public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
RegionLoadStats stats) {
String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName); String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
ConcurrentMap<byte[], RegionStats> rsStats = null; ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
if (serverStats.containsKey(serverName)) { () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
rsStats = serverStats.get(serverName); RegionStats regionStats =
} else { computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
rsStats = serverStats.putIfAbsent(serverName,
new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR));
if (rsStats == null) {
rsStats = serverStats.get(serverName);
}
}
RegionStats regionStats = null;
if (rsStats.containsKey(regionName)) {
regionStats = rsStats.get(regionName);
} else {
regionStats = rsStats.putIfAbsent(regionName, new RegionStats(this.registry, name));
if (regionStats == null) {
regionStats = rsStats.get(regionName);
}
}
regionStats.update(stats); regionStats.update(stats);
} }
/** A lambda for dispatching to the appropriate metric factory method */ /** A lambda for dispatching to the appropriate metric factory method */
private static interface NewMetric<T> { private static interface NewMetric<T> {
T newMetric(Class<?> clazz, String name, String scope); T newMetric(Class<?> clazz, String name, String scope);
@ -407,13 +392,7 @@ public class MetricsConnection implements StatisticTrackable {
* Get a metric for {@code key} from {@code map}, or create it with {@code factory}. * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
*/ */
private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) { private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
T t = map.get(key); return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
if (t == null) {
t = factory.newMetric(this.getClass(), key, scope);
T tmp = map.putIfAbsent(key, t);
t = (tmp == null) ? t : tmp;
}
return t;
} }
/** Update call stats for non-critical-path methods */ /** Update call stats for non-critical-path methods */

View File

@ -17,6 +17,10 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -35,34 +39,29 @@ import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
/** /**
*
* The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
* feature. * feature.
* * <p>
* The motivation is as follows : * The motivation is as follows : In case where a large number of clients try and talk to a
* In case where a large number of clients try and talk to a particular region server in hbase, if * particular region server in hbase, if the region server goes down due to network problems, we
* the region server goes down due to network problems, we might end up in a scenario where * might end up in a scenario where the clients would go into a state where they all start to retry.
* the clients would go into a state where they all start to retry.
* This behavior will set off many of the threads in pretty much the same path and they all would be * This behavior will set off many of the threads in pretty much the same path and they all would be
* sleeping giving rise to a state where the client either needs to create more threads to send new * sleeping giving rise to a state where the client either needs to create more threads to send new
* requests to other hbase machines or block because the client cannot create anymore threads. * requests to other hbase machines or block because the client cannot create anymore threads.
* * <p>
* In most cases the clients might prefer to have a bound on the number of threads that are created * In most cases the clients might prefer to have a bound on the number of threads that are created
* in order to send requests to hbase. This would mostly result in the client thread starvation. * in order to send requests to hbase. This would mostly result in the client thread starvation.
* * <p>
* To circumvent this problem, the approach that is being taken here under is to let 1 of the many * To circumvent this problem, the approach that is being taken here under is to let 1 of the many
* threads who are trying to contact the regionserver with connection problems and let the other * threads who are trying to contact the regionserver with connection problems and let the other
* threads get a {@link PreemptiveFastFailException} so that they can move on and take other * threads get a {@link PreemptiveFastFailException} so that they can move on and take other
* requests. * requests.
* * <p>
* This would give the client more flexibility on the kind of action he would want to take in cases * This would give the client more flexibility on the kind of action he would want to take in cases
* where the regionserver is down. He can either discard the requests and send a nack upstream * where the regionserver is down. He can either discard the requests and send a nack upstream
* faster or have an application level retry or buffer the requests up so as to send them down to * faster or have an application level retry or buffer the requests up so as to send them down to
* hbase later. * hbase later.
*
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
@ -155,15 +154,8 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
return; return;
} }
long currentTime = EnvironmentEdgeManager.currentTime(); long currentTime = EnvironmentEdgeManager.currentTime();
FailureInfo fInfo = repeatedFailuresMap.get(serverName); FailureInfo fInfo =
if (fInfo == null) { computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime));
fInfo = new FailureInfo(currentTime);
FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
if (oldfInfo != null) {
fInfo = oldfInfo;
}
}
fInfo.timeOfLatestAttemptMilliSec = currentTime; fInfo.timeOfLatestAttemptMilliSec = currentTime;
fInfo.numConsecutiveFailures.incrementAndGet(); fInfo.numConsecutiveFailures.incrementAndGet();
} }

View File

@ -17,15 +17,18 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Tracks the statistics for multiple regions * Tracks the statistics for multiple regions
*/ */
@ -36,23 +39,8 @@ public class ServerStatisticTracker implements StatisticTrackable {
new ConcurrentHashMap<ServerName, ServerStatistics>(); new ConcurrentHashMap<ServerName, ServerStatistics>();
@Override @Override
public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats public void updateRegionStats(ServerName server, byte[] region, RegionLoadStats currentStats) {
currentStats) { computeIfAbsent(stats, server, ServerStatistics::new).update(region, currentStats);
ServerStatistics stat = stats.get(server);
if (stat == null) {
stat = stats.get(server);
// We don't have stats for that server yet, so we need to make an entry.
// If we race with another thread it's a harmless unnecessary allocation.
if (stat == null) {
stat = new ServerStatistics();
ServerStatistics old = stats.putIfAbsent(server, stat);
if (old != null) {
stat = old;
}
}
}
stat.update(region, currentStats);
} }
public ServerStatistics getStats(ServerName server) { public ServerStatistics getStats(ServerName server) {

View File

@ -110,15 +110,12 @@ public class CollectionUtils {
/** /**
* In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the
* value already exists. So here we copy the implementation of * value already exists. Notice that the implementation does not guarantee that the supplier will
* {@link ConcurrentMap#computeIfAbsent(Object, java.util.function.Function)}. It uses get and * only be executed once.
* putIfAbsent to implement computeIfAbsent. And notice that the implementation does not guarantee
* that the supplier will only be executed once.
*/ */
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) { public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) {
V v, newValue; return computeIfAbsent(map, key, supplier, () -> {
return ((v = map.get(key)) == null && (newValue = supplier.get()) != null });
&& (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
} }
/** /**
@ -142,4 +139,19 @@ public class CollectionUtils {
return ((v = map.get(key)) == null && (newValue = supplier.get()) != null return ((v = map.get(key)) == null && (newValue = supplier.get()) != null
&& (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v; && (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
} }
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier,
Runnable actionIfAbsent) {
V v = map.get(key);
if (v != null) {
return v;
}
V newValue = supplier.get();
v = map.putIfAbsent(key, newValue);
if (v != null) {
return v;
}
actionIfAbsent.run();
return newValue;
}
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.coordination; package org.apache.hadoop.hbase.coordination;
import static org.apache.hadoop.hbase.util.CollectionUtils.*;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLo
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
@ -449,7 +451,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
setDone(path, FAILURE); setDone(path, FAILURE);
return; return;
} }
data = this.watcher.getRecoverableZooKeeper().removeMetaData(data); data = RecoverableZooKeeper.removeMetaData(data);
SplitLogTask slt = SplitLogTask.parseFrom(data); SplitLogTask slt = SplitLogTask.parseFrom(data);
if (slt.isUnassigned()) { if (slt.isUnassigned()) {
LOG.debug("task not yet acquired " + path + " ver = " + version); LOG.debug("task not yet acquired " + path + " ver = " + version);
@ -531,16 +533,11 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
return; return;
} }
Task findOrCreateOrphanTask(String path) { private Task findOrCreateOrphanTask(String path) {
Task orphanTask = new Task(); return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
Task task;
task = details.getTasks().putIfAbsent(path, orphanTask);
if (task == null) {
LOG.info("creating orphan task " + path); LOG.info("creating orphan task " + path);
SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
task = orphanTask; });
}
return task;
} }
private void heartbeat(String path, int new_version, ServerName workerName) { private void heartbeat(String path, int new_version, ServerName workerName) {

View File

@ -18,6 +18,10 @@
*/ */
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -56,6 +60,11 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@ -67,9 +76,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.Reg
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounter;
@ -78,11 +84,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
/** /**
* The ServerManager class manages info about region servers. * The ServerManager class manages info about region servers.
* <p> * <p>
@ -273,18 +274,6 @@ public class ServerManager {
return sn; return sn;
} }
private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
byte[] regionName) {
ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
storeFlushedSequenceIdsByRegion.get(regionName);
if (storeFlushedSequenceId != null) {
return storeFlushedSequenceId;
}
storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
ConcurrentNavigableMap<byte[], Long> alreadyPut =
storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
}
/** /**
* Updates last flushed sequence Ids for the regions on server sn * Updates last flushed sequence Ids for the regions on server sn
* @param sn * @param sn
@ -309,7 +298,8 @@ public class ServerManager {
+ existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring."); + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
} }
ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId = ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
getOrCreateStoreFlushedSequenceId(encodedRegionName); computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
() -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) { for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
byte[] family = storeSeqId.getFamilyName().toByteArray(); byte[] family = storeSeqId.getFamilyName().toByteArray();
existingValue = storeFlushedSequenceId.get(family); existingValue = storeFlushedSequenceId.get(family);

View File

@ -117,7 +117,8 @@ public class SplitLogManager {
*/ */
protected final ReentrantLock recoveringRegionLock = new ReentrantLock(); protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>(); @VisibleForTesting
final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
private TimeoutMonitor timeoutMonitor; private TimeoutMonitor timeoutMonitor;
private volatile Set<ServerName> deadWorkers = null; private volatile Set<ServerName> deadWorkers = null;
@ -504,18 +505,6 @@ public class SplitLogManager {
} }
} }
Task findOrCreateOrphanTask(String path) {
Task orphanTask = new Task();
Task task;
task = tasks.putIfAbsent(path, orphanTask);
if (task == null) {
LOG.info("creating orphan task " + path);
SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
task = orphanTask;
}
return task;
}
public void stop() { public void stop() {
if (choreService != null) { if (choreService != null) {
choreService.shutdown(); choreService.shutdown();

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.quotas; package org.apache.hadoop.hbase.quotas;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -38,8 +42,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Cache that keeps track of the quota settings for the users and tables that * Cache that keeps track of the quota settings for the users and tables that
* are interacting with it. * are interacting with it.
@ -114,20 +116,12 @@ public class QuotaCache implements Stoppable {
/** /**
* Returns the QuotaState associated to the specified user. * Returns the QuotaState associated to the specified user.
*
* @param ugi the user * @param ugi the user
* @return the quota info associated to specified user * @return the quota info associated to specified user
*/ */
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
String key = ugi.getShortUserName(); return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
UserQuotaState quotaInfo = userQuotaCache.get(key); this::triggerCacheRefresh);
if (quotaInfo == null) {
quotaInfo = new UserQuotaState();
if (userQuotaCache.putIfAbsent(key, quotaInfo) == null) {
triggerCacheRefresh();
}
}
return quotaInfo;
} }
/** /**
@ -151,24 +145,12 @@ public class QuotaCache implements Stoppable {
} }
/** /**
* Returns the QuotaState requested. * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
* If the quota info is not in cache an empty one will be returned * returned and the quota request will be enqueued for the next cache refresh.
* and the quota request will be enqueued for the next cache refresh.
*/ */
private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap, private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
final K key) { final K key) {
QuotaState quotaInfo = quotasMap.get(key); return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
if (quotaInfo == null) {
quotaInfo = new QuotaState();
if (quotasMap.putIfAbsent(key, quotaInfo) == null) {
triggerCacheRefresh();
}
}
return quotaInfo;
}
private Configuration getConfiguration() {
return rsServices.getConfiguration();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
@ -5314,19 +5315,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Keep trying until we have a lock or error out. // Keep trying until we have a lock or error out.
// TODO: do we need to add a time component here? // TODO: do we need to add a time component here?
while (result == null) { while (result == null) {
rowLockContext = computeIfAbsent(lockedRows, rowKey, () -> new RowLockContext(rowKey));
// Try adding a RowLockContext to the lockedRows.
// If we can add it then there's no other transactions currently running.
rowLockContext = new RowLockContext(rowKey);
RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
// if there was a running transaction then there's already a context.
if (existingContext != null) {
rowLockContext = existingContext;
}
// Now try an get the lock. // Now try an get the lock.
//
// This can fail as // This can fail as
if (readLock) { if (readLock) {
result = rowLockContext.newReadLock(); result = rowLockContext.newReadLock();

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList; import java.util.ArrayList;
@ -215,16 +217,8 @@ class SequenceIdAccounting {
@VisibleForTesting @VisibleForTesting
ConcurrentMap<ImmutableByteArray, Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) { ConcurrentMap<ImmutableByteArray, Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
// Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append.
ConcurrentMap<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds return computeIfAbsent(this.lowestUnflushedSequenceIds, encodedRegionName,
.get(encodedRegionName); ConcurrentHashMap::new);
if (m != null) {
return m;
}
m = new ConcurrentHashMap<>();
// Another thread may have added it ahead of us.
ConcurrentMap<ImmutableByteArray, Long> alreadyPut = this.lowestUnflushedSequenceIds
.putIfAbsent(encodedRegionName, m);
return alreadyPut == null ? m : alreadyPut;
} }
/** /**

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,6 +18,13 @@
package org.apache.hadoop.hbase.security.access; package org.apache.hadoop.hbase.security.access;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -28,11 +35,11 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
@ -41,11 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
/** /**
* Performs authorization checks for a given user's assigned permissions * Performs authorization checks for a given user's assigned permissions
*/ */
@ -276,17 +278,11 @@ public class TableAuthManager implements Closeable {
} }
private PermissionCache<TablePermission> getTablePermissions(TableName table) { private PermissionCache<TablePermission> getTablePermissions(TableName table) {
if (!tableCache.containsKey(table)) { return computeIfAbsent(tableCache, table, PermissionCache::new);
tableCache.putIfAbsent(table, new PermissionCache<TablePermission>());
}
return tableCache.get(table);
} }
private PermissionCache<TablePermission> getNamespacePermissions(String namespace) { private PermissionCache<TablePermission> getNamespacePermissions(String namespace) {
if (!nsCache.containsKey(namespace)) { return computeIfAbsent(nsCache, namespace, PermissionCache::new);
nsCache.putIfAbsent(namespace, new PermissionCache<TablePermission>());
}
return nsCache.get(namespace);
} }
/** /**

View File

@ -18,6 +18,8 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -44,15 +46,8 @@ public class BoundedGroupingStrategy implements RegionGroupingStrategy{
@Override @Override
public String group(byte[] identifier, byte[] namespace) { public String group(byte[] identifier, byte[] namespace) {
String idStr = Bytes.toString(identifier); String idStr = Bytes.toString(identifier);
String groupName = groupNameCache.get(idStr); return computeIfAbsent(groupNameCache, idStr,
if (null == groupName) { () -> groupNames[getAndIncrAtomicInteger(counter, groupNames.length)]);
groupName = groupNames[getAndIncrAtomicInteger(counter, groupNames.length)];
String extantName = groupNameCache.putIfAbsent(idStr, groupName);
if (extantName != null) {
return extantName;
}
}
return groupName;
} }
// Non-blocking incrementing & resetting of AtomicInteger. // Non-blocking incrementing & resetting of AtomicInteger.

View File

@ -122,7 +122,7 @@ public class TestMasterShutdown {
final MasterThread master = cluster.getMasters().get(MASTER_INDEX); final MasterThread master = cluster.getMasters().get(MASTER_INDEX);
master.start(); master.start();
LOG.info("Called master start on " + master.getName()); LOG.info("Called master start on " + master.getName());
Thread shutdownThread = new Thread() { Thread shutdownThread = new Thread("Shutdown-Thread") {
public void run() { public void run() {
LOG.info("Before call to shutdown master"); LOG.info("Before call to shutdown master");
try { try {

View File

@ -58,8 +58,8 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener; import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@ -196,6 +196,14 @@ public class TestSplitLogManager {
assertEquals(newval, e.eval()); assertEquals(newval, e.eval());
} }
private Task findOrCreateOrphanTask(String path) {
return slm.tasks.computeIfAbsent(path, k -> {
LOG.info("creating orphan task " + k);
SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
return new Task();
});
}
private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException, private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
InterruptedException { InterruptedException {
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
@ -205,7 +213,7 @@ public class TestSplitLogManager {
slm.enqueueSplitTask(name, batch); slm.enqueueSplitTask(name, batch);
assertEquals(1, batch.installed); assertEquals(1, batch.installed);
assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
assertEquals(1L, tot_mgr_node_create_queued.get()); assertEquals(1L, tot_mgr_node_create_queued.get());
LOG.debug("waiting for task node creation"); LOG.debug("waiting for task node creation");
@ -244,7 +252,7 @@ public class TestSplitLogManager {
slm = new SplitLogManager(master, conf); slm = new SplitLogManager(master, conf);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
assertFalse(task.isUnassigned()); assertFalse(task.isUnassigned());
@ -270,12 +278,12 @@ public class TestSplitLogManager {
slm = new SplitLogManager(master, conf); slm = new SplitLogManager(master, conf);
waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2); waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
Task task = slm.findOrCreateOrphanTask(tasknode); Task task = findOrCreateOrphanTask(tasknode);
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
assertTrue(task.isUnassigned()); assertTrue(task.isUnassigned());
// wait for RESCAN node to be created // wait for RESCAN node to be created
waitForCounter(tot_mgr_rescan, 0, 1, to/2); waitForCounter(tot_mgr_rescan, 0, 1, to / 2);
Task task2 = slm.findOrCreateOrphanTask(tasknode); Task task2 = findOrCreateOrphanTask(tasknode);
assertTrue(task == task2); assertTrue(task == task2);
LOG.debug("task = " + task); LOG.debug("task = " + task);
assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1L, tot_mgr_resubmit.get());