HADOOP-2274 Excess synchronization introduced by HADOOP-2139 negatively impacts performance

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@598113 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-11-26 03:05:37 +00:00
parent cbe167c981
commit c64eb9ebdc
3 changed files with 498 additions and 530 deletions

View File

@ -31,6 +31,8 @@ Trunk (unreleased changes)
HADOOP-2161 getRow() is orders of magnitudes slower than get(), even on rows
with one column (Clint Morgan and Stack)
HADOOP-2040 Hudson hangs AFTER test has finished
HADOOP-2274 Excess synchronization introduced by HADOOP-2139 negatively
impacts performance
IMPROVEMENTS
HADOOP-2401 Add convenience put method that takes writable

View File

@ -26,16 +26,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
@ -414,11 +414,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) throws IOException {
synchronized (serversToServerInfo) {
// Skip region - if ...
if(info.isOffline() // offline
|| killedRegions.contains(info.getRegionName()) // queued for offline
|| regionsToDelete.contains(info.getRegionName())) { // queued for delete
unassignedRegions.remove(info.getRegionName());
assignAttempts.remove(info.getRegionName());
return;
@ -426,6 +426,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
HServerInfo storedInfo = null;
boolean deadServer = false;
if (serverName.length() != 0) {
synchronized (killList) {
Map<Text, HRegionInfo> regionsToKill = killList.get(serverName);
if (regionsToKill != null &&
regionsToKill.containsKey(info.getRegionName())) {
@ -437,15 +438,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
return;
}
storedInfo = serversToServerInfo.get(serverName);
if (deadServers.contains(serverName)) {
deadServer = true;
}
storedInfo = serversToServerInfo.get(serverName);
deadServer = deadServers.contains(serverName);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Checking " + info.getRegionName() + " is assigned");
}
/*
* If the server is not dead and either:
* the stored info is not null and the start code does not match
@ -498,7 +497,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
}
}
volatile boolean rootScanned;
@ -800,25 +798,21 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
MetaScanner metaScannerThread;
Integer metaScannerLock = new Integer(0);
/////////////////////////////////////////////////////////////////////////////
//
// Access to all of the following objects MUST be synchronized on
// serversToServerInfo
/** The map of known server names to server info */
final Map<String, HServerInfo> serversToServerInfo =
new HashMap<String, HServerInfo>();
new ConcurrentHashMap<String, HServerInfo>();
/** Set of known dead servers */
final Set<String> deadServers = new HashSet<String>();
final Set<String> deadServers =
Collections.synchronizedSet(new HashSet<String>());
/** SortedMap server load -> Set of server names */
final SortedMap<HServerLoad, Set<String>> loadToServers =
new TreeMap<HServerLoad, Set<String>>();
Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
/** Map of server names -> server load */
final Map<String, HServerLoad> serversToLoad =
new HashMap<String, HServerLoad>();
new ConcurrentHashMap<String, HServerLoad>();
/**
* The 'unassignedRegions' table maps from a region name to a HRegionInfo
@ -831,39 +825,43 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
* the region has been deployed.
*/
final SortedMap<Text, HRegionInfo> unassignedRegions =
new TreeMap<Text, HRegionInfo>();
Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
/**
* The 'assignAttempts' table maps from regions to a timestamp that indicates
* the last time we *tried* to assign the region to a RegionServer. If the
* timestamp is out of date, then we can try to reassign it.
*/
final Map<Text, Long> assignAttempts = new HashMap<Text, Long>();
final Map<Text, Long> assignAttempts = new ConcurrentHashMap<Text, Long>();
/**
* Regions that have been assigned, and the server has reported that it has
* started serving it, but that we have not yet recorded in the meta table.
*/
final Set<Text> pendingRegions = new HashSet<Text>();
final Set<Text> pendingRegions =
Collections.synchronizedSet(new HashSet<Text>());
/**
* The 'killList' is a list of regions that are going to be closed, but not
* reopened.
*/
final Map<String, HashMap<Text, HRegionInfo>> killList =
new HashMap<String, HashMap<Text, HRegionInfo>>();
new ConcurrentHashMap<String, HashMap<Text, HRegionInfo>>();
/** 'killedRegions' contains regions that are in the process of being closed */
final Set<Text> killedRegions = new HashSet<Text>();
final Set<Text> killedRegions =
Collections.synchronizedSet(new HashSet<Text>());
/**
* 'regionsToDelete' contains regions that need to be deleted, but cannot be
* until the region server closes it
*/
final Set<Text> regionsToDelete = new HashSet<Text>();
final Set<Text> regionsToDelete =
Collections.synchronizedSet(new HashSet<Text>());
//
/////////////////////////////////////////////////////////////////////////////
/** Set of tables currently in creation. */
private Set<Text> tableInCreation =
Collections.synchronizedSet(new HashSet<Text>());
/** Build the HMaster out of a raw configuration item.
*
@ -974,8 +972,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
* could 'notice' dead region server in root scanner -- if we failed access
* multiple times -- but reassigning root is catastrophic.
*
* Note: This method must be called from inside a synchronized block on
* serversToServerInfo
*/
void unassignRootRegion() {
this.rootRegionLocation.set(null);
@ -1233,7 +1229,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
String s = serverInfo.getServerAddress().toString().trim();
LOG.info("received start message from: " + s);
synchronized (serversToServerInfo) {
HServerLoad load = serversToLoad.remove(s);
if (load != null) {
// The startup message was from a known server.
@ -1247,7 +1242,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
HServerInfo storedInfo = serversToServerInfo.remove(s);
if (storedInfo != null && !closed.get()) {
// The startup message was from a know server with the same name.
// The startup message was from a known server with the same name.
// Timeout the old one right away.
HServerAddress root = rootRegionLocation.get();
if (root != null && root.equals(storedInfo.getServerAddress())) {
@ -1268,8 +1263,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
servers.add(s);
loadToServers.put(load, servers);
serversToServerInfo.notifyAll();
}
if (!closed.get()) {
long serverLabel = getServerLabel(s);
@ -1327,13 +1320,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
onlineMetaRegions.remove(info.getStartKey());
}
synchronized (serversToServerInfo) {
this.unassignedRegions.put(info.getRegionName(), info);
this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
}
}
}
}
// We don't need to return anything to the server because it isn't
// going to do any more work.
@ -1347,10 +1338,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
}
HServerInfo storedInfo;
synchronized (serversToServerInfo) {
storedInfo = serversToServerInfo.get(serverName);
}
HServerInfo storedInfo = serversToServerInfo.get(serverName);
if (storedInfo == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("received server report from unknown server: " + serverName);
@ -1389,7 +1377,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// Refresh the info object and the load information
synchronized (serversToServerInfo) {
serversToServerInfo.put(serverName, serverInfo);
HServerLoad load = serversToLoad.get(serverName);
@ -1416,7 +1403,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
servers.add(serverName);
loadToServers.put(load, servers);
}
// Next, process messages for this server
return processMsgs(serverInfo, msgs);
@ -1426,7 +1412,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
/** Cancel a server's lease and update its load information */
private boolean cancelLease(final String serverName, final long serverLabel) {
boolean leaseCancelled = false;
synchronized (serversToServerInfo) {
HServerInfo info = serversToServerInfo.remove(serverName);
if (rootRegionLocation.get() != null &&
info.getServerAddress().equals(rootRegionLocation.get())) {
@ -1449,6 +1434,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
}
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
}
return leaseCancelled;
@ -1466,16 +1452,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
String serverName = info.getServerAddress().toString();
HashMap<Text, HRegionInfo> regionsToKill = null;
synchronized (serversToServerInfo) {
regionsToKill = killList.remove(serverName);
}
// Get reports on what the RegionServer did.
for (int i = 0; i < incomingMsgs.length; i++) {
HRegionInfo region = incomingMsgs[i].getRegionInfo();
synchronized (serversToServerInfo) {
switch (incomingMsgs[i].getMsg()) {
case HMsg.MSG_REPORT_OPEN:
@ -1597,18 +1580,15 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
incomingMsgs[i].getMsg());
}
}
}
// Process the kill list
synchronized (serversToServerInfo) {
if (regionsToKill != null) {
for (HRegionInfo i: regionsToKill.values()) {
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
killedRegions.add(i.getRegionName());
}
}
}
// Figure out what the RegionServer ought to do, and write back.
assignRegions(info, serverName, returnMsgs);
@ -1627,14 +1607,15 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
ArrayList<HMsg> returnMsgs) {
long now = System.currentTimeMillis();
SortedSet<Text> regionsToAssign = new TreeSet<Text>();
synchronized (serversToServerInfo) {
Set<Text> regionsToAssign = new HashSet<Text>();
synchronized (this.assignAttempts) {
for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
long diff = now - e.getValue().longValue();
if (diff > this.maxRegionOpenTime) {
regionsToAssign.add(e.getKey());
}
}
}
int nRegionsToAssign = regionsToAssign.size();
if (nRegionsToAssign <= 0) {
// No regions to assign. Return.
@ -1657,7 +1638,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
SortedMap<HServerLoad, Set<String>> heavyServers =
this.loadToServers.tailMap(thisServersLoad);
new TreeMap<HServerLoad, Set<String>>();
synchronized (this.loadToServers) {
heavyServers.putAll(this.loadToServers.tailMap(thisServersLoad));
}
int nservers = 0;
HServerLoad heavierLoad = null;
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
@ -1681,10 +1665,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
for (HServerLoad load =
new HServerLoad(thisServersLoad.getNumberOfRequests(),
thisServersLoad.getNumberOfRegions());
load.compareTo(heavierLoad) <= 0 &&
nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1),
nregions++) {
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
// continue;
}
}
@ -1721,25 +1703,26 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
}
}
/*
* @param nRegionsToAssign
* @param thisServersLoad
* @return How many regions we can assign to more lightly loaded servers
*
* Note: this method MUST be called from inside a synchronized block on
* serversToServerInfo
*/
private int regionsPerServer(final int nRegionsToAssign,
final HServerLoad thisServersLoad) {
SortedMap<HServerLoad, Set<String>> lightServers =
this.loadToServers.headMap(thisServersLoad);
new TreeMap<HServerLoad, Set<String>>();
synchronized (this.loadToServers) {
lightServers.putAll(this.loadToServers.headMap(thisServersLoad));
}
int nRegions = 0;
for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey()
.getNumberOfRequests(), e.getKey().getNumberOfRegions());
HServerLoad lightLoad = new HServerLoad(e.getKey().getNumberOfRequests(),
e.getKey().getNumberOfRegions());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
nRegions += 1;
@ -1760,11 +1743,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
* @param serverName
* @param returnMsgs
*/
private void assignRegionsToOneServer(final SortedSet<Text> regionsToAssign,
private void assignRegionsToOneServer(final Set<Text> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) {
long now = System.currentTimeMillis();
for (Text regionName: regionsToAssign) {
synchronized (serversToServerInfo) {
HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
LOG.info("assigning region " + regionName + " to the only server " +
serverName);
@ -1772,16 +1754,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
}
}
}
/*
* Some internal classes to manage msg-passing and region server operations
*/
private abstract class RegionServerOperation {
RegionServerOperation() {
super();
}
RegionServerOperation() {}
abstract boolean process() throws IOException;
}
@ -1939,16 +1918,19 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
ToDoEntry todo = new ToDoEntry(row, info);
toDoList.add(todo);
synchronized (serversToServerInfo) {
if (killList.containsKey(deadServerName)) {
HashMap<Text, HRegionInfo> regionsToKill =
killList.get(deadServerName);
new HashMap<Text, HRegionInfo>();
synchronized (killList) {
regionsToKill.putAll(killList.get(deadServerName));
}
if (regionsToKill.containsKey(info.getRegionName())) {
regionsToKill.remove(info.getRegionName());
killList.put(deadServerName, regionsToKill);
unassignedRegions.remove(info.getRegionName());
assignAttempts.remove(info.getRegionName());
synchronized (regionsToDelete) {
if (regionsToDelete.contains(info.getRegionName())) {
// Delete this region
regionsToDelete.remove(info.getRegionName());
@ -1958,6 +1940,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
todo.regionOffline = true;
}
}
}
} else {
// Get region reassigned
@ -1968,7 +1951,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
pendingRegions.remove(info.getRegionName());
}
}
}
} finally {
if(scannerId != -1L) {
try {
@ -1999,12 +1981,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
Text region = e.getKey();
HRegionInfo regionInfo = e.getValue();
synchronized (serversToServerInfo) {
unassignedRegions.put(region, regionInfo);
assignAttempts.put(region, Long.valueOf(0L));
}
}
}
@Override
boolean process() throws IOException {
@ -2039,9 +2019,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// is created, a check is made to see if it is the root server.
// and unassignRootRegion() is called then. However, in the
// unlikely event that we do end up here, let's do the right thing.
synchronized (serversToServerInfo) {
unassignRootRegion();
}
rootRegionUnavailable = true;
}
if (rootRegionUnavailable) {
@ -2128,9 +2106,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
if (closed.get()) {
return true;
}
List<MetaRegion> regions = new ArrayList<MetaRegion>();
synchronized (onlineMetaRegions) {
for (MetaRegion r: onlineMetaRegions.values()) {
regions.addAll(onlineMetaRegions.values());
}
for (MetaRegion r: regions) {
HRegionInterface server = null;
long scannerId = -1L;
@ -2153,10 +2133,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
Thread.currentThread().getName());
}
}
}
synchronized (serversToServerInfo) {
deadServers.remove(deadServerName);
}
break;
} catch (IOException e) {
@ -2287,10 +2264,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
if (reassignRegion) {
LOG.info("reassign region: " + regionInfo.getRegionName());
synchronized (serversToServerInfo) {
unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
}
} else if (deleteRegion) {
try {
@ -2375,10 +2350,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
return false;
}
MetaRegion r = onlineMetaRegions.containsKey(region.getRegionName()) ?
MetaRegion r = null;
synchronized (onlineMetaRegions) {
r = onlineMetaRegions.containsKey(region.getRegionName()) ?
onlineMetaRegions.get(region.getRegionName()) :
onlineMetaRegions.get(onlineMetaRegions.headMap(
region.getRegionName()).lastKey());
}
metaRegionName = r.getRegionName();
server = connection.getHRegionConnection(r.getServer());
}
@ -2414,9 +2392,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
// If updated successfully, remove from pending list.
synchronized (serversToServerInfo) {
pendingRegions.remove(region.getRegionName());
}
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
@ -2482,18 +2458,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
/* Set of tables currently in creation. Access needs to be synchronized. */
private Set<Text> tableInCreation = new HashSet<Text>();
private void createTable(final HRegionInfo newRegion) throws IOException {
Text tableName = newRegion.getTableDesc().getName();
synchronized (tableInCreation) {
if (tableInCreation.contains(tableName)) {
throw new TableExistsException("Table " + tableName + " in process "
+ "of being created");
}
tableInCreation.add(tableName);
}
try {
// 1. Check to see if table already exists. Get meta region where
// table would sit should it exist. Open scanner on it. If a region
@ -2558,17 +2529,13 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// 5. Get it assigned to a server
synchronized (serversToServerInfo) {
this.unassignedRegions.put(regionName, info);
this.assignAttempts.put(regionName, Long.valueOf(0L));
}
} finally {
synchronized (tableInCreation) {
tableInCreation.remove(newRegion.getTableDesc().getName());
}
}
}
/** {@inheritDoc} */
public void deleteTable(Text tableName) throws IOException {
@ -2629,6 +2596,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
Text firstMetaRegion = null;
synchronized (onlineMetaRegions) {
if (onlineMetaRegions.size() == 1) {
firstMetaRegion = onlineMetaRegions.firstKey();
@ -2638,8 +2606,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
} else {
firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
}
synchronized (onlineMetaRegions) {
this.metaRegions.addAll(onlineMetaRegions.tailMap(
firstMetaRegion).values());
}
@ -2758,10 +2724,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
protected boolean isBeingServed(String serverName, long startCode) {
boolean result = false;
if (serverName != null && serverName.length() > 0 && startCode != -1L) {
HServerInfo s;
synchronized (serversToServerInfo) {
s = serversToServerInfo.get(serverName);
}
HServerInfo s = serversToServerInfo.get(serverName);
result = s != null && s.getStartCode() == startCode;
}
return result;
@ -2840,7 +2803,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
LOG.debug("updated columns in row: " + i.getRegionName());
}
synchronized (serversToServerInfo) {
if (online) { // Bring offline regions on-line
if (!unassignedRegions.containsKey(i.getRegionName())) {
unassignedRegions.put(i.getRegionName(), i);
@ -2852,7 +2814,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
assignAttempts.remove(i.getRegionName());
}
}
}
// Process regions currently being served
@ -2868,12 +2829,14 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// Cause regions being served to be taken off-line and disabled
HashMap<Text, HRegionInfo> localKillList = null;
synchronized (serversToServerInfo) {
localKillList = killList.get(serverName);
HashMap<Text, HRegionInfo> localKillList =
new HashMap<Text, HRegionInfo>();
synchronized (killList) {
HashMap<Text, HRegionInfo> killedRegions = killList.get(serverName);
if (killedRegions != null) {
localKillList.putAll(killedRegions);
}
if (localKillList == null) {
localKillList = new HashMap<Text, HRegionInfo>();
}
for (HRegionInfo i: e.getValue()) {
if (LOG.isDebugEnabled()) {
@ -2887,11 +2850,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
LOG.debug("inserted local kill list into kill list for server " +
serverName);
}
synchronized (serversToServerInfo) {
killList.put(serverName, localKillList);
}
}
}
servedRegions.clear();
}
@ -2922,11 +2883,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
for (HashSet<HRegionInfo> s: servedRegions.values()) {
for (HRegionInfo i: s) {
synchronized (serversToServerInfo) {
regionsToDelete.add(i.getRegionName());
}
}
}
// Unserved regions we can delete now
@ -3051,9 +3010,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
public void leaseExpired() {
LOG.info(server + " lease expired");
// Remove the server from the known servers list and update load info
HServerInfo info;
synchronized (serversToServerInfo) {
info = serversToServerInfo.remove(server);
HServerInfo info = serversToServerInfo.remove(server);
if (info != null) {
HServerAddress root = rootRegionLocation.get();
if (root != null && root.equals(info.getServerAddress())) {
@ -3070,6 +3027,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
deadServers.add(server);
}
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
}

View File

@ -30,13 +30,14 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -95,10 +96,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected final SortedMap<Text, HRegion> onlineRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
protected final Map<Text, HRegion> retiringRegions =
new HashMap<Text, HRegion>();
new ConcurrentHashMap<Text, HRegion>();
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();
private final List<HMsg> outboundMsgs =
Collections.synchronizedList(new ArrayList<HMsg>());
final int numRetries;
protected final int threadWakeFrequency;
@ -529,6 +531,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/** Runs periodically to determine if the HLog should be rolled */
class LogRoller extends Thread implements LogRollListener {
private final Integer rollLock = new Integer(0);
private volatile boolean rollLog;
/** constructor */
@ -539,15 +542,23 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/** {@inheritDoc} */
@Override
public synchronized void run() {
public void run() {
while (!stopRequested.get()) {
while (!rollLog && !stopRequested.get()) {
synchronized (rollLock) {
try {
this.wait(threadWakeFrequency);
rollLock.wait(threadWakeFrequency);
} catch (InterruptedException e) {
continue;
}
}
}
if (!rollLog) {
// There's only two reasons to break out of the while loop.
// 1. Log roll requested
// 2. Stop requested
// so if a log roll was not requested, continue and break out of loop
continue;
}
synchronized (logRollerLock) {
@ -572,9 +583,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/** {@inheritDoc} */
public synchronized void logRollRequested() {
public void logRollRequested() {
synchronized (rollLock) {
rollLog = true;
this.notifyAll();
rollLock.notifyAll();
}
}
}
@ -662,8 +675,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
synchronized(outboundMsgs) {
outboundArray =
this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
this.outboundMsgs.clear();
}
this.outboundMsgs.clear();
try {
this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
@ -1017,17 +1030,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/** Add to the outbound message buffer */
private void reportOpen(HRegion region) {
synchronized(outboundMsgs) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region.getRegionInfo()));
}
}
/** Add to the outbound message buffer */
private void reportClose(HRegion region) {
synchronized(outboundMsgs) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_CLOSE, region.getRegionInfo()));
}
}
/**
* Add to the outbound message buffer
@ -1041,12 +1050,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
HRegionInfo newRegionB) {
synchronized(outboundMsgs) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_SPLIT, oldRegion));
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionA));
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionB));
}
}
//////////////////////////////////////////////////////////////////////////////
// HMaster-given operations