HBASE-19242 Add MOB compact support for AsyncAdmin

Signed-off-by: Michael Stack <stack@apache.org>
Signed-off-by: Guanghao Zhang <zghaobac@gmail.com>
This commit is contained in:
Balazs Meszaros 2017-11-23 14:42:39 +01:00 committed by Michael Stack
parent 1447956846
commit e946d9d841
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
7 changed files with 344 additions and 179 deletions

View File

@ -871,6 +871,33 @@ public interface Admin extends Abortable, Closeable {
void compactRegion(byte[] regionName, byte[] columnFamily) void compactRegion(byte[] regionName, byte[] columnFamily)
throws IOException; throws IOException;
/**
* Compact a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
void compact(TableName tableName, CompactType compactType)
throws IOException, InterruptedException;
/**
* Compact a column family within a table. Asynchronous operation in that this method
* requests that a Compaction run and then it returns. It does not wait on the
* completion of Compaction (it can take a while).
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void compact(TableName tableName, byte[] columnFamily, CompactType compactType)
throws IOException, InterruptedException;
/** /**
* Major compact a table. Asynchronous operation in that this method requests * Major compact a table. Asynchronous operation in that this method requests
* that a Compaction run and then it returns. It does not wait on the completion of Compaction * that a Compaction run and then it returns. It does not wait on the completion of Compaction
@ -915,6 +942,33 @@ public interface Admin extends Abortable, Closeable {
void majorCompactRegion(byte[] regionName, byte[] columnFamily) void majorCompactRegion(byte[] regionName, byte[] columnFamily)
throws IOException; throws IOException;
/**
* Major compact a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
void majorCompact(TableName tableName, CompactType compactType)
throws IOException, InterruptedException;
/**
* Major compact a column family within a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType)
throws IOException, InterruptedException;
/** /**
* Compact all regions on the region server. Asynchronous operation in that this method requests * Compact all regions on the region server. Asynchronous operation in that this method requests
* that a Compaction run and then it returns. It does not wait on the completion of Compaction (it * that a Compaction run and then it returns. It does not wait on the completion of Compaction (it
@ -1735,6 +1789,17 @@ public interface Admin extends Abortable, Closeable {
*/ */
CompactionState getCompactionState(TableName tableName) throws IOException; CompactionState getCompactionState(TableName tableName) throws IOException;
/**
* Get the current compaction state of a table. It could be in a compaction, or none.
*
* @param tableName table to examine
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @return the current compaction state
* @throws IOException if a remote or network exception occurs
*/
CompactionState getCompactionState(TableName tableName,
CompactType compactType) throws IOException;
/** /**
* Get the current compaction state of region. It could be in a major compaction, a minor * Get the current compaction state of region. It could be in a major compaction, a minor
* compaction, both, or none. * compaction, both, or none.
@ -2310,71 +2375,6 @@ public interface Admin extends Abortable, Closeable {
return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).getMasterInfoPort(); return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).getMasterInfoPort();
} }
/**
* Compact a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException
* @throws InterruptedException
*/
void compact(TableName tableName, CompactType compactType)
throws IOException, InterruptedException;
/**
* Compact a column family within a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void compact(TableName tableName, byte[] columnFamily, CompactType compactType)
throws IOException, InterruptedException;
/**
* Major compact a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException
* @throws InterruptedException
*/
void majorCompact(TableName tableName, CompactType compactType)
throws IOException, InterruptedException;
/**
* Major compact a column family within a table. Asynchronous operation in that this method requests that a
* Compaction run and then it returns. It does not wait on the completion of Compaction
* (it can take a while).
*
* @param tableName table to compact
* @param columnFamily column family within a table
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if not a mob column family or if a remote or network exception occurs
* @throws InterruptedException
*/
void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType)
throws IOException, InterruptedException;
/**
* Get the current compaction state of a table. It could be in a compaction, or none.
*
* @param tableName table to examine
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @return the current compaction state
* @throws IOException if a remote or network exception occurs
*/
CompactionState getCompactionState(TableName tableName,
CompactType compactType) throws IOException;
/** /**
* Return the set of supported security capabilities. * Return the set of supported security capabilities.
* @throws IOException * @throws IOException

View File

@ -299,7 +299,9 @@ public interface AsyncAdmin {
* was sent to HBase and may need some time to finish the compact operation. * was sent to HBase and may need some time to finish the compact operation.
* @param tableName table to compact * @param tableName table to compact
*/ */
CompletableFuture<Void> compact(TableName tableName); default CompletableFuture<Void> compact(TableName tableName) {
return compact(tableName, CompactType.NORMAL);
}
/** /**
* Compact a column family within a table. When the returned CompletableFuture is done, it only * Compact a column family within a table. When the returned CompletableFuture is done, it only
@ -309,7 +311,28 @@ public interface AsyncAdmin {
* @param columnFamily column family within a table. If not present, compact the table's all * @param columnFamily column family within a table. If not present, compact the table's all
* column families. * column families.
*/ */
CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily); default CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) {
return compact(tableName, columnFamily, CompactType.NORMAL);
}
/**
* Compact a table. When the returned CompletableFuture is done, it only means the compact request
* was sent to HBase and may need some time to finish the compact operation.
* @param tableName table to compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
*/
CompletableFuture<Void> compact(TableName tableName, CompactType compactType);
/**
* Compact a column family within a table. When the returned CompletableFuture is done, it only
* means the compact request was sent to HBase and may need some time to finish the compact
* operation.
* @param tableName table to compact
* @param columnFamily column family within a table
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
*/
CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
CompactType compactType);
/** /**
* Compact an individual region. When the returned CompletableFuture is done, it only means the * Compact an individual region. When the returned CompletableFuture is done, it only means the
@ -333,7 +356,9 @@ public interface AsyncAdmin {
* request was sent to HBase and may need some time to finish the compact operation. * request was sent to HBase and may need some time to finish the compact operation.
* @param tableName table to major compact * @param tableName table to major compact
*/ */
CompletableFuture<Void> majorCompact(TableName tableName); default CompletableFuture<Void> majorCompact(TableName tableName) {
return majorCompact(tableName, CompactType.NORMAL);
}
/** /**
* Major compact a column family within a table. When the returned CompletableFuture is done, it * Major compact a column family within a table. When the returned CompletableFuture is done, it
@ -343,7 +368,29 @@ public interface AsyncAdmin {
* @param columnFamily column family within a table. If not present, major compact the table's all * @param columnFamily column family within a table. If not present, major compact the table's all
* column families. * column families.
*/ */
CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily); default CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) {
return majorCompact(tableName, columnFamily, CompactType.NORMAL);
}
/**
* Major compact a table. When the returned CompletableFuture is done, it only means the compact
* request was sent to HBase and may need some time to finish the compact operation.
* @param tableName table to major compact
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
*/
CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType);
/**
* Major compact a column family within a table. When the returned CompletableFuture is done, it
* only means the compact request was sent to HBase and may need some time to finish the compact
* operation.
* @param tableName table to major compact
* @param columnFamily column family within a table. If not present, major compact the table's all
* column families.
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
*/
CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
CompactType compactType);
/** /**
* Major compact a region. When the returned CompletableFuture is done, it only means the compact * Major compact a region. When the returned CompletableFuture is done, it only means the compact
@ -960,7 +1007,19 @@ public interface AsyncAdmin {
* @param tableName table to examine * @param tableName table to examine
* @return the current compaction state wrapped by a {@link CompletableFuture} * @return the current compaction state wrapped by a {@link CompletableFuture}
*/ */
CompletableFuture<CompactionState> getCompactionState(TableName tableName); default CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
return getCompactionState(tableName, CompactType.NORMAL);
}
/**
* Get the current compaction state of a table. It could be in a major compaction, a minor
* compaction, both, or none.
* @param tableName table to examine
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @return the current compaction state wrapped by a {@link CompletableFuture}
*/
CompletableFuture<CompactionState> getCompactionState(TableName tableName,
CompactType compactType);
/** /**
* Get the current compaction state of region. It could be in a major compaction, a minor * Get the current compaction state of region. It could be in a major compaction, a minor

View File

@ -244,13 +244,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> compact(TableName tableName) { public CompletableFuture<Void> compact(TableName tableName,
return wrap(rawAdmin.compact(tableName)); CompactType compactType) {
return wrap(rawAdmin.compact(tableName, compactType));
} }
@Override @Override
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) { public CompletableFuture<Void> compact(TableName tableName,
return wrap(rawAdmin.compact(tableName, columnFamily)); byte[] columnFamily, CompactType compactType) {
return wrap(rawAdmin.compact(tableName, columnFamily, compactType));
} }
@Override @Override
@ -264,13 +266,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> majorCompact(TableName tableName) { public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
return wrap(rawAdmin.majorCompact(tableName)); return wrap(rawAdmin.majorCompact(tableName, compactType));
} }
@Override @Override
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
return wrap(rawAdmin.majorCompact(tableName, columnFamily)); CompactType compactType) {
return wrap(rawAdmin.majorCompact(tableName, columnFamily, compactType));
} }
@Override @Override
@ -632,8 +635,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { public CompletableFuture<CompactionState> getCompactionState(
return wrap(rawAdmin.getCompactionState(tableName)); TableName tableName, CompactType compactType) {
return wrap(rawAdmin.getCompactionState(tableName, compactType));
} }
@Override @Override

View File

@ -1282,8 +1282,8 @@ public class HBaseAdmin implements Admin {
CompactType compactType) throws IOException { CompactType compactType) throws IOException {
switch (compactType) { switch (compactType) {
case MOB: case MOB:
compact(this.connection.getAdminForMaster(), getMobRegionInfo(tableName), major, compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName),
columnFamily); major, columnFamily);
break; break;
case NORMAL: case NORMAL:
checkTableExists(tableName); checkTableExists(tableName);
@ -3240,7 +3240,7 @@ public class HBaseAdmin implements Admin {
new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() { new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
@Override @Override
public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
RegionInfo info = getMobRegionInfo(tableName); RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
GetRegionInfoRequest request = GetRegionInfoRequest request =
RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true); RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
@ -3304,7 +3304,7 @@ public class HBaseAdmin implements Admin {
} }
break; break;
default: default:
throw new IllegalArgumentException("Unknowne compactType: " + compactType); throw new IllegalArgumentException("Unknown compactType: " + compactType);
} }
if (state != null) { if (state != null) {
return ProtobufUtil.createCompactionState(state); return ProtobufUtil.createCompactionState(state);
@ -3839,11 +3839,6 @@ public class HBaseAdmin implements Admin {
}); });
} }
private RegionInfo getMobRegionInfo(TableName tableName) {
return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0)
.build();
}
private RpcControllerFactory getRpcControllerFactory() { private RpcControllerFactory getRpcControllerFactory() {
return this.rpcControllerFactory; return this.rpcControllerFactory;
} }

View File

@ -842,15 +842,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> compact(TableName tableName) { public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
return compact(tableName, null, false, CompactType.NORMAL); return compact(tableName, null, false, compactType);
} }
@Override @Override
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) { public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
Preconditions.checkNotNull(columnFamily, CompactType compactType) {
"columnFamily is null. If you don't specify a columnFamily, use compact(TableName) instead"); Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
return compact(tableName, columnFamily, false, CompactType.NORMAL); + "If you don't specify a columnFamily, use compact(TableName) instead");
return compact(tableName, columnFamily, false, compactType);
} }
@Override @Override
@ -866,15 +867,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<Void> majorCompact(TableName tableName) { public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
return compact(tableName, null, true, CompactType.NORMAL); return compact(tableName, null, true, compactType);
} }
@Override @Override
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
CompactType compactType) {
Preconditions.checkNotNull(columnFamily, "columnFamily is null." Preconditions.checkNotNull(columnFamily, "columnFamily is null."
+ " If you don't specify a columnFamily, use majorCompact(TableName) instead"); + "If you don't specify a columnFamily, use compact(TableName) instead");
return compact(tableName, columnFamily, true, CompactType.NORMAL); return compact(tableName, columnFamily, true, compactType);
} }
@Override @Override
@ -926,6 +928,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily, private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
boolean major) { boolean major) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
getRegionLocation(regionName).whenComplete( getRegionLocation(regionName).whenComplete(
(location, err) -> { (location, err) -> {
if (err != null) { if (err != null) {
@ -981,31 +984,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
/** /**
* Compact column family of a table, Asynchronous operation even if CompletableFuture.get() * Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
*/ */
private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major, private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
CompactType compactType) { boolean major, CompactType compactType) {
if (CompactType.MOB.equals(compactType)) {
// TODO support MOB compact.
return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
}
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
if (err != null) { switch (compactType) {
future.completeExceptionally(err); case MOB:
return; connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
} if (err != null) {
CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null) future.completeExceptionally(err);
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null) return;
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily)) }
.toArray(CompletableFuture<?>[]::new); RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
// future complete unless all of the compact futures are completed. compact(serverName, regionInfo, major, columnFamily)
CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> { .whenComplete((ret, err2) -> {
if (err2 != null) { if (err2 != null) {
future.completeExceptionally(err2); future.completeExceptionally(err2);
} else { } else {
future.complete(ret); future.complete(ret);
} }
}); });
}); });
break;
case NORMAL:
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
.toArray(CompletableFuture<?>[]::new);
// future complete unless all of the compact futures are completed.
CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
break;
default:
throw new IllegalArgumentException("Unknown compactType: " + compactType);
}
return future; return future;
} }
@ -2741,64 +2764,99 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} }
@Override @Override
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) { public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
CompactType compactType) {
CompletableFuture<CompactionState> future = new CompletableFuture<>(); CompletableFuture<CompactionState> future = new CompletableFuture<>();
getTableHRegionLocations(tableName).whenComplete(
(locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
List<CompactionState> regionStates = new ArrayList<>();
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
locations.stream().filter(loc -> loc.getServerName() != null)
.filter(loc -> loc.getRegion() != null)
.filter(loc -> !loc.getRegion().isOffline())
.map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
// If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) {
future.completeExceptionally(err2);
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState); switch (compactType) {
} else { case MOB:
regionStates.add(regionState); connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
} if (err != null) {
})); future.completeExceptionally(err);
}); return;
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])) }
.whenComplete((ret, err3) -> { RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
// If future not completed, check all regions's compaction state
if (!future.isCompletedExceptionally() && !future.isDone()) { this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
CompactionState state = CompactionState.NONE; (controller, stub) -> this
for (CompactionState regionState : regionStates) { .<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
switch (regionState) { controller, stub,
case MAJOR: RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
if (state == CompactionState.MINOR) { (s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
future.complete(CompactionState.MAJOR_AND_MINOR); ).call().whenComplete((resp2, err2) -> {
} else { if (err2 != null) {
state = CompactionState.MAJOR; future.completeExceptionally(err2);
} } else {
break; if (resp2.hasCompactionState()) {
case MINOR: future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
if (state == CompactionState.MAJOR) { } else {
future.complete(CompactionState.MAJOR_AND_MINOR); future.complete(CompactionState.NONE);
} else {
state = CompactionState.MINOR;
}
break;
case NONE:
default:
}
if (!future.isDone()) {
future.complete(state);
}
}
} }
}); }
}); });
});
break;
case NORMAL:
getTableHRegionLocations(tableName).whenComplete(
(locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
List<CompactionState> regionStates = new ArrayList<>();
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
locations.stream().filter(loc -> loc.getServerName() != null)
.filter(loc -> loc.getRegion() != null)
.filter(loc -> !loc.getRegion().isOffline())
.map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
// If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) {
future.completeExceptionally(err2);
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState);
} else {
regionStates.add(regionState);
}
}));
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
.whenComplete((ret, err3) -> {
// If future not completed, check all regions's compaction state
if (!future.isCompletedExceptionally() && !future.isDone()) {
CompactionState state = CompactionState.NONE;
for (CompactionState regionState : regionStates) {
switch (regionState) {
case MAJOR:
if (state == CompactionState.MINOR) {
future.complete(CompactionState.MAJOR_AND_MINOR);
} else {
state = CompactionState.MAJOR;
}
break;
case MINOR:
if (state == CompactionState.MAJOR) {
future.complete(CompactionState.MAJOR_AND_MINOR);
} else {
state = CompactionState.MINOR;
}
break;
case NONE:
default:
}
if (!future.isDone()) {
future.complete(state);
}
}
}
});
});
break;
default:
throw new IllegalArgumentException("Unknown compactType: " + compactType);
}
return future; return future;
} }

View File

@ -572,6 +572,17 @@ public interface RegionInfo {
return b; return b;
} }
/**
* Creates a RegionInfo object for MOB data.
*
* @param tableName the name of the table
* @return the MOB {@link RegionInfo}.
*/
static RegionInfo createMobRegionInfo(TableName tableName) {
return RegionInfoBuilder.newBuilder(tableName)
.setStartKey(Bytes.toBytes(".mob")).setRegionId(0).build();
}
/** /**
* Separate elements of a regionName. * Separate elements of a regionName.
* @param regionName * @param regionName

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert; import org.junit.Assert;
@ -422,6 +424,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
assertEquals(count, 2); assertEquals(count, 2);
} }
private void waitUntilMobCompactionFinished(TableName tableName)
throws ExecutionException, InterruptedException {
long finished = EnvironmentEdgeManager.currentTime() + 60000;
CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
while (EnvironmentEdgeManager.currentTime() < finished) {
if (state == CompactionState.NONE) {
break;
}
Thread.sleep(10);
state = admin.getCompactionState(tableName, CompactType.MOB).get();
}
assertEquals(CompactionState.NONE, state);
}
@Test
public void testCompactMob() throws Exception {
ColumnFamilyDescriptor columnDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
.setMobEnabled(true).setMobThreshold(0).build();
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(columnDescriptor).build();
admin.createTable(tableDescriptor).get();
byte[][] families = { Bytes.toBytes("mob") };
loadData(tableName, families, 3000, 8);
admin.majorCompact(tableName, CompactType.MOB).get();
CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
assertNotEquals(CompactionState.NONE, state);
waitUntilMobCompactionFinished(tableName);
}
@Test @Test
public void testCompactRegionServer() throws Exception { public void testCompactRegionServer() throws Exception {
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };