HBASE-23102: Improper Usage of Map putIfAbsent (#828)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Co-authored-by: belugabehr <12578579+belugabehr@users.noreply.github.com>
This commit is contained in:
parent
82c4ccefd3
commit
3b6d8c3394
|
@ -836,10 +836,12 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
return;
|
||||
}
|
||||
|
||||
Procedure<TEnvironment> proc =
|
||||
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
|
||||
completed.computeIfAbsent(procId, (key) -> {
|
||||
Procedure<TEnvironment> proc =
|
||||
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
|
||||
|
||||
completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
|
||||
return new CompletedProcedureRetainer<>(proc);
|
||||
});
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
|
|
|
@ -152,8 +152,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
*/
|
||||
public void addNode(final TRemote key) {
|
||||
assert key != null : "Tried to add a node with a null key";
|
||||
final BufferNode newNode = new BufferNode(key);
|
||||
nodeMap.putIfAbsent(key, newNode);
|
||||
nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -694,9 +694,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
if (currRegion.isSplitParent()) {
|
||||
continue;
|
||||
}
|
||||
assignments.putIfAbsent(currTable, new HashMap<>());
|
||||
assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
|
||||
assignments.get(currTable).get(currServer).add(currRegion);
|
||||
assignments.computeIfAbsent(currTable, key -> new HashMap<>())
|
||||
.computeIfAbsent(currServer, key -> new ArrayList<>()).add(currRegion);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -710,10 +709,14 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
// add all tables that are members of the group
|
||||
for (TableName tableName : rsGroupInfo.getTables()) {
|
||||
if (assignments.containsKey(tableName)) {
|
||||
result.put(tableName, new HashMap<>());
|
||||
result.get(tableName).putAll(serverMap);
|
||||
result.get(tableName).putAll(assignments.get(tableName));
|
||||
LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
|
||||
Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap);
|
||||
|
||||
Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName);
|
||||
tableResults.putAll(tableAssignments);
|
||||
|
||||
result.put(tableName, tableResults);
|
||||
|
||||
LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ExecutorService {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
|
||||
|
||||
// hold the all the executors created in a map addressable by their names
|
||||
private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
|
||||
|
||||
// Name of the server hosting this executor service.
|
||||
private final String servername;
|
||||
|
@ -84,18 +84,16 @@ public class ExecutorService {
|
|||
*/
|
||||
public void startExecutorService(final ExecutorConfig config) {
|
||||
final String name = config.getName();
|
||||
if (this.executorMap.get(name) != null) {
|
||||
throw new RuntimeException(
|
||||
"An executor service with the name " + name + " is already running!");
|
||||
}
|
||||
Executor hbes = new Executor(config);
|
||||
if (this.executorMap.putIfAbsent(name, hbes) != null) {
|
||||
throw new RuntimeException(
|
||||
"An executor service with the name " + name + " is already running (2)!");
|
||||
}
|
||||
LOG.debug("Starting executor service name=" + name + ", corePoolSize="
|
||||
+ hbes.threadPoolExecutor.getCorePoolSize() + ", maxPoolSize="
|
||||
+ hbes.threadPoolExecutor.getMaximumPoolSize());
|
||||
Executor hbes = this.executorMap.compute(name, (key, value) -> {
|
||||
if (value != null) {
|
||||
throw new RuntimeException(
|
||||
"An executor service with the name " + key + " is already running!");
|
||||
}
|
||||
return new Executor(config);
|
||||
});
|
||||
|
||||
LOG.debug("Starting executor service name={}, corePoolSize={}, maxPoolSize={}", name,
|
||||
hbes.threadPoolExecutor.getCorePoolSize(), hbes.threadPoolExecutor.getMaximumPoolSize());
|
||||
}
|
||||
|
||||
boolean isExecutorServiceRunning(String name) {
|
||||
|
|
|
@ -155,8 +155,6 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
|||
}
|
||||
LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
|
||||
regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
|
||||
tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
|
||||
tableToReopenRegionsMap.get(tableName).add(regionName);
|
||||
|
||||
tableToReopenRegionsMap.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,9 +113,8 @@ public class RegionStates {
|
|||
// RegionStateNode helpers
|
||||
// ==========================================================================
|
||||
RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
|
||||
RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition);
|
||||
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
|
||||
return oldNode != null ? oldNode : newNode;
|
||||
return regionsMap.computeIfAbsent(regionInfo.getRegionName(),
|
||||
key -> new RegionStateNode(regionInfo, regionInTransition));
|
||||
}
|
||||
|
||||
public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
|
||||
|
@ -583,7 +582,7 @@ public class RegionStates {
|
|||
}
|
||||
// Add online servers with no assignment for the table.
|
||||
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
|
||||
for (ServerName serverName : onlineServers) {
|
||||
for (ServerName serverName : serverMap.keySet()) {
|
||||
table.computeIfAbsent(serverName, key -> new ArrayList<>());
|
||||
}
|
||||
}
|
||||
|
@ -703,13 +702,7 @@ public class RegionStates {
|
|||
|
||||
public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
|
||||
final byte[] key = regionNode.getRegionInfo().getRegionName();
|
||||
RegionFailedOpen node = regionFailedOpen.get(key);
|
||||
if (node == null) {
|
||||
RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
|
||||
RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
|
||||
node = oldNode != null ? oldNode : newNode;
|
||||
}
|
||||
return node;
|
||||
return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode));
|
||||
}
|
||||
|
||||
public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
|
||||
|
@ -740,13 +733,7 @@ public class RegionStates {
|
|||
* where we can.
|
||||
*/
|
||||
public ServerStateNode getOrCreateServer(final ServerName serverName) {
|
||||
ServerStateNode node = serverMap.get(serverName);
|
||||
if (node == null) {
|
||||
node = new ServerStateNode(serverName);
|
||||
ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
|
||||
node = oldNode != null ? oldNode : node;
|
||||
}
|
||||
return node;
|
||||
return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.quotas;
|
|||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -34,7 +35,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
|
|||
private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
|
||||
new FileArchiverNotifierFactoryImpl();
|
||||
private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
|
||||
private final ConcurrentHashMap<TableName, FileArchiverNotifier> CACHE;
|
||||
private final ConcurrentMap<TableName, FileArchiverNotifier> CACHE;
|
||||
|
||||
private FileArchiverNotifierFactoryImpl() {
|
||||
CACHE = new ConcurrentHashMap<>();
|
||||
|
@ -60,12 +61,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
|
|||
public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs,
|
||||
TableName tn) {
|
||||
// Ensure that only one instance is exposed to callers
|
||||
final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
|
||||
final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
|
||||
if (previousMapping == null) {
|
||||
return newMapping;
|
||||
}
|
||||
return previousMapping;
|
||||
return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key));
|
||||
}
|
||||
|
||||
public int getCacheSize() {
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||
|
@ -62,13 +63,10 @@ public class QuotaCache implements Stoppable {
|
|||
// for testing purpose only, enforce the cache to be always refreshed
|
||||
static boolean TEST_FORCE_REFRESH = false;
|
||||
|
||||
private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
|
||||
private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private volatile boolean exceedThrottleQuotaEnabled = false;
|
||||
// factors used to divide cluster scope quota into machine scope quota
|
||||
|
@ -166,8 +164,7 @@ public class QuotaCache implements Stoppable {
|
|||
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
|
||||
* returned and the quota request will be enqueued for the next cache refresh.
|
||||
*/
|
||||
private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
|
||||
final K key) {
|
||||
private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
|
||||
return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
|
||||
}
|
||||
|
||||
|
@ -209,17 +206,18 @@ public class QuotaCache implements Stoppable {
|
|||
protected void chore() {
|
||||
// Prefetch online tables/namespaces
|
||||
for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
|
||||
if (table.isSystemTable()) continue;
|
||||
if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
|
||||
QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
|
||||
}
|
||||
String ns = table.getNamespaceAsString();
|
||||
if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
|
||||
QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
|
||||
if (table.isSystemTable()) {
|
||||
continue;
|
||||
}
|
||||
QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
|
||||
|
||||
final String ns = table.getNamespaceAsString();
|
||||
|
||||
QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
|
||||
}
|
||||
QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
|
||||
new QuotaState());
|
||||
|
||||
QuotaCache.this.regionServerQuotaCache
|
||||
.computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
|
||||
|
||||
updateQuotaFactors();
|
||||
fetchNamespaceQuotaState();
|
||||
|
@ -302,7 +300,7 @@ public class QuotaCache implements Stoppable {
|
|||
}
|
||||
|
||||
private <K, V extends QuotaState> void fetch(final String type,
|
||||
final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
||||
final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
long refreshPeriod = getPeriod();
|
||||
long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.throttle;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -78,7 +79,7 @@ public class StoreHotnessProtector {
|
|||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
|
||||
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
|
||||
|
||||
private final Map<byte[], AtomicInteger> preparePutToStoreMap =
|
||||
private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
|
||||
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
||||
private final Region region;
|
||||
|
||||
|
@ -119,7 +120,7 @@ public class StoreHotnessProtector {
|
|||
public void update(Configuration conf) {
|
||||
init(conf);
|
||||
preparePutToStoreMap.clear();
|
||||
LOG.debug("update config: " + toString());
|
||||
LOG.debug("update config: {}", this);
|
||||
}
|
||||
|
||||
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
|
||||
|
|
|
@ -17,6 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.emptyIterable;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasEntry;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -39,12 +45,15 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test balancer with disabled table
|
||||
*/
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestBalancer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestBalancer.class);
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
|
@ -85,8 +94,10 @@ public class TestBalancer {
|
|||
Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
|
||||
assignmentManager.getRegionStates().getAssignmentsForBalancer(tableStateManager,
|
||||
serverManager.getOnlineServersList());
|
||||
assignments.forEach((k, v) -> LOG.debug("{}: {}", k, v));
|
||||
assertFalse(assignments.containsKey(disableTableName));
|
||||
assertTrue(assignments.containsKey(tableName));
|
||||
assertFalse(assignments.get(tableName).containsKey(sn1));
|
||||
assertThat(assignments.get(tableName),
|
||||
allOf(notNullValue(), hasEntry(equalTo(sn1), emptyIterable())));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue