HBASE-3298 Regionserver can close during a split causing double assignment
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1042043 13f79535-47bb-0310-9956-ffa450edef68
commit fe2e720318
parent 00c0dec88b
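In short: a master-driven close (CloseRegionHandler) and a regionserver split (SplitTransaction) could both close the same HRegion and both report back, which is how the parent region could end up assigned twice. The patch serializes HRegion.close() behind a private lock and has it signal an already-closed region by returning null (the CloseRegionHandler hunk below checks for exactly that), so the losing caller stands down; AssignmentManager additionally guards regionPlans with explicit synchronization and removes any stale CLOSING node when a split is reported. The sketch below isolates only the close-serialization idea; apart from closeLock/doClose, the class and member names are stand-ins, not the real HRegion internals.

    // A minimal sketch of the close-serialization pattern, assuming a
    // simplified region object; illustrative only.
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    class RegionCloseSketch {
      private final Object closeLock = new Object();
      private final AtomicBoolean closed = new AtomicBoolean(false);

      /**
       * Returns the region's store files, or null if some other thread
       * (for example a racing split) closed the region first.
       */
      public List<String> close(final boolean abort) throws IOException {
        // Only allow one thread to run the close sequence at a time.
        synchronized (closeLock) {
          return doClose(abort);
        }
      }

      private List<String> doClose(final boolean abort) throws IOException {
        if (!closed.compareAndSet(false, true)) {
          return null; // already closed: tell the caller to stand down
        }
        // ... flush and close the stores here; abort skips the flush ...
        return Collections.emptyList();
      }
    }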
@@ -742,6 +742,7 @@ Release 0.90.0 - Unreleased
              assigning/unassigning regions
   HBASE-3296 Newly created table ends up disabled instead of assigned
   HBASE-3304 Get spurious master fails during bootup
+  HBASE-3298 Regionserver can close during a split causing double assignment


 IMPROVEMENTS

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.ConnectException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

@@ -33,7 +34,6 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;

@@ -108,11 +108,9 @@ public class AssignmentManager extends ZooKeeperListener {
   /** Plans for region movement. Key is the encoded version of a region name*/
   // TODO: When do plans get cleaned out? Ever? In server open and in server
   // shutdown processing -- St.Ack
-  // TODO: Better to just synchronize access around regionPlans? I think that
-  // would be better than a concurrent structure since we do more than
-  // one operation at a time -- jgray
-  final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
-    new ConcurrentSkipListMap<String, RegionPlan>();
+  // All access to this Map must be synchronized.
+  final NavigableMap<String, RegionPlan> regionPlans =
+    new TreeMap<String, RegionPlan>();

   private final ZKTable zkTable;

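Review note on the regionPlans change above: a ConcurrentSkipListMap makes individual calls thread-safe, but the callers in later hunks need compound check-then-put sequences on regionPlans to be atomic, which is why the field becomes a plain TreeMap that every caller locks explicitly. A standalone illustration of the difference, using made-up names rather than the real AssignmentManager API:

    // Illustrative only: compound operations need one lock around the whole
    // sequence, which a concurrent map by itself does not provide.
    import java.util.NavigableMap;
    import java.util.TreeMap;

    class PlanMapSketch {
      // All access to this map must be synchronized, as in the patched field.
      private final NavigableMap<String, String> regionPlans =
        new TreeMap<String, String>();

      /** Atomically keep an existing plan or install the candidate. */
      String planFor(final String encodedName, final String candidate) {
        synchronized (this.regionPlans) {
          String existing = this.regionPlans.get(encodedName);
          if (existing != null) {
            return existing;          // reuse the pre-existing plan
          }
          this.regionPlans.put(encodedName, candidate); // get+put under one lock
          return candidate;
        }
      }
    }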
@@ -538,7 +536,7 @@ public class AssignmentManager extends ZooKeeperListener {
       addToServers(serverInfo, regionInfo);
     }
     // Remove plan if one.
-    this.regionPlans.remove(regionInfo.getEncodedName());
+    clearRegionPlan(regionInfo);
     // Update timers for all regions in transition going against this server.
     updateTimers(serverInfo);
   }

@@ -557,21 +555,26 @@ public class AssignmentManager extends ZooKeeperListener {
    * @param hsi
    */
   private void updateTimers(final HServerInfo hsi) {
-    // This loop could be expensive
-    for (Map.Entry<String, RegionPlan> e: this.regionPlans.entrySet()) {
-      if (e.getValue().getDestination().equals(hsi)) {
+    // This loop could be expensive.
+    // First make a copy of current regionPlan rather than hold sync while
+    // looping because holding sync can cause deadlock.  Its ok in this loop
+    // if the Map we're going against is a little stale
+    Map<String, RegionPlan> copy = new HashMap<String, RegionPlan>();
+    synchronized(this.regionPlans) {
+      copy.putAll(this.regionPlans);
+    }
+    for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
+      if (!e.getValue().getDestination().equals(hsi)) continue;
       RegionState rs = null;
       synchronized (this.regionsInTransition) {
         rs = this.regionsInTransition.get(e.getKey());
       }
-      if (rs != null) {
+      if (rs == null) continue;
       synchronized (rs) {
         rs.update(rs.getState());
       }
-        }
-      }
     }
   }

   /**
    * Marks the region as offline. Removes it from regions in transition and

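Review note on updateTimers: the new code snapshots regionPlans while holding its lock and then iterates the copy without the lock, accepting a slightly stale view in exchange for never holding a monitor while doing per-entry work (which is where the deadlock risk was). The same idiom in isolation, with illustrative names:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    class SnapshotIterationSketch {
      private final Map<String, String> plans = new TreeMap<String, String>();

      void updateTimersFor(final String destination) {
        // Copy under the lock, then release it before doing any real work.
        Map<String, String> copy = new HashMap<String, String>();
        synchronized (this.plans) {
          copy.putAll(this.plans);
        }
        for (Map.Entry<String, String> e : copy.entrySet()) {
          if (!e.getValue().equals(destination)) continue;
          // Per-entry work runs without holding the plans lock, so it cannot
          // deadlock against threads that lock plans while holding other locks.
          System.out.println("would refresh timeout for " + e.getKey());
        }
      }
    }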
@@ -586,6 +589,8 @@ public class AssignmentManager extends ZooKeeperListener {
         this.regionsInTransition.notifyAll();
       }
     }
+    // remove the region plan as well just in case.
+    clearRegionPlan(regionInfo);
     setOffline(regionInfo);
   }

@@ -681,6 +686,7 @@ public class AssignmentManager extends ZooKeeperListener {
       final List<HRegionInfo> regions) {
     LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
       destination.getServerName());
+
     List<RegionState> states = new ArrayList<RegionState>(regions.size());
     synchronized (this.regionsInTransition) {
       for (HRegionInfo region: regions) {

@@ -941,24 +947,29 @@ public class AssignmentManager extends ZooKeeperListener {
     if (servers.isEmpty()) return null;
     RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
       LoadBalancer.randomAssignment(servers));
+    boolean newPlan = false;
+    RegionPlan existingPlan = null;
+    synchronized (this.regionPlans) {
-      RegionPlan existingPlan = this.regionPlans.get(encodedName);
+      existingPlan = this.regionPlans.get(encodedName);
       if (existingPlan == null || forceNewPlan ||
           existingPlan.getDestination().equals(serverToExclude)) {
+        newPlan = true;
+        this.regionPlans.put(encodedName, randomPlan);
+      }
+    }
+    if (newPlan) {
       LOG.debug("No previous transition plan was found (or we are ignoring " +
-        "an existing plan) for " + state.getRegion().getRegionNameAsString()
-        + " so generated a random one; " + randomPlan + "; " +
+        "an existing plan) for " + state.getRegion().getRegionNameAsString() +
+        " so generated a random one; " + randomPlan + "; " +
         serverManager.countOfRegionServers() +
         " (online=" + serverManager.getOnlineServers().size() +
         ", exclude=" + serverToExclude + ") available servers");
-      this.regionPlans.put(encodedName, randomPlan);
       return randomPlan;
     }
     LOG.debug("Using pre-existing plan for region " +
       state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
     return existingPlan;
   }

   /**
    * Unassigns the specified region.

@@ -1384,15 +1395,15 @@ public class AssignmentManager extends ZooKeeperListener {
         }
       }
     }
-    clearRegionPlan(hri.getEncodedName());
+    clearRegionPlan(hri);
   }

   /**
-   * @param encodedRegionName Region whose plan we are to clear.
+   * @param region Region whose plan we are to clear.
    */
-  void clearRegionPlan(final String encodedRegionName) {
-    this.regionPlans.remove(encodedRegionName);
+  void clearRegionPlan(final HRegionInfo region) {
+    synchronized (this.regionPlans) {
+      this.regionPlans.remove(region.getEncodedName());
+    }
   }

@@ -1646,6 +1657,24 @@ public class AssignmentManager extends ZooKeeperListener {
   public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
       final HRegionInfo a, final HRegionInfo b) {
     regionOffline(parent);
+    // Remove any CLOSING node, if exists, due to race between master & rs
+    // for close & split. Not putting into regionOffline method because it is
+    // called from various locations.
+    try {
+      RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher,
+        parent.getEncodedName(), null);
+      if (node != null) {
+        if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) {
+          ZKAssign.deleteClosingNode(this.watcher, parent);
+        } else {
+          LOG.warn("Split report has RIT node (shouldnt have one): " +
+            parent + " node: " + node);
+        }
+      }
+    } catch (KeeperException e) {
+      LOG.warn("Exception while validating RIT during split report", e);
+    }
+
     regionOnline(a, hsi);
     regionOnline(b, hsi);

@@ -1706,7 +1735,9 @@ public class AssignmentManager extends ZooKeeperListener {
    * @param plan Plan to execute.
    */
   void balance(final RegionPlan plan) {
+    synchronized (this.regionPlans) {
       this.regionPlans.put(plan.getRegionName(), plan);
+    }
     unassign(plan.getRegionInfo());
   }

@@ -711,7 +711,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
     if (destServerName == null || destServerName.length == 0) {
       LOG.info("Passed destination servername is null/empty so " +
         "choosing a server at random");
-      this.assignmentManager.clearRegionPlan(hri.getEncodedName());
+      this.assignmentManager.clearRegionPlan(hri);
       // Unassign will reassign it elsewhere choosing random server.
       this.assignmentManager.unassign(hri);
     } else {

@@ -45,8 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.collect.MutableClassToInstanceMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

@@ -66,15 +64,14 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
-import org.apache.hadoop.hbase.client.coprocessor.Exec;
-import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;

@@ -82,7 +79,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

@@ -97,7 +93,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;

+import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.MutableClassToInstanceMap;

 /**
  * HRegion stores data for a certain region of a table. It stores all columns

@@ -495,6 +493,8 @@ public class HRegion implements HeapSize { // , Writable{
     return close(false);
   }

+  private final Object closeLock = new Object();
+
   /**
    * Close down this HRegion. Flush the cache unless abort parameter is true,
    * Shut down each HStore, don't service any more calls.

@@ -509,7 +509,15 @@ public class HRegion implements HeapSize { // , Writable{
    *
    * @throws IOException e
    */
-  public List<StoreFile> close(final boolean abort)
+  public List<StoreFile> close(final boolean abort) throws IOException {
+    // Only allow one thread to close at a time. Serialize them so dual
+    // threads attempting to close will run up against each other.
+    synchronized (closeLock) {
+      return doClose(abort);
+    }
+  }
+
+  private List<StoreFile> doClose(final boolean abort)
   throws IOException {
     if (isClosed()) {
       LOG.warn("Region " + this + " already closed");

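The null return from close() is the contract the rest of the patch relies on: whichever of the master-driven close or the regionserver split loses the race sees null and stands down, as the SplitTransaction and CloseRegionHandler hunks below show. A hedged caller-side sketch (RegionLike is a stand-in, not the real HRegion type):

    import java.io.IOException;
    import java.util.List;

    class CloseCallerSketch {
      /** Stand-in for the part of the region API this pattern depends on. */
      interface RegionLike {
        // Mirrors the patched close(): null means "already closed elsewhere".
        List<String> close(boolean abort) throws IOException;
      }

      /** Returns true only if this caller actually performed the close. */
      static boolean closeIfStillOpen(final RegionLike region, final boolean abort)
          throws IOException {
        List<String> storeFiles = region.close(abort);
        if (storeFiles == null) {
          // Another thread (most likely a split) closed the region first;
          // do not report a second close back to the master.
          return false;
        }
        // ... continue with post-close bookkeeping using storeFiles ...
        return true;
      }
    }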
@@ -200,6 +200,15 @@ public class SplitTransaction {
     this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

     List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
+    if (hstoreFilesToSplit == null) {
+      // The region was closed by a concurrent thread. We can't continue
+      // with the split, instead we must just abandon the split. If we
+      // reopen or split this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      throw new IOException("Failed to close region: already closed by " +
+        "another thread");
+    }
     this.journal.add(JournalEntry.CLOSED_PARENT_REGION);

     if (!testing) {

@@ -115,7 +115,14 @@ public class CloseRegionHandler extends EventHandler {
     try {
       // TODO: If we need to keep updating CLOSING stamp to prevent against
       // a timeout if this is long-running, need to spin up a thread?
-      region.close(abort);
+      if (region.close(abort) == null) {
+        // This region got closed. Most likely due to a split. So instead
+        // of doing the setClosedState() below, let's just ignore and continue.
+        // The split message will clean up the master state.
+        LOG.warn("Can't close region: was already closed during close(): " +
+          regionInfo.getRegionNameAsString());
+        return;
+      }
     } catch (IOException e) {
       LOG.error("Unrecoverable exception while closing region " +
         regionInfo.getRegionNameAsString() + ", still finishing close", e);

@@ -164,12 +171,12 @@ public class CloseRegionHandler extends EventHandler {
     try {
       if ((expectedVersion = ZKAssign.createNodeClosing(
           server.getZooKeeper(), regionInfo, server.getServerName())) == FAILED) {
-        LOG.warn("Error creating node in CLOSING state, aborting close of "
-          + regionInfo.getRegionNameAsString());
+        LOG.warn("Error creating node in CLOSING state, aborting close of " +
+          regionInfo.getRegionNameAsString());
       }
     } catch (KeeperException e) {
-      LOG.warn("Error creating node in CLOSING state, aborting close of "
-        + regionInfo.getRegionNameAsString());
+      LOG.warn("Error creating node in CLOSING state, aborting close of " +
+        regionInfo.getRegionNameAsString());
     }
     return expectedVersion;
   }

@@ -355,13 +355,14 @@ public class ZKAssign {
    * of the specified regions transition to being closed.
    *
    * @param zkw zk reference
-   * @param regionName closing region to be deleted from zk
+   * @param region closing region to be deleted from zk
    * @throws KeeperException if unexpected zookeeper exception
    * @throws KeeperException.NoNodeException if node does not exist
    */
   public static boolean deleteClosingNode(ZooKeeperWatcher zkw,
-      String regionName)
+      HRegionInfo region)
   throws KeeperException, KeeperException.NoNodeException {
+    String regionName = region.getEncodedName();
     return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_CLOSING);
   }

@@ -381,7 +382,7 @@ public class ZKAssign {
    * of the specified regions transition to being closed.
    *
    * @param zkw zk reference
-   * @param region region to be deleted from zk
+   * @param regionName region to be deleted from zk
    * @param expectedState state region must be in for delete to complete
    * @throws KeeperException if unexpected zookeeper exception
    * @throws KeeperException.NoNodeException if node does not exist

@@ -467,9 +468,11 @@ public class ZKAssign {
   throws KeeperException, KeeperException.NodeExistsException {
     LOG.debug(zkw.prefix("Creating unassigned node for " +
       region.getEncodedName() + " in a CLOSING state"));

     RegionTransitionData data = new RegionTransitionData(
       EventType.RS_ZK_REGION_CLOSING, region.getRegionName(), serverName);
-    synchronized(zkw.getNodes()) {
+
+    synchronized (zkw.getNodes()) {
       String node = getNodeName(zkw, region.getEncodedName());
       zkw.getNodes().add(node);
       return ZKUtil.createAndWatch(zkw, node, data.getBytes());