HBASE-23102: Improper Usage of Map putIfAbsent (#828)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
b292ffd14d
commit
a3efa5911d
|
@ -828,10 +828,12 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Procedure<TEnvironment> proc =
|
completed.computeIfAbsent(procId, (key) -> {
|
||||||
new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
|
Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
|
||||||
|
procName, procOwner, nonceKey, exception);
|
||||||
|
|
||||||
completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
|
return new CompletedProcedureRetainer<>(proc);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
|
|
|
@ -147,8 +147,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
*/
|
*/
|
||||||
public void addNode(final TRemote key) {
|
public void addNode(final TRemote key) {
|
||||||
assert key != null: "Tried to add a node with a null key";
|
assert key != null: "Tried to add a node with a null key";
|
||||||
final BufferNode newNode = new BufferNode(key);
|
nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
|
||||||
nodeMap.putIfAbsent(key, newNode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -581,9 +581,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
||||||
ServerName currServer = entry.getValue();
|
ServerName currServer = entry.getValue();
|
||||||
RegionInfo currRegion = entry.getKey();
|
RegionInfo currRegion = entry.getKey();
|
||||||
if (rsGroupInfo.getTables().contains(currTable)) {
|
if (rsGroupInfo.getTables().contains(currTable)) {
|
||||||
assignments.putIfAbsent(currTable, new HashMap<>());
|
assignments.computeIfAbsent(currTable, key -> new HashMap<>())
|
||||||
assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
|
.computeIfAbsent(currServer, key -> new ArrayList<>())
|
||||||
assignments.get(currTable).get(currServer).add(currRegion);
|
.add(currRegion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -595,12 +595,16 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
||||||
}
|
}
|
||||||
|
|
||||||
// add all tables that are members of the group
|
// add all tables that are members of the group
|
||||||
for(TableName tableName : rsGroupInfo.getTables()) {
|
for (TableName tableName : rsGroupInfo.getTables()) {
|
||||||
if(assignments.containsKey(tableName)) {
|
if (assignments.containsKey(tableName)) {
|
||||||
result.put(tableName, new HashMap<>());
|
Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap);
|
||||||
result.get(tableName).putAll(serverMap);
|
|
||||||
result.get(tableName).putAll(assignments.get(tableName));
|
Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName);
|
||||||
LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
|
tableResults.putAll(tableAssignments);
|
||||||
|
|
||||||
|
result.put(tableName, tableResults);
|
||||||
|
|
||||||
|
LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class ExecutorService {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
|
||||||
|
|
||||||
// hold the all the executors created in a map addressable by their names
|
// hold the all the executors created in a map addressable by their names
|
||||||
private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// Name of the server hosting this executor service.
|
// Name of the server hosting this executor service.
|
||||||
private final String servername;
|
private final String servername;
|
||||||
|
@ -87,18 +87,18 @@ public class ExecutorService {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void startExecutorService(String name, int maxThreads) {
|
public void startExecutorService(String name, int maxThreads) {
|
||||||
if (this.executorMap.get(name) != null) {
|
Executor hbes = this.executorMap.compute(name, (key, value) -> {
|
||||||
throw new RuntimeException("An executor service with the name " + name +
|
if (value != null) {
|
||||||
" is already running!");
|
throw new RuntimeException("An executor service with the name " + key +
|
||||||
}
|
" is already running!");
|
||||||
Executor hbes = new Executor(name, maxThreads);
|
}
|
||||||
if (this.executorMap.putIfAbsent(name, hbes) != null) {
|
return new Executor(key, maxThreads);
|
||||||
throw new RuntimeException("An executor service with the name " + name +
|
});
|
||||||
" is already running (2)!");
|
|
||||||
}
|
LOG.debug(
|
||||||
LOG.debug("Starting executor service name=" + name +
|
"Starting executor service name={}, corePoolSize={}, maxPoolSize={}",
|
||||||
", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
|
name, hbes.threadPoolExecutor.getCorePoolSize(),
|
||||||
", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
|
hbes.threadPoolExecutor.getMaximumPoolSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isExecutorServiceRunning(String name) {
|
boolean isExecutorServiceRunning(String name) {
|
||||||
|
@ -134,7 +134,8 @@ public class ExecutorService {
|
||||||
public void startExecutorService(final ExecutorType type, final int maxThreads) {
|
public void startExecutorService(final ExecutorType type, final int maxThreads) {
|
||||||
String name = type.getExecutorName(this.servername);
|
String name = type.getExecutorName(this.servername);
|
||||||
if (isExecutorServiceRunning(name)) {
|
if (isExecutorServiceRunning(name)) {
|
||||||
LOG.debug("Executor service " + toString() + " already running on " + this.servername);
|
LOG.debug("Executor service {} already running on {}", this,
|
||||||
|
this.servername);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
startExecutorService(name, maxThreads);
|
startExecutorService(name, maxThreads);
|
||||||
|
|
|
@ -161,9 +161,8 @@ public class RegionsRecoveryChore extends ScheduledChore {
|
||||||
}
|
}
|
||||||
LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
|
LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
|
||||||
regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
|
regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
|
||||||
tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
|
tableToReopenRegionsMap
|
||||||
tableToReopenRegionsMap.get(tableName).add(regionName);
|
.computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
|
// hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
|
||||||
|
|
|
@ -112,9 +112,8 @@ public class RegionStates {
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
|
RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
|
||||||
RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition);
|
return regionsMap.computeIfAbsent(regionInfo.getRegionName(),
|
||||||
RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
|
key -> new RegionStateNode(regionInfo, regionInTransition));
|
||||||
return oldNode != null ? oldNode : newNode;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
|
public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
|
||||||
|
@ -556,7 +555,7 @@ public class RegionStates {
|
||||||
// Add online servers with no assignment for the table.
|
// Add online servers with no assignment for the table.
|
||||||
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
|
for (Map<ServerName, List<RegionInfo>> table : result.values()) {
|
||||||
for (ServerName serverName : serverMap.keySet()) {
|
for (ServerName serverName : serverMap.keySet()) {
|
||||||
table.putIfAbsent(serverName, new ArrayList<>());
|
table.computeIfAbsent(serverName, key -> new ArrayList<>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -677,13 +676,7 @@ public class RegionStates {
|
||||||
|
|
||||||
public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
|
public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
|
||||||
final byte[] key = regionNode.getRegionInfo().getRegionName();
|
final byte[] key = regionNode.getRegionInfo().getRegionName();
|
||||||
RegionFailedOpen node = regionFailedOpen.get(key);
|
return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode));
|
||||||
if (node == null) {
|
|
||||||
RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
|
|
||||||
RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
|
|
||||||
node = oldNode != null ? oldNode : newNode;
|
|
||||||
}
|
|
||||||
return node;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
|
public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
|
||||||
|
@ -714,13 +707,7 @@ public class RegionStates {
|
||||||
* to {@link #getServerNode(ServerName)} where we can.
|
* to {@link #getServerNode(ServerName)} where we can.
|
||||||
*/
|
*/
|
||||||
public ServerStateNode getOrCreateServer(final ServerName serverName) {
|
public ServerStateNode getOrCreateServer(final ServerName serverName) {
|
||||||
ServerStateNode node = serverMap.get(serverName);
|
return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
|
||||||
if (node == null) {
|
|
||||||
node = new ServerStateNode(serverName);
|
|
||||||
ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
|
|
||||||
node = oldNode != null ? oldNode : node;
|
|
||||||
}
|
|
||||||
return node;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeServer(final ServerName serverName) {
|
public void removeServer(final ServerName serverName) {
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -36,7 +37,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
|
||||||
private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
|
private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
|
||||||
new FileArchiverNotifierFactoryImpl();
|
new FileArchiverNotifierFactoryImpl();
|
||||||
private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
|
private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
|
||||||
private final ConcurrentHashMap<TableName,FileArchiverNotifier> CACHE;
|
private final ConcurrentMap<TableName,FileArchiverNotifier> CACHE;
|
||||||
|
|
||||||
private FileArchiverNotifierFactoryImpl() {
|
private FileArchiverNotifierFactoryImpl() {
|
||||||
CACHE = new ConcurrentHashMap<>();
|
CACHE = new ConcurrentHashMap<>();
|
||||||
|
@ -62,15 +63,10 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
|
||||||
* @param tn The table to obtain a notifier for
|
* @param tn The table to obtain a notifier for
|
||||||
* @return The notifier for the given {@code tablename}.
|
* @return The notifier for the given {@code tablename}.
|
||||||
*/
|
*/
|
||||||
public FileArchiverNotifier get(
|
public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs,
|
||||||
Connection conn, Configuration conf, FileSystem fs, TableName tn) {
|
TableName tn) {
|
||||||
// Ensure that only one instance is exposed to callers
|
// Ensure that only one instance is exposed to callers
|
||||||
final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
|
return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key));
|
||||||
final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
|
|
||||||
if (previousMapping == null) {
|
|
||||||
return newMapping;
|
|
||||||
}
|
|
||||||
return previousMapping;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCacheSize() {
|
public int getCacheSize() {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.ScheduledChore;
|
import org.apache.hadoop.hbase.ScheduledChore;
|
||||||
|
@ -69,10 +70,10 @@ public class QuotaCache implements Stoppable {
|
||||||
// for testing purpose only, enforce the cache to be always refreshed
|
// for testing purpose only, enforce the cache to be always refreshed
|
||||||
static boolean TEST_FORCE_REFRESH = false;
|
static boolean TEST_FORCE_REFRESH = false;
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
|
||||||
private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
|
private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
|
||||||
private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
|
||||||
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
|
private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
private volatile boolean exceedThrottleQuotaEnabled = false;
|
private volatile boolean exceedThrottleQuotaEnabled = false;
|
||||||
// factors used to divide cluster scope quota into machine scope quota
|
// factors used to divide cluster scope quota into machine scope quota
|
||||||
|
@ -174,7 +175,7 @@ public class QuotaCache implements Stoppable {
|
||||||
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
|
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
|
||||||
* returned and the quota request will be enqueued for the next cache refresh.
|
* returned and the quota request will be enqueued for the next cache refresh.
|
||||||
*/
|
*/
|
||||||
private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
|
private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap,
|
||||||
final K key) {
|
final K key) {
|
||||||
return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
|
return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
|
||||||
}
|
}
|
||||||
|
@ -223,17 +224,18 @@ public class QuotaCache implements Stoppable {
|
||||||
protected void chore() {
|
protected void chore() {
|
||||||
// Prefetch online tables/namespaces
|
// Prefetch online tables/namespaces
|
||||||
for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
|
for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
|
||||||
if (table.isSystemTable()) continue;
|
if (table.isSystemTable()) {
|
||||||
if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
|
continue;
|
||||||
QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
|
|
||||||
}
|
|
||||||
String ns = table.getNamespaceAsString();
|
|
||||||
if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
|
|
||||||
QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
|
|
||||||
}
|
}
|
||||||
|
QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
|
||||||
|
|
||||||
|
final String ns = table.getNamespaceAsString();
|
||||||
|
|
||||||
|
QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
|
||||||
}
|
}
|
||||||
QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
|
|
||||||
new QuotaState());
|
QuotaCache.this.regionServerQuotaCache.computeIfAbsent(
|
||||||
|
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
|
||||||
|
|
||||||
updateQuotaFactors();
|
updateQuotaFactors();
|
||||||
fetchNamespaceQuotaState();
|
fetchNamespaceQuotaState();
|
||||||
|
@ -319,7 +321,7 @@ public class QuotaCache implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private <K, V extends QuotaState> void fetch(final String type,
|
private <K, V extends QuotaState> void fetch(final String type,
|
||||||
final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
long refreshPeriod = getPeriod();
|
long refreshPeriod = getPeriod();
|
||||||
long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
|
long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.throttle;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -78,7 +79,7 @@ public class StoreHotnessProtector {
|
||||||
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
|
private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
|
||||||
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
|
private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
|
||||||
|
|
||||||
private final Map<byte[], AtomicInteger> preparePutToStoreMap =
|
private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
|
||||||
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
||||||
private final Region region;
|
private final Region region;
|
||||||
|
|
||||||
|
@ -101,7 +102,7 @@ public class StoreHotnessProtector {
|
||||||
public void update(Configuration conf) {
|
public void update(Configuration conf) {
|
||||||
init(conf);
|
init(conf);
|
||||||
preparePutToStoreMap.clear();
|
preparePutToStoreMap.clear();
|
||||||
LOG.debug("update config: " + toString());
|
LOG.debug("update config: {}", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
|
public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
|
||||||
|
@ -121,13 +122,9 @@ public class StoreHotnessProtector {
|
||||||
|
|
||||||
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
|
//we need to try to add #preparePutCount at first because preparePutToStoreMap will be
|
||||||
//cleared when changing the configuration.
|
//cleared when changing the configuration.
|
||||||
preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
|
int preparePutCount = preparePutToStoreMap
|
||||||
AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
|
.computeIfAbsent(e.getKey(), key -> new AtomicInteger())
|
||||||
if (preparePutCounter == null) {
|
.incrementAndGet();
|
||||||
preparePutCounter = new AtomicInteger();
|
|
||||||
preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
|
|
||||||
}
|
|
||||||
int preparePutCount = preparePutCounter.incrementAndGet();
|
|
||||||
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|
if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
|
||||||
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
|
|| preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
|
||||||
tooBusyStore = (tooBusyStore == null ?
|
tooBusyStore = (tooBusyStore == null ?
|
||||||
|
@ -146,9 +143,7 @@ public class StoreHotnessProtector {
|
||||||
String msg =
|
String msg =
|
||||||
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
|
"StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
|
||||||
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
|
+ " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace(msg);
|
||||||
LOG.trace(msg);
|
|
||||||
}
|
|
||||||
throw new RegionTooBusyException(msg);
|
throw new RegionTooBusyException(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,24 +305,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
|
private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
|
||||||
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
|
workerThreads.compute(walGroupId, (key, value) -> {
|
||||||
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
|
if (value != null) {
|
||||||
if (extant != null) {
|
if (LOG.isDebugEnabled()) {
|
||||||
if(LOG.isDebugEnabled()) {
|
LOG.debug(
|
||||||
LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
|
"{} Someone has beat us to start a worker thread for wal group {}",
|
||||||
walGroupId);
|
logPeerId(), key);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
|
||||||
|
}
|
||||||
|
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
|
||||||
|
ReplicationSourceWALReader walReader =
|
||||||
|
createNewWALReader(walGroupId, queue, worker.getStartPosition());
|
||||||
|
Threads.setDaemonThreadRunning(
|
||||||
|
walReader, Thread.currentThread().getName()
|
||||||
|
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
|
||||||
|
this::uncaughtException);
|
||||||
|
worker.setWALReader(walReader);
|
||||||
|
worker.startup(this::uncaughtException);
|
||||||
|
return worker;
|
||||||
}
|
}
|
||||||
} else {
|
});
|
||||||
if(LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
|
|
||||||
}
|
|
||||||
ReplicationSourceWALReader walReader =
|
|
||||||
createNewWALReader(walGroupId, queue, worker.getStartPosition());
|
|
||||||
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
|
|
||||||
".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
|
|
||||||
worker.setWALReader(walReader);
|
|
||||||
worker.startup(this::uncaughtException);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue