HBASE-17668: Implement async assgin/offline/move/unassign methods
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
80381f3944
commit
5f98ad2053
|
@ -431,4 +431,40 @@ public interface AsyncAdmin {
|
||||||
* @param splitPoint the explicit position to split on
|
* @param splitPoint the explicit position to split on
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> splitRegion(final byte[] regionName, final byte[] splitPoint);
|
CompletableFuture<Void> splitRegion(final byte[] regionName, final byte[] splitPoint);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param regionName Encoded or full name of region to assign.
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> assign(final byte[] regionName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unassign a region from current hosting regionserver. Region will then be assigned to a
|
||||||
|
* regionserver chosen at random. Region could be reassigned back to the same server. Use
|
||||||
|
* {@link #move(byte[], byte[])} if you want to control the region movement.
|
||||||
|
* @param regionName Encoded or full name of region to unassign. Will clear any existing
|
||||||
|
* RegionPlan if one found.
|
||||||
|
* @param force If true, force unassign (Will remove region from regions-in-transition too if
|
||||||
|
* present. If results in double assignment use hbck -fix to resolve. To be used by
|
||||||
|
* experts).
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> unassign(final byte[] regionName, final boolean force);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offline specified region from master's in-memory state. It will not attempt to reassign the
|
||||||
|
* region as in unassign. This API can be used when a region not served by any region server and
|
||||||
|
* still online as per Master's in memory state. If this API is incorrectly used on active region
|
||||||
|
* then master will loose track of that region. This is a special method that should be used by
|
||||||
|
* experts or hbck.
|
||||||
|
* @param regionName Encoded or full name of region to offline
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> offline(final byte[] regionName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move the region <code>r</code> to <code>dest</code>.
|
||||||
|
* @param regionName Encoded or full name of region to move.
|
||||||
|
* @param destServerName The servername of the destination regionserver. If passed the empty byte
|
||||||
|
* array we'll assign to a random server. A server name is made of host, port and
|
||||||
|
* startcode. Here is an example: <code> host187.example.com,60020,1289493121758</code>
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> move(final byte[] regionName, final byte[] destServerName);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,6 @@ import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegion
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
|
||||||
|
@ -105,10 +106,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
@ -260,7 +267,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
|
LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
|
||||||
failed.add(table);
|
failed.add(table);
|
||||||
}
|
}
|
||||||
})).toArray(size -> new CompletableFuture[size]);
|
})).<CompletableFuture> toArray(size -> new CompletableFuture[size]);
|
||||||
CompletableFuture.allOf(futures).thenAccept((v) -> {
|
CompletableFuture.allOf(futures).thenAccept((v) -> {
|
||||||
future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
|
future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
|
||||||
});
|
});
|
||||||
|
@ -616,7 +623,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
|
return this.<DeleteNamespaceRequest, DeleteNamespaceResponse> procedureCall(
|
||||||
RequestConverter.buildDeleteNamespaceRequest(name),
|
RequestConverter.buildDeleteNamespaceRequest(name),
|
||||||
(s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
|
(s, c, req, done) -> s.deleteNamespace(c, req, done), (resp) -> resp.getProcId(),
|
||||||
new ModifyNamespaceProcedureBiConsumer(this, name));
|
new DeleteNamespaceProcedureBiConsumer(this, name));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1008,6 +1015,140 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
.serverName(sn).call();
|
.serverName(sn).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Turn regionNameOrEncodedRegionName into regionName, if region does not found, then it'll throw
|
||||||
|
* an IllegalArgumentException wrapped by a {@link CompletableFuture}
|
||||||
|
* @param regionNameOrEncodedRegionName
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
CompletableFuture<byte[]> getRegionName(byte[] regionNameOrEncodedRegionName) {
|
||||||
|
CompletableFuture<byte[]> future = new CompletableFuture<>();
|
||||||
|
if (Bytes
|
||||||
|
.equals(regionNameOrEncodedRegionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
|
||||||
|
|| Bytes.equals(regionNameOrEncodedRegionName,
|
||||||
|
HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
|
||||||
|
future.complete(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
getRegion(regionNameOrEncodedRegionName).whenComplete((p, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
}
|
||||||
|
if (p != null && p.getFirst() != null) {
|
||||||
|
future.complete(p.getFirst().getRegionName());
|
||||||
|
} else {
|
||||||
|
future.completeExceptionally(
|
||||||
|
new IllegalArgumentException("Invalid region name or encoded region name: "
|
||||||
|
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> assign(byte[] regionName) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
getRegionName(regionName).whenComplete((fullRegionName, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
} else {
|
||||||
|
this.<Void> newMasterCaller()
|
||||||
|
.action(
|
||||||
|
((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call(
|
||||||
|
controller, stub, RequestConverter.buildAssignRegionRequest(fullRegionName),
|
||||||
|
(s, c, req, done) -> s.assignRegion(c, req, done), resp -> null)))
|
||||||
|
.call().whenComplete((ret, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(ret);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> unassign(byte[] regionName, boolean force) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
getRegionName(regionName).whenComplete((fullRegionName, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
} else {
|
||||||
|
this.<Void> newMasterCaller()
|
||||||
|
.action(((controller, stub) -> this
|
||||||
|
.<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub,
|
||||||
|
RequestConverter.buildUnassignRegionRequest(fullRegionName, force),
|
||||||
|
(s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null)))
|
||||||
|
.call().whenComplete((ret, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(ret);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> offline(byte[] regionName) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
getRegionName(regionName).whenComplete((fullRegionName, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
} else {
|
||||||
|
this.<Void> newMasterCaller()
|
||||||
|
.action(
|
||||||
|
((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call(
|
||||||
|
controller, stub, RequestConverter.buildOfflineRegionRequest(fullRegionName),
|
||||||
|
(s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null)))
|
||||||
|
.call().whenComplete((ret, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(ret);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> move(byte[] regionName, byte[] destServerName) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
getRegionName(regionName).whenComplete((fullRegionName, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
} else {
|
||||||
|
final MoveRegionRequest request;
|
||||||
|
try {
|
||||||
|
request = RequestConverter.buildMoveRegionRequest(
|
||||||
|
Bytes.toBytes(HRegionInfo.encodeRegionName(fullRegionName)), destServerName);
|
||||||
|
} catch (DeserializationException e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.<Void> newMasterCaller()
|
||||||
|
.action((controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call(
|
||||||
|
controller, stub, request, (s, c, req, done) -> s.moveRegion(c, req, done),
|
||||||
|
resp -> null))
|
||||||
|
.call().whenComplete((ret, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(ret);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
|
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
|
||||||
if (numRegions < 3) {
|
if (numRegions < 3) {
|
||||||
throw new IllegalArgumentException("Must create at least three regions");
|
throw new IllegalArgumentException("Must create at least three regions");
|
||||||
|
|
|
@ -50,7 +50,7 @@ public abstract class TestAsyncAdminBase {
|
||||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
|
||||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
|
||||||
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(2);
|
||||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -30,13 +31,20 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionState;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionStates;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -209,7 +217,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
||||||
HColumnDescriptor cd = new HColumnDescriptor("d");
|
HColumnDescriptor cd = new HColumnDescriptor("d");
|
||||||
HTableDescriptor td = new HTableDescriptor(tableName);
|
HTableDescriptor td = new HTableDescriptor(tableName);
|
||||||
td.addFamily(cd);
|
td.addFamily(cd);
|
||||||
byte[][] splitRows = new byte[][] { "3".getBytes(), "6".getBytes() };
|
byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
|
||||||
Admin syncAdmin = TEST_UTIL.getAdmin();
|
Admin syncAdmin = TEST_UTIL.getAdmin();
|
||||||
try {
|
try {
|
||||||
TEST_UTIL.createTable(td, splitRows);
|
TEST_UTIL.createTable(td, splitRows);
|
||||||
|
@ -296,4 +304,140 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
||||||
assertEquals(count, 2);
|
assertEquals(count, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAssignRegionAndUnassignRegion() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion");
|
||||||
|
try {
|
||||||
|
// create test table
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
admin.createTable(desc).get();
|
||||||
|
|
||||||
|
// add region to meta.
|
||||||
|
Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
|
||||||
|
HRegionInfo hri =
|
||||||
|
new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
|
||||||
|
MetaTableAccessor.addRegionToMeta(meta, hri);
|
||||||
|
|
||||||
|
// assign region.
|
||||||
|
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
AssignmentManager am = master.getAssignmentManager();
|
||||||
|
admin.assign(hri.getRegionName()).get();
|
||||||
|
am.waitForAssignment(hri);
|
||||||
|
|
||||||
|
// assert region on server
|
||||||
|
RegionStates regionStates = am.getRegionStates();
|
||||||
|
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
|
||||||
|
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
||||||
|
assertTrue(regionStates.getRegionState(hri).isOpened());
|
||||||
|
|
||||||
|
// Region is assigned now. Let's assign it again.
|
||||||
|
// Master should not abort, and region should be assigned.
|
||||||
|
admin.assign(hri.getRegionName()).get();
|
||||||
|
am.waitForAssignment(hri);
|
||||||
|
assertTrue(regionStates.getRegionState(hri).isOpened());
|
||||||
|
|
||||||
|
// unassign region
|
||||||
|
admin.unassign(hri.getRegionName(), true).get();
|
||||||
|
am.waitForAssignment(hri);
|
||||||
|
assertTrue(regionStates.getRegionState(hri).isOpened());
|
||||||
|
} finally {
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HRegionInfo createTableAndGetOneRegion(final TableName tableName)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
|
desc.addFamily(new HColumnDescriptor(FAMILY));
|
||||||
|
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5);
|
||||||
|
|
||||||
|
// wait till the table is assigned
|
||||||
|
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
long timeoutTime = System.currentTimeMillis() + 3000;
|
||||||
|
while (true) {
|
||||||
|
List<HRegionInfo> regions =
|
||||||
|
master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
|
||||||
|
if (regions.size() > 3) {
|
||||||
|
return regions.get(2);
|
||||||
|
}
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now > timeoutTime) {
|
||||||
|
fail("Could not find an online region");
|
||||||
|
}
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOfflineRegion() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testOfflineRegion");
|
||||||
|
try {
|
||||||
|
HRegionInfo hri = createTableAndGetOneRegion(tableName);
|
||||||
|
|
||||||
|
RegionStates regionStates =
|
||||||
|
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
|
||||||
|
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
|
||||||
|
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
|
||||||
|
admin.offline(hri.getRegionName()).get();
|
||||||
|
|
||||||
|
long timeoutTime = System.currentTimeMillis() + 3000;
|
||||||
|
while (true) {
|
||||||
|
if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
|
||||||
|
.contains(hri))
|
||||||
|
break;
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now > timeoutTime) {
|
||||||
|
fail("Failed to offline the region in time");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
|
RegionState regionState = regionStates.getRegionState(hri);
|
||||||
|
assertTrue(regionState.isOffline());
|
||||||
|
} finally {
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveRegion() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testMoveRegion");
|
||||||
|
try {
|
||||||
|
HRegionInfo hri = createTableAndGetOneRegion(tableName);
|
||||||
|
|
||||||
|
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
|
||||||
|
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
|
||||||
|
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
|
||||||
|
ServerManager serverManager = master.getServerManager();
|
||||||
|
ServerName destServerName = null;
|
||||||
|
List<JVMClusterUtil.RegionServerThread> regionServers =
|
||||||
|
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
|
||||||
|
for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
|
||||||
|
HRegionServer destServer = regionServer.getRegionServer();
|
||||||
|
destServerName = destServer.getServerName();
|
||||||
|
if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue(destServerName != null && !destServerName.equals(serverName));
|
||||||
|
admin.move(hri.getEncodedNameAsBytes(), Bytes.toBytes(destServerName.getServerName())).get();
|
||||||
|
|
||||||
|
long timeoutTime = System.currentTimeMillis() + 30000;
|
||||||
|
while (true) {
|
||||||
|
ServerName sn = regionStates.getRegionServerOfRegion(hri);
|
||||||
|
if (sn != null && sn.equals(destServerName)) {
|
||||||
|
TEST_UTIL.assertRegionOnServer(hri, sn, 200);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now > timeoutTime) {
|
||||||
|
fail("Failed to move the region in time: " + regionStates.getRegionState(hri));
|
||||||
|
}
|
||||||
|
regionStates.waitForUpdate(50);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue