From e946d9d841a72d41c0c7458667c7fedc495bd306 Mon Sep 17 00:00:00 2001 From: Balazs Meszaros Date: Thu, 23 Nov 2017 14:42:39 +0100 Subject: [PATCH] HBASE-19242 Add MOB compact support for AsyncAdmin Signed-off-by: Michael Stack Signed-off-by: Guanghao Zhang --- .../org/apache/hadoop/hbase/client/Admin.java | 130 +++++----- .../hadoop/hbase/client/AsyncAdmin.java | 69 ++++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 24 +- .../hadoop/hbase/client/HBaseAdmin.java | 13 +- .../hbase/client/RawAsyncHBaseAdmin.java | 238 +++++++++++------- .../hadoop/hbase/client/RegionInfo.java | 11 + .../hbase/client/TestAsyncRegionAdminApi.java | 38 +++ 7 files changed, 344 insertions(+), 179 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 6f1190e5734..d9f8e899819 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -871,6 +871,33 @@ public interface Admin extends Abortable, Closeable { void compactRegion(byte[] regionName, byte[] columnFamily) throws IOException; + /** + * Compact a table. Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). + * + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + */ + void compact(TableName tableName, CompactType compactType) + throws IOException, InterruptedException; + + /** + * Compact a column family within a table. Asynchronous operation in that this method + * requests that a Compaction run and then it returns. It does not wait on the + * completion of Compaction (it can take a while). + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void compact(TableName tableName, byte[] columnFamily, CompactType compactType) + throws IOException, InterruptedException; + /** * Major compact a table. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction @@ -915,6 +942,33 @@ public interface Admin extends Abortable, Closeable { void majorCompactRegion(byte[] regionName, byte[] columnFamily) throws IOException; + /** + * Major compact a table. Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). + * + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if a remote or network exception occurs + * @throws InterruptedException + */ + void majorCompact(TableName tableName, CompactType compactType) + throws IOException, InterruptedException; + + /** + * Major compact a column family within a table. Asynchronous operation in that this method requests that a + * Compaction run and then it returns. It does not wait on the completion of Compaction + * (it can take a while). + * + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @throws IOException if not a mob column family or if a remote or network exception occurs + * @throws InterruptedException + */ + void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType) + throws IOException, InterruptedException; + /** * Compact all regions on the region server. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction (it @@ -1735,6 +1789,17 @@ public interface Admin extends Abortable, Closeable { */ CompactionState getCompactionState(TableName tableName) throws IOException; + /** + * Get the current compaction state of a table. It could be in a compaction, or none. + * + * @param tableName table to examine + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @return the current compaction state + * @throws IOException if a remote or network exception occurs + */ + CompactionState getCompactionState(TableName tableName, + CompactType compactType) throws IOException; + /** * Get the current compaction state of region. It could be in a major compaction, a minor * compaction, both, or none. @@ -2310,71 +2375,6 @@ public interface Admin extends Abortable, Closeable { return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).getMasterInfoPort(); } - /** - * Compact a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException - * @throws InterruptedException - */ - void compact(TableName tableName, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Compact a column family within a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param columnFamily column family within a table - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException if not a mob column family or if a remote or network exception occurs - * @throws InterruptedException - */ - void compact(TableName tableName, byte[] columnFamily, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Major compact a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException - * @throws InterruptedException - */ - void majorCompact(TableName tableName, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Major compact a column family within a table. Asynchronous operation in that this method requests that a - * Compaction run and then it returns. It does not wait on the completion of Compaction - * (it can take a while). - * - * @param tableName table to compact - * @param columnFamily column family within a table - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @throws IOException if not a mob column family or if a remote or network exception occurs - * @throws InterruptedException - */ - void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType) - throws IOException, InterruptedException; - - /** - * Get the current compaction state of a table. It could be in a compaction, or none. - * - * @param tableName table to examine - * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} - * @return the current compaction state - * @throws IOException if a remote or network exception occurs - */ - CompactionState getCompactionState(TableName tableName, - CompactType compactType) throws IOException; - /** * Return the set of supported security capabilities. * @throws IOException diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index abd8e5a60b4..2ae51aca8cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -299,7 +299,9 @@ public interface AsyncAdmin { * was sent to HBase and may need some time to finish the compact operation. * @param tableName table to compact */ - CompletableFuture compact(TableName tableName); + default CompletableFuture compact(TableName tableName) { + return compact(tableName, CompactType.NORMAL); + } /** * Compact a column family within a table. When the returned CompletableFuture is done, it only @@ -309,7 +311,28 @@ public interface AsyncAdmin { * @param columnFamily column family within a table. If not present, compact the table's all * column families. */ - CompletableFuture compact(TableName tableName, byte[] columnFamily); + default CompletableFuture compact(TableName tableName, byte[] columnFamily) { + return compact(tableName, columnFamily, CompactType.NORMAL); + } + + /** + * Compact a table. When the returned CompletableFuture is done, it only means the compact request + * was sent to HBase and may need some time to finish the compact operation. + * @param tableName table to compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture compact(TableName tableName, CompactType compactType); + + /** + * Compact a column family within a table. When the returned CompletableFuture is done, it only + * means the compact request was sent to HBase and may need some time to finish the compact + * operation. + * @param tableName table to compact + * @param columnFamily column family within a table + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture compact(TableName tableName, byte[] columnFamily, + CompactType compactType); /** * Compact an individual region. When the returned CompletableFuture is done, it only means the @@ -333,7 +356,9 @@ public interface AsyncAdmin { * request was sent to HBase and may need some time to finish the compact operation. * @param tableName table to major compact */ - CompletableFuture majorCompact(TableName tableName); + default CompletableFuture majorCompact(TableName tableName) { + return majorCompact(tableName, CompactType.NORMAL); + } /** * Major compact a column family within a table. When the returned CompletableFuture is done, it @@ -343,7 +368,29 @@ public interface AsyncAdmin { * @param columnFamily column family within a table. If not present, major compact the table's all * column families. */ - CompletableFuture majorCompact(TableName tableName, byte[] columnFamily); + default CompletableFuture majorCompact(TableName tableName, byte[] columnFamily) { + return majorCompact(tableName, columnFamily, CompactType.NORMAL); + } + + /** + * Major compact a table. When the returned CompletableFuture is done, it only means the compact + * request was sent to HBase and may need some time to finish the compact operation. + * @param tableName table to major compact + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture majorCompact(TableName tableName, CompactType compactType); + + /** + * Major compact a column family within a table. When the returned CompletableFuture is done, it + * only means the compact request was sent to HBase and may need some time to finish the compact + * operation. + * @param tableName table to major compact + * @param columnFamily column family within a table. If not present, major compact the table's all + * column families. + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + */ + CompletableFuture majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType); /** * Major compact a region. When the returned CompletableFuture is done, it only means the compact @@ -960,7 +1007,19 @@ public interface AsyncAdmin { * @param tableName table to examine * @return the current compaction state wrapped by a {@link CompletableFuture} */ - CompletableFuture getCompactionState(TableName tableName); + default CompletableFuture getCompactionState(TableName tableName) { + return getCompactionState(tableName, CompactType.NORMAL); + } + + /** + * Get the current compaction state of a table. It could be in a major compaction, a minor + * compaction, both, or none. + * @param tableName table to examine + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} + * @return the current compaction state wrapped by a {@link CompletableFuture} + */ + CompletableFuture getCompactionState(TableName tableName, + CompactType compactType); /** * Get the current compaction state of region. It could be in a major compaction, a minor diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index fb16fcedd41..0f0679d8ce7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -244,13 +244,15 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture compact(TableName tableName) { - return wrap(rawAdmin.compact(tableName)); + public CompletableFuture compact(TableName tableName, + CompactType compactType) { + return wrap(rawAdmin.compact(tableName, compactType)); } @Override - public CompletableFuture compact(TableName tableName, byte[] columnFamily) { - return wrap(rawAdmin.compact(tableName, columnFamily)); + public CompletableFuture compact(TableName tableName, + byte[] columnFamily, CompactType compactType) { + return wrap(rawAdmin.compact(tableName, columnFamily, compactType)); } @Override @@ -264,13 +266,14 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture majorCompact(TableName tableName) { - return wrap(rawAdmin.majorCompact(tableName)); + public CompletableFuture majorCompact(TableName tableName, CompactType compactType) { + return wrap(rawAdmin.majorCompact(tableName, compactType)); } @Override - public CompletableFuture majorCompact(TableName tableName, byte[] columnFamily) { - return wrap(rawAdmin.majorCompact(tableName, columnFamily)); + public CompletableFuture majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType) { + return wrap(rawAdmin.majorCompact(tableName, columnFamily, compactType)); } @Override @@ -632,8 +635,9 @@ class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture getCompactionState(TableName tableName) { - return wrap(rawAdmin.getCompactionState(tableName)); + public CompletableFuture getCompactionState( + TableName tableName, CompactType compactType) { + return wrap(rawAdmin.getCompactionState(tableName, compactType)); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 05157dd99db..1a00efe134f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1282,8 +1282,8 @@ public class HBaseAdmin implements Admin { CompactType compactType) throws IOException { switch (compactType) { case MOB: - compact(this.connection.getAdminForMaster(), getMobRegionInfo(tableName), major, - columnFamily); + compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName), + major, columnFamily); break; case NORMAL: checkTableExists(tableName); @@ -3240,7 +3240,7 @@ public class HBaseAdmin implements Admin { new Callable() { @Override public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { - RegionInfo info = getMobRegionInfo(tableName); + RegionInfo info = RegionInfo.createMobRegionInfo(tableName); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true); GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); @@ -3304,7 +3304,7 @@ public class HBaseAdmin implements Admin { } break; default: - throw new IllegalArgumentException("Unknowne compactType: " + compactType); + throw new IllegalArgumentException("Unknown compactType: " + compactType); } if (state != null) { return ProtobufUtil.createCompactionState(state); @@ -3839,11 +3839,6 @@ public class HBaseAdmin implements Admin { }); } - private RegionInfo getMobRegionInfo(TableName tableName) { - return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0) - .build(); - } - private RpcControllerFactory getRpcControllerFactory() { return this.rpcControllerFactory; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index f56e7cafc19..5e9356a1fe3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -842,15 +842,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture compact(TableName tableName) { - return compact(tableName, null, false, CompactType.NORMAL); + public CompletableFuture compact(TableName tableName, CompactType compactType) { + return compact(tableName, null, false, compactType); } @Override - public CompletableFuture compact(TableName tableName, byte[] columnFamily) { - Preconditions.checkNotNull(columnFamily, - "columnFamily is null. If you don't specify a columnFamily, use compact(TableName) instead"); - return compact(tableName, columnFamily, false, CompactType.NORMAL); + public CompletableFuture compact(TableName tableName, byte[] columnFamily, + CompactType compactType) { + Preconditions.checkNotNull(columnFamily, "columnFamily is null. " + + "If you don't specify a columnFamily, use compact(TableName) instead"); + return compact(tableName, columnFamily, false, compactType); } @Override @@ -866,15 +867,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture majorCompact(TableName tableName) { - return compact(tableName, null, true, CompactType.NORMAL); + public CompletableFuture majorCompact(TableName tableName, CompactType compactType) { + return compact(tableName, null, true, compactType); } @Override - public CompletableFuture majorCompact(TableName tableName, byte[] columnFamily) { + public CompletableFuture majorCompact(TableName tableName, byte[] columnFamily, + CompactType compactType) { Preconditions.checkNotNull(columnFamily, "columnFamily is null." - + " If you don't specify a columnFamily, use majorCompact(TableName) instead"); - return compact(tableName, columnFamily, true, CompactType.NORMAL); + + "If you don't specify a columnFamily, use compact(TableName) instead"); + return compact(tableName, columnFamily, true, compactType); } @Override @@ -926,6 +928,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture compactRegion(byte[] regionName, byte[] columnFamily, boolean major) { CompletableFuture future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete( (location, err) -> { if (err != null) { @@ -981,31 +984,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { /** * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() */ - private CompletableFuture compact(TableName tableName, byte[] columnFamily, boolean major, - CompactType compactType) { - if (CompactType.MOB.equals(compactType)) { - // TODO support MOB compact. - return failedFuture(new UnsupportedOperationException("MOB compact does not support")); - } + private CompletableFuture compact(TableName tableName, byte[] columnFamily, + boolean major, CompactType compactType) { CompletableFuture future = new CompletableFuture<>(); - getTableHRegionLocations(tableName).whenComplete((locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - CompletableFuture[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) - .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) - .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) - .toArray(CompletableFuture[]::new); - // future complete unless all of the compact futures are completed. - CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + + switch (compactType) { + case MOB: + connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); + compact(serverName, regionInfo, major, columnFamily) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + break; + case NORMAL: + getTableHRegionLocations(tableName).whenComplete((locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + CompletableFuture[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) + .filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) + .map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) + .toArray(CompletableFuture[]::new); + // future complete unless all of the compact futures are completed. + CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + break; + default: + throw new IllegalArgumentException("Unknown compactType: " + compactType); + } return future; } @@ -2741,64 +2764,99 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture getCompactionState(TableName tableName) { + public CompletableFuture getCompactionState(TableName tableName, + CompactType compactType) { CompletableFuture future = new CompletableFuture<>(); - getTableHRegionLocations(tableName).whenComplete( - (locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - List regionStates = new ArrayList<>(); - List> futures = new ArrayList<>(); - locations.stream().filter(loc -> loc.getServerName() != null) - .filter(loc -> loc.getRegion() != null) - .filter(loc -> !loc.getRegion().isOffline()) - .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { - futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { - // If any region compaction state is MAJOR_AND_MINOR - // the table compaction state is MAJOR_AND_MINOR, too. - if (err2 != null) { - future.completeExceptionally(err2); - } else if (regionState == CompactionState.MAJOR_AND_MINOR) { - future.complete(regionState); - } else { - regionStates.add(regionState); - } - })); - }); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) - .whenComplete((ret, err3) -> { - // If future not completed, check all regions's compaction state - if (!future.isCompletedExceptionally() && !future.isDone()) { - CompactionState state = CompactionState.NONE; - for (CompactionState regionState : regionStates) { - switch (regionState) { - case MAJOR: - if (state == CompactionState.MINOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); - } else { - state = CompactionState.MAJOR; - } - break; - case MINOR: - if (state == CompactionState.MAJOR) { - future.complete(CompactionState.MAJOR_AND_MINOR); - } else { - state = CompactionState.MINOR; - } - break; - case NONE: - default: - } - if (!future.isDone()) { - future.complete(state); - } - } + switch (compactType) { + case MOB: + connection.registry.getMasterAddress().whenComplete((serverName, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName); + + this. newAdminCaller().serverName(serverName).action( + (controller, stub) -> this + . adminCall( + controller, stub, + RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true), + (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp) + ).call().whenComplete((resp2, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + if (resp2.hasCompactionState()) { + future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState())); + } else { + future.complete(CompactionState.NONE); } - }); - }); + } + }); + }); + break; + case NORMAL: + getTableHRegionLocations(tableName).whenComplete( + (locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List regionStates = new ArrayList<>(); + List> futures = new ArrayList<>(); + locations.stream().filter(loc -> loc.getServerName() != null) + .filter(loc -> loc.getRegion() != null) + .filter(loc -> !loc.getRegion().isOffline()) + .map(loc -> loc.getRegion().getRegionName()).forEach(region -> { + futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> { + // If any region compaction state is MAJOR_AND_MINOR + // the table compaction state is MAJOR_AND_MINOR, too. + if (err2 != null) { + future.completeExceptionally(err2); + } else if (regionState == CompactionState.MAJOR_AND_MINOR) { + future.complete(regionState); + } else { + regionStates.add(regionState); + } + })); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((ret, err3) -> { + // If future not completed, check all regions's compaction state + if (!future.isCompletedExceptionally() && !future.isDone()) { + CompactionState state = CompactionState.NONE; + for (CompactionState regionState : regionStates) { + switch (regionState) { + case MAJOR: + if (state == CompactionState.MINOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MAJOR; + } + break; + case MINOR: + if (state == CompactionState.MAJOR) { + future.complete(CompactionState.MAJOR_AND_MINOR); + } else { + state = CompactionState.MINOR; + } + break; + case NONE: + default: + } + if (!future.isDone()) { + future.complete(state); + } + } + } + }); + }); + break; + default: + throw new IllegalArgumentException("Unknown compactType: " + compactType); + } + return future; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java index 0eb4e422148..cfca6daa73d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java @@ -572,6 +572,17 @@ public interface RegionInfo { return b; } + /** + * Creates a RegionInfo object for MOB data. + * + * @param tableName the name of the table + * @return the MOB {@link RegionInfo}. + */ + static RegionInfo createMobRegionInfo(TableName tableName) { + return RegionInfoBuilder.newBuilder(tableName) + .setStartKey(Bytes.toBytes(".mob")).setRegionId(0).build(); + } + /** * Separate elements of a regionName. * @param regionName diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 8a1afab60a3..e6cffd6333f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; @@ -422,6 +424,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertEquals(count, 2); } + private void waitUntilMobCompactionFinished(TableName tableName) + throws ExecutionException, InterruptedException { + long finished = EnvironmentEdgeManager.currentTime() + 60000; + CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); + while (EnvironmentEdgeManager.currentTime() < finished) { + if (state == CompactionState.NONE) { + break; + } + Thread.sleep(10); + state = admin.getCompactionState(tableName, CompactType.MOB).get(); + } + assertEquals(CompactionState.NONE, state); + } + + @Test + public void testCompactMob() throws Exception { + ColumnFamilyDescriptor columnDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob")) + .setMobEnabled(true).setMobThreshold(0).build(); + + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(columnDescriptor).build(); + + admin.createTable(tableDescriptor).get(); + + byte[][] families = { Bytes.toBytes("mob") }; + loadData(tableName, families, 3000, 8); + + admin.majorCompact(tableName, CompactType.MOB).get(); + + CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get(); + assertNotEquals(CompactionState.NONE, state); + + waitUntilMobCompactionFinished(tableName); + } + @Test public void testCompactRegionServer() throws Exception { byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };