HBASE-3305 Allow round-robin distribution for table created with multiple regions (ted yu via jgray)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1064930 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
03618fdee9
commit
d09a8b5cec
|
@ -56,6 +56,8 @@ Release 0.91.0 - Unreleased
|
||||||
server.join() method (Jeff Hammerbacher via Stack)
|
server.join() method (Jeff Hammerbacher via Stack)
|
||||||
HBASE-3437 Support Explict Split Points from the Shell
|
HBASE-3437 Support Explict Split Points from the Shell
|
||||||
HBASE-3433 KeyValue API to explicitly distinguish between deep & shallow copies
|
HBASE-3433 KeyValue API to explicitly distinguish between deep & shallow copies
|
||||||
|
HBASE-3305 Allow round-robin distribution for table created with
|
||||||
|
multiple regions (ted yu via jgray)
|
||||||
|
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
|
@ -1182,6 +1182,28 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
assign(HRegionInfo.FIRST_META_REGIONINFO, true);
|
assign(HRegionInfo.FIRST_META_REGIONINFO, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assigns list of user regions in round-robin fashion, if any exist.
|
||||||
|
* <p>
|
||||||
|
* This is a synchronous call and will return once every region has been
|
||||||
|
* assigned. If anything fails, an exception is thrown
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void assignUserRegions(List<HRegionInfo> regions, List<HServerInfo> servers) throws IOException, InterruptedException {
|
||||||
|
if (regions == null)
|
||||||
|
return;
|
||||||
|
Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
|
||||||
|
// Generate a round-robin bulk assignment plan
|
||||||
|
bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers);
|
||||||
|
LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " +
|
||||||
|
servers.size() + " server(s)");
|
||||||
|
// Use fixed count thread pool assigning.
|
||||||
|
BulkAssigner ba = new BulkStartupAssigner(this.master, bulkPlan, this);
|
||||||
|
ba.bulkAssign();
|
||||||
|
LOG.info("Bulk assigning done");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assigns all user regions, if any exist. Used during cluster startup.
|
* Assigns all user regions, if any exist. Used during cluster startup.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -1209,9 +1231,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
// Reuse existing assignment info
|
// Reuse existing assignment info
|
||||||
bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
|
bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
|
||||||
} else {
|
} else {
|
||||||
// Generate a round-robin bulk assignment plan
|
// assign regions in round-robin fashion
|
||||||
bulkPlan = LoadBalancer.roundRobinAssignment(
|
assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
|
||||||
new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
|
LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
|
||||||
servers.size() + " server(s), retainAssignment=" + retainAssignment);
|
servers.size() + " server(s), retainAssignment=" + retainAssignment);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -848,9 +849,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
// 4. Close the new region to flush to disk. Close log file too.
|
// 4. Close the new region to flush to disk. Close log file too.
|
||||||
region.close();
|
region.close();
|
||||||
region.getLog().closeAndDelete();
|
region.getLog().closeAndDelete();
|
||||||
|
}
|
||||||
|
|
||||||
// 5. Trigger immediate assignment of this region
|
// 5. Trigger immediate assignment of the regions in round-robin fashion
|
||||||
assignmentManager.assign(region.getRegionInfo(), true);
|
List<HServerInfo> servers = serverManager.getOnlineServersList();
|
||||||
|
try {
|
||||||
|
this.assignmentManager.assignUserRegions(Arrays.asList(newRegions), servers);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.error("Caught " + ie + " during round-robin assignment");
|
||||||
|
throw new IOException(ie);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. If sync, wait for assignment of regions
|
// 5. If sync, wait for assignment of regions
|
||||||
|
|
|
@ -378,13 +378,18 @@ public class LoadBalancer {
|
||||||
int numServers = servers.size();
|
int numServers = servers.size();
|
||||||
int max = (int)Math.ceil((float)numRegions/numServers);
|
int max = (int)Math.ceil((float)numRegions/numServers);
|
||||||
int serverIdx = 0;
|
int serverIdx = 0;
|
||||||
for(HServerInfo server : servers) {
|
if (numServers > 1) {
|
||||||
|
serverIdx = rand.nextInt(numServers);
|
||||||
|
}
|
||||||
|
int regionIdx = 0;
|
||||||
|
for (int j = 0; j < numServers; j++) {
|
||||||
|
HServerInfo server = servers.get((j+serverIdx) % numServers);
|
||||||
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
|
List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
|
||||||
for(int i=serverIdx;i<regions.size();i+=numServers) {
|
for (int i=regionIdx; i<numRegions; i += numServers) {
|
||||||
serverRegions.add(regions.get(i));
|
serverRegions.add(regions.get(i % numRegions));
|
||||||
}
|
}
|
||||||
assignments.put(server, serverRegions);
|
assignments.put(server, serverRegions);
|
||||||
serverIdx++;
|
regionIdx++;
|
||||||
}
|
}
|
||||||
return assignments;
|
return assignments;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,10 @@ import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -297,6 +300,27 @@ public class TestAdmin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void verifyRoundRobinDistribution(HTable ht, int expectedRegions) throws IOException {
|
||||||
|
int numRS = ht.getCurrentNrHRS();
|
||||||
|
Map<HRegionInfo,HServerAddress> regions = ht.getRegionsInfo();
|
||||||
|
Map<HServerAddress, List<HRegionInfo>> server2Regions = new HashMap<HServerAddress, List<HRegionInfo>>();
|
||||||
|
for (Map.Entry<HRegionInfo,HServerAddress> entry : regions.entrySet()) {
|
||||||
|
HServerAddress server = entry.getValue();
|
||||||
|
List<HRegionInfo> regs = server2Regions.get(server);
|
||||||
|
if (regs == null) {
|
||||||
|
regs = new ArrayList<HRegionInfo>();
|
||||||
|
server2Regions.put(server, regs);
|
||||||
|
}
|
||||||
|
regs.add(entry.getKey());
|
||||||
|
}
|
||||||
|
float average = (float) expectedRegions/numRS;
|
||||||
|
int min = (int)Math.floor(average);
|
||||||
|
int max = (int)Math.ceil(average);
|
||||||
|
for (List<HRegionInfo> regionList : server2Regions.values()) {
|
||||||
|
assertTrue(regionList.size() == min || regionList.size() == max);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateTableWithRegions() throws IOException, InterruptedException {
|
public void testCreateTableWithRegions() throws IOException, InterruptedException {
|
||||||
|
|
||||||
|
@ -358,6 +382,8 @@ public class TestAdmin {
|
||||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
|
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
|
||||||
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
||||||
|
|
||||||
|
verifyRoundRobinDistribution(ht, expectedRegions);
|
||||||
|
|
||||||
// Now test using start/end with a number of regions
|
// Now test using start/end with a number of regions
|
||||||
|
|
||||||
// Use 80 bit numbers to make sure we aren't limited
|
// Use 80 bit numbers to make sure we aren't limited
|
||||||
|
@ -415,6 +441,8 @@ public class TestAdmin {
|
||||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte [] {9,9,9,9,9,9,9,9,9,9}));
|
assertTrue(Bytes.equals(hri.getStartKey(), new byte [] {9,9,9,9,9,9,9,9,9,9}));
|
||||||
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
||||||
|
|
||||||
|
verifyRoundRobinDistribution(ht, expectedRegions);
|
||||||
|
|
||||||
// Try once more with something that divides into something infinite
|
// Try once more with something that divides into something infinite
|
||||||
|
|
||||||
startKey = new byte [] { 0, 0, 0, 0, 0, 0 };
|
startKey = new byte [] { 0, 0, 0, 0, 0, 0 };
|
||||||
|
@ -436,6 +464,8 @@ public class TestAdmin {
|
||||||
expectedRegions, regions.size());
|
expectedRegions, regions.size());
|
||||||
System.err.println("Found " + regions.size() + " regions");
|
System.err.println("Found " + regions.size() + " regions");
|
||||||
|
|
||||||
|
verifyRoundRobinDistribution(ht, expectedRegions);
|
||||||
|
|
||||||
// Try an invalid case where there are duplicate split keys
|
// Try an invalid case where there are duplicate split keys
|
||||||
splitKeys = new byte [][] {
|
splitKeys = new byte [][] {
|
||||||
new byte [] { 1, 1, 1 },
|
new byte [] { 1, 1, 1 },
|
||||||
|
|
Loading…
Reference in New Issue