HBASE-11261 Handle splitting/merging of regions that have region_replication greater than one
parent a462990765
commit 72ddb74700
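For orientation before the hunks below, here is a minimal, hedged sketch of the replica-id convention this patch builds on: a table whose descriptor sets region_replication = N has, for each region, one default replica (replicaId 0) plus N - 1 secondary HRegionInfos derived through RegionReplicaUtil, which is why the loops in the patch start at i = 1. The class name and the replication factor of 3 below are illustrative only, not part of the commit.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;

public class ReplicaIdSketch {
  // Derive the secondary replicas (replicaId 1 .. regionReplication - 1) of a default region.
  static List<HRegionInfo> secondaryReplicas(HRegionInfo primary, int regionReplication) {
    List<HRegionInfo> replicas = new ArrayList<HRegionInfo>();
    for (int i = 1; i < regionReplication; i++) {
      replicas.add(RegionReplicaUtil.getRegionInfoForReplica(primary, i));
    }
    return replicas;
  }

  public static void main(String[] args) {
    HRegionInfo primary = new HRegionInfo(TableName.valueOf("t")); // replicaId 0 by default
    for (HRegionInfo replica : secondaryReplicas(primary, 3)) {
      // Secondary replicas cover the same key range as the primary but are never the default replica.
      System.out.println(replica.getReplicaId() + " default=" + RegionReplicaUtil.isDefaultReplica(replica));
    }
  }
}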
@@ -30,8 +30,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;

@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Result;

@@ -148,6 +151,8 @@ public class AssignmentManager extends ZooKeeperListener {

final private KeyLocker<String> locker = new KeyLocker<String>();

Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());

/**
* Map of regions to reopen after the schema of a table is changed. Key -
* encoded region name, value - HRegionInfo

@@ -616,6 +621,13 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Clean cluster startup. Assigning user regions");
assignAllUserRegions(allRegions);
}
// unassign replicas of the split parents and the merged regions
// the daughter replicas are opened in assignAllUserRegions if it was
// not already opened.
for (HRegionInfo h : replicasToClose) {
unassign(h);
}
replicasToClose.clear();
return failover;
}
@@ -798,7 +810,11 @@ public class AssignmentManager extends ZooKeeperListener {
case RS_ZK_REGION_FAILED_OPEN:
// Region is closed, insert into RIT and handle it
regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
invokeAssign(regionInfo);
if (!replicasToClose.contains(regionInfo)) {
invokeAssign(regionInfo);
} else {
offlineDisabledRegion(regionInfo);
}
break;

case M_ZK_REGION_OFFLINE:

@@ -1483,6 +1499,8 @@ public class AssignmentManager extends ZooKeeperListener {
deleteNodeInStates(encodedName, "closed", null,
EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
}
LOG.debug("Removing region from replicasToClose " + regionInfo);
replicasToClose.remove(regionInfo);
regionOffline(regionInfo);
}
@@ -2207,7 +2225,7 @@ public class AssignmentManager extends ZooKeeperListener {
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
if (this.tableStateManager.isTableState(region.getTable(),
ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING)) {
ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+ " skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);

@@ -2461,7 +2479,7 @@ public class AssignmentManager extends ZooKeeperListener {
lock.unlock();

// Region is expected to be reassigned afterwards
if (reassign && regionStates.isRegionOffline(region)) {
if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
assign(region, true);
}
}
@@ -2768,6 +2786,19 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.debug("null result from meta - ignoring but this is strange.");
continue;
}
// keep a track of replicas to close. These were the replicas of the originally
// unmerged regions. The master might have closed them before, but it might not
// have (for example, because it crashed).
PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
if (p.getFirst() != null && p.getSecond() != null) {
int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
getTable()).getRegionReplication();
for (HRegionInfo merge : p) {
for (int i = 1; i < numReplicas; i++) {
replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
}
}
}
RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
if (rl == null) continue;
HRegionLocation[] locations = rl.getRegionLocations();

@@ -2777,6 +2808,14 @@ public class AssignmentManager extends ZooKeeperListener {
if (regionInfo == null) continue;
int replicaId = regionInfo.getReplicaId();
State state = RegionStateStore.getRegionState(result, replicaId);
// keep a track of replicas to close. These were the replicas of the split parents
// from the previous life of the master. The master should have closed them before,
// but it may not have managed to (for example, because it crashed).
if (replicaId == 0 && state.equals(State.SPLIT)) {
for (HRegionLocation h : locations) {
replicasToClose.add(h.getRegionInfo());
}
}
ServerName lastHost = hrl.getServerName();
ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
@@ -3380,7 +3419,8 @@ public class AssignmentManager extends ZooKeeperListener {
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the region plan. (HBASE-5546)
if (getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return;
}

@@ -3420,7 +3460,8 @@ public class AssignmentManager extends ZooKeeperListener {

private void onRegionClosed(final HRegionInfo hri) {
if (getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return;
}

@@ -3432,8 +3473,8 @@ public class AssignmentManager extends ZooKeeperListener {
}

private String onRegionSplit(ServerName sn, TransitionCode code,
HRegionInfo p, HRegionInfo a, HRegionInfo b) {
RegionState rs_p = regionStates.getRegionState(p);
final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
final RegionState rs_p = regionStates.getRegionState(p);
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (!(rs_p.isOpenOrSplittingOnServer(sn)

@@ -3459,6 +3500,15 @@ public class AssignmentManager extends ZooKeeperListener {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
invokeUnAssign(a);
invokeUnAssign(b);
} else {
Callable<Object> splitReplicasCallable = new Callable<Object>() {
@Override
public Object call() {
doSplittingOfReplicas(p, a, b);
return null;
}
};
threadPoolExecutorService.submit(splitReplicasCallable);
}
} else if (code == TransitionCode.SPLIT_PONR) {
try {
@@ -3481,7 +3531,7 @@ public class AssignmentManager extends ZooKeeperListener {
}

private String onRegionMerge(ServerName sn, TransitionCode code,
HRegionInfo p, HRegionInfo a, HRegionInfo b) {
final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
RegionState rs_p = regionStates.getRegionState(p);
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);

@@ -3508,6 +3558,15 @@ public class AssignmentManager extends ZooKeeperListener {
if (getTableStateManager().isTableState(p.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
invokeUnAssign(p);
} else {
Callable<Object> mergeReplicasCallable = new Callable<Object>() {
@Override
public Object call() {
doMergingOfReplicas(p, a, b);
return null;
}
};
threadPoolExecutorService.submit(mergeReplicasCallable);
}
} else if (code == TransitionCode.MERGE_PONR) {
try {

@@ -3614,6 +3673,7 @@ public class AssignmentManager extends ZooKeeperListener {
}

if (et == EventType.RS_ZK_REGION_MERGED) {
doMergingOfReplicas(p, hri_a, hri_b);
LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
// Remove region from ZK
try {

@@ -3740,6 +3800,8 @@ public class AssignmentManager extends ZooKeeperListener {
}

if (et == EventType.RS_ZK_REGION_SPLIT) {
// split replicas
doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
// Remove region from ZK
try {
@@ -3772,6 +3834,110 @@ public class AssignmentManager extends ZooKeeperListener {
return true;
}

private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
final HRegionInfo hri_b) {
// Close replicas for the original unmerged regions. Create/assign new replicas
// for the merged parent.
List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
unmergedRegions.add(hri_a);
unmergedRegions.add(hri_b);
Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
Collection<List<HRegionInfo>> c = map.values();
for (List<HRegionInfo> l : c) {
for (HRegionInfo h : l) {
if (!RegionReplicaUtil.isDefaultReplica(h)) {
LOG.debug("Unassigning un-merged replica " + h);
unassign(h);
}
}
}
int numReplicas = 1;
try {
numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
getRegionReplication();
} catch (IOException e) {
LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
" due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
"will not be done");
}
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
for (int i = 1; i < numReplicas; i++) {
regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
}
try {
assign(regions);
} catch (IOException ioe) {
LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
ioe.getMessage());
} catch (InterruptedException ie) {
LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
ie.getMessage());
}
}

private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
final HRegionInfo hri_b) {
// create new regions for the replica, and assign them to match with the
// current replica assignments. If replica1 of parent is assigned to RS1,
// the replica1s of daughters will be on the same machine
int numReplicas = 1;
try {
numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
getRegionReplication();
} catch (IOException e) {
LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
" due to " + e.getMessage() + ". The assignment of daughter replicas " +
"will not be done");
}
// unassign the old replicas
List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
parentRegion.add(parentHri);
Map<ServerName, List<HRegionInfo>> currentAssign =
regionStates.getRegionAssignments(parentRegion);
Collection<List<HRegionInfo>> c = currentAssign.values();
for (List<HRegionInfo> l : c) {
for (HRegionInfo h : l) {
if (!RegionReplicaUtil.isDefaultReplica(h)) {
LOG.debug("Unassigning parent's replica " + h);
unassign(h);
}
}
}
// assign daughter replicas
Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
for (int i = 1; i < numReplicas; i++) {
prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
}
try {
assign(map);
} catch (IOException e) {
LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
} catch (InterruptedException e) {
LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
}
}

private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
int replicaId, Map<HRegionInfo, ServerName> map) {
HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
replicaId);
LOG.debug("Created replica region for daughter " + daughterReplica);
ServerName sn;
if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
map.put(daughterReplica, sn);
} else {
List<ServerName> servers = serverManager.getOnlineServersList();
sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
map.put(daughterReplica, sn);
}
}

public Set<HRegionInfo> getReplicasToClose() {
return replicasToClose;
}

/**
* A region is offline. The new state should be the specified one,
* if not null. If the specified state is null, the new state is Offline.
@@ -3786,6 +3952,25 @@ public class AssignmentManager extends ZooKeeperListener {

// Tell our listeners that a region was closed
sendRegionClosedNotification(regionInfo);
// also note that all the replicas of the primary should be closed
if (state != null && state.equals(State.SPLIT)) {
Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
c.add(regionInfo);
Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
Collection<List<HRegionInfo>> allReplicas = map.values();
for (List<HRegionInfo> list : allReplicas) {
replicasToClose.addAll(list);
}
}
else if (state != null && state.equals(State.MERGED)) {
Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
c.add(regionInfo);
Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
Collection<List<HRegionInfo>> allReplicas = map.values();
for (List<HRegionInfo> list : allReplicas) {
replicasToClose.addAll(list);
}
}
}

private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
@@ -93,7 +93,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
// Check if this table is being disabled or not
if (this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
assignmentManager.getReplicasToClose().contains(regionInfo)) {
assignmentManager.offlineDisabledRegion(regionInfo);
return;
}
@@ -266,7 +266,8 @@ public class ServerShutdownHandler extends EventHandler {
} else if (rit != null) {
if (rit.isPendingCloseOrClosing()
&& am.getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
am.getReplicasToClose().contains(hri)) {
// If the table was partially disabled and the RS went down, we should clear the RIT
// and remove the node for the region.
// The rit that we use may be stale in case the table was in DISABLING state
@@ -148,6 +148,23 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
this.link = link;
}

/**
* Create a Store File Info from a Reference
* @param conf
* @param fs
* @param fileStatus
* @param reference
* @throws IOException
*/
public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
final Reference reference)
throws IOException {
this.conf = conf;
this.fileStatus = fileStatus;
this.reference = reference;
this.link = null;
}

/**
* Sets the region coprocessor env.
* @param coprocessorHost
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

@@ -83,6 +84,11 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
return new StoreFileInfo(conf, fs, status);
}

if (StoreFileInfo.isReference(status.getPath())) {
Reference reference = Reference.read(fs, status.getPath());
return new StoreFileInfo(conf, fs, status, reference);
}

// else create a store file link. The link file does not exist on the filesystem though.
HFileLink link = new HFileLink(conf,
HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName()
@@ -92,7 +92,7 @@ public class TestReplicasClient {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
static final AtomicReference<CountDownLatch> cdl =
private static final AtomicReference<CountDownLatch> cdl =
new AtomicReference<CountDownLatch>(new CountDownLatch(0));
Random r = new Random();
public SlowMeCopro() {

@@ -129,7 +129,7 @@ public class TestReplicasClient {

private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
CountDownLatch latch = cdl.get();
CountDownLatch latch = getCdl().get();
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");

@@ -148,6 +148,10 @@ public class TestReplicasClient {
LOG.info("We're not the primary replicas.");
}
}

public static AtomicReference<CountDownLatch> getCdl() {
return cdl;
}
}

@BeforeClass

@@ -291,7 +295,7 @@ public class TestReplicasClient {
public void testUseRegionWithoutReplica() throws Exception {
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
openRegion(hriSecondary);
SlowMeCopro.cdl.set(new CountDownLatch(0));
SlowMeCopro.getCdl().set(new CountDownLatch(0));
try {
Get g = new Get(b1);
Result r = table.get(g);

@@ -347,14 +351,14 @@ public class TestReplicasClient {
byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);

SlowMeCopro.cdl.set(new CountDownLatch(1));
SlowMeCopro.getCdl().set(new CountDownLatch(1));
try {
Get g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
closeRegion(hriSecondary);
}
}
@@ -465,13 +469,13 @@ public class TestReplicasClient {
LOG.info("sleep and is not stale done");

// But if we ask for stale we will get it
SlowMeCopro.cdl.set(new CountDownLatch(1));
SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();

LOG.info("stale done");

@@ -484,14 +488,14 @@ public class TestReplicasClient {
LOG.info("exists not stale done");

// exists works on stale but don't see the put
SlowMeCopro.cdl.set(new CountDownLatch(1));
SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse("The secondary has stale data", r.getExists());
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
LOG.info("exists stale before flush done");

flushRegion(hriPrimary);

@@ -500,28 +504,28 @@ public class TestReplicasClient {
Thread.sleep(1000 + REFRESH_PERIOD * 2);

// get works and is not stale
SlowMeCopro.cdl.set(new CountDownLatch(1));
SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse(r.isEmpty());
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
LOG.info("stale done");

// exists works on stale and we see the put after the flush
SlowMeCopro.cdl.set(new CountDownLatch(1));
SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getExists());
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
LOG.info("exists stale after flush done");

} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);

@@ -587,7 +591,7 @@ public class TestReplicasClient {
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.getCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.math.RandomUtils;

@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

@@ -287,6 +289,45 @@ public class TestRegionMergeTransactionOnCluster {
}
}

@Test
public void testMergeWithReplicas() throws Exception {
final TableName tableName = TableName.valueOf("testMergeWithReplicas");
// Create table and load data.
createTableAndLoadData(master, tableName, 5, 2);
List<Pair<HRegionInfo, ServerName>> initialRegionToServers =
MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(),
master.getShortCircuitConnection(), tableName);
// Merge 1st and 2nd region
PairOfSameType<HRegionInfo> mergedRegions = mergeRegionsAndVerifyRegionNum(master, tableName,
0, 2, 5 * 2 - 2);
List<Pair<HRegionInfo, ServerName>> currentRegionToServers =
MetaTableAccessor.getTableRegionsAndLocations(master.getZooKeeper(),
master.getShortCircuitConnection(), tableName);
List<HRegionInfo> initialRegions = new ArrayList<HRegionInfo>();
for (Pair<HRegionInfo, ServerName> p : initialRegionToServers) {
initialRegions.add(p.getFirst());
}
List<HRegionInfo> currentRegions = new ArrayList<HRegionInfo>();
for (Pair<HRegionInfo, ServerName> p : currentRegionToServers) {
currentRegions.add(p.getFirst());
}
assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is the first region
assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
mergedRegions.getFirst(), 1))); //this is the replica of the first region
assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is the second region
assertTrue(initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
mergedRegions.getSecond(), 1))); //this is the replica of the second region
assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the new region
assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
currentRegions.get(0), 1))); //replica of the new region
assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
currentRegions.get(0), 1))); //replica of the new region
assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
mergedRegions.getFirst(), 1))); //replica of the merged region
assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(
mergedRegions.getSecond(), 1))); //replica of the merged region
}

private PairOfSameType<HRegionInfo> mergeRegionsAndVerifyRegionNum(
HMaster master, TableName tablename,
int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
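For context on the merge path this test exercises, here is a hedged sketch of driving it through the public admin API. The connection setup, the table name, and the assumption that the first two default-replica regions returned by getTableRegions are adjacent are all illustrative; the test's mergeRegionsAndVerifyRegionNum helper wraps roughly this call.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;

public class MergeReplicatedTableSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      List<HRegionInfo> regions = admin.getTableRegions(TableName.valueOf("t"));
      HRegionInfo first = null;
      HRegionInfo second = null;
      for (HRegionInfo hri : regions) {
        // Only default (replicaId == 0) regions are merged directly; afterwards the master
        // closes the parents' replicas and assigns a replica for the merged region.
        if (!RegionReplicaUtil.isDefaultReplica(hri)) {
          continue;
        }
        if (first == null) {
          first = hri;
        } else {
          second = hri;
          break;
        }
      }
      admin.mergeRegions(first.getEncodedNameAsBytes(), second.getEncodedNameAsBytes(), false);
    } finally {
      admin.close();
    }
  }
}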
@@ -335,11 +376,11 @@ public class TestRegionMergeTransactionOnCluster {

private HTable createTableAndLoadData(HMaster master, TableName tablename)
throws Exception {
return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
}

private HTable createTableAndLoadData(HMaster master, TableName tablename,
int numRegions) throws Exception {
int numRegions, int replication) throws Exception {
assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
byte[][] splitRows = new byte[numRegions - 1][];
for (int i = 0; i < splitRows.length; i++) {

@@ -347,6 +388,9 @@ public class TestRegionMergeTransactionOnCluster {
}

HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
if (replication > 1) {
HBaseTestingUtility.setReplicas(admin, tablename, replication);
}
loadData(table);
verifyRowCount(table, ROWSIZE);
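The HBaseTestingUtility.setReplicas helper used above is not part of this diff; as a hedged sketch, something along the following lines is assumed: disable the table, raise region_replication on its descriptor with the same setter the split test further below uses, and re-enable it.

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class SetReplicasSketch {
  // Assumed shape of the helper: bump region_replication on an existing table.
  static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount) throws Exception {
    admin.disableTable(table);
    HTableDescriptor htd = admin.getTableDescriptor(table);
    htd.setRegionReplication(replicaCount);
    admin.modifyTable(table, htd);
    admin.enableTable(table);
  }
}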
@@ -356,7 +400,7 @@ public class TestRegionMergeTransactionOnCluster {
while (System.currentTimeMillis() < timeout) {
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
master.getZooKeeper(), master.getShortCircuitConnection(), tablename);
if (tableRegions.size() == numRegions)
if (tableRegions.size() == numRegions * replication)
break;
Thread.sleep(250);
}

@@ -364,7 +408,7 @@ public class TestRegionMergeTransactionOnCluster {
tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
master.getZooKeeper(), master.getShortCircuitConnection(), tablename);
LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
assertEquals(numRegions, tableRegions.size());
assertEquals(numRegions * replication, tableRegions.size());
return table;
}
@@ -57,7 +57,9 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;

@@ -65,6 +67,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestReplicasClient.SlowMeCopro;
import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
@@ -924,6 +927,87 @@ public class TestSplitTransactionOnCluster {
}
}

@Test
public void testSplitWithRegionReplicas() throws Exception {
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TESTING_UTIL);
final TableName tableName =
TableName.valueOf("foobar");
HTableDescriptor htd = TESTING_UTIL.createTableDescriptor("foobar");
htd.setRegionReplication(2);
htd.addCoprocessor(SlowMeCopro.class.getName());
// Create table then get the single region for our new table.
HTable t = TESTING_UTIL.createTable(htd, new byte[][]{Bytes.toBytes("cf")},
TESTING_UTIL.getConfiguration());
int count;
List<HRegion> oldRegions;
do {
oldRegions = cluster.getRegions(tableName);
Thread.sleep(10);
} while (oldRegions.size() != 2);
for (HRegion h : oldRegions) LOG.debug("OLDREGION " + h.getRegionInfo());
try {
int regionServerIndex = cluster.getServerWith(oldRegions.get(0).getRegionName());
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
insertData(tableName.getName(), admin, t);
// Turn off balancer so it doesn't cut in and mess up our placements.
admin.setBalancerRunning(false, true);
// Turn off the meta scanner so it don't remove parent on us.
cluster.getMaster().setCatalogJanitorEnabled(false);
boolean tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(),
tableName);
assertEquals("The specified table should be present.", true, tableExists);
final HRegion region = findSplittableRegion(oldRegions);
regionServerIndex = cluster.getServerWith(region.getRegionName());
regionServer = cluster.getRegionServer(regionServerIndex);
assertTrue("not able to find a splittable region", region != null);
String node = ZKAssign.getNodeName(regionServer.getZooKeeper(),
region.getRegionInfo().getEncodedName());
regionServer.getZooKeeper().sync(node);
SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2"));
try {
st.prepare();
st.execute(regionServer, regionServer);
} catch (IOException e) {
e.printStackTrace();
fail("Split execution should have succeeded with no exceptions thrown " + e);
}
//TESTING_UTIL.waitUntilAllRegionsAssigned(tableName);
List<HRegion> newRegions;
do {
newRegions = cluster.getRegions(tableName);
for (HRegion h : newRegions) LOG.debug("NEWREGION " + h.getRegionInfo());
Thread.sleep(1000);
} while ((newRegions.contains(oldRegions.get(0)) || newRegions.contains(oldRegions.get(1)))
|| newRegions.size() != 4);
tableExists = MetaTableAccessor.tableExists(regionServer.getShortCircuitConnection(),
tableName);
assertEquals("The specified table should be present.", true, tableExists);
// exists works on stale and we see the put after the flush
byte[] b1 = "row1".getBytes();
Get g = new Get(b1);
g.setConsistency(Consistency.STRONG);
// The following GET will make a trip to the meta to get the new location of the 1st daughter
// In the process it will also get the location of the replica of the daughter (initially
// pointing to the parent's replica)
Result r = t.get(g);
Assert.assertFalse(r.isStale());
LOG.info("exists stale after flush done");

SlowMeCopro.getCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
// This will succeed because in the previous GET we get the location of the replica
r = t.get(g);
Assert.assertTrue(r.isStale());
SlowMeCopro.getCdl().get().countDown();
} finally {
SlowMeCopro.getCdl().get().countDown();
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();
}
}

private void insertData(final byte[] tableName, HBaseAdmin admin, HTable t) throws IOException,
InterruptedException {
Put p = new Put(Bytes.toBytes("row1"));

@@ -1192,7 +1276,7 @@
private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {
for (int i = 0; i < 5; ++i) {
for (HRegion r: regions) {
if (r.isSplittable()) {
if (r.isSplittable() && r.getRegionInfo().getReplicaId() == 0) {
return(r);
}
}