HBASE-19242 Add MOB compact support for AsyncAdmin
Signed-off-by: Michael Stack <stack@apache.org> Signed-off-by: Guanghao Zhang <zghaobac@gmail.com>
This commit is contained in:
parent
520b9efc2d
commit
f6582400be
|
@ -871,6 +871,33 @@ public interface Admin extends Abortable, Closeable {
|
|||
void compactRegion(byte[] regionName, byte[] columnFamily)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Compact a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void compact(TableName tableName, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Compact a column family within a table. Asynchronous operation in that this method
|
||||
* requests that a Compaction run and then it returns. It does not wait on the
|
||||
* completion of Compaction (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if not a mob column family or if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void compact(TableName tableName, byte[] columnFamily, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Major compact a table. Asynchronous operation in that this method requests
|
||||
* that a Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
|
@ -915,6 +942,33 @@ public interface Admin extends Abortable, Closeable {
|
|||
void majorCompactRegion(byte[] regionName, byte[] columnFamily)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Major compact a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void majorCompact(TableName tableName, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Major compact a column family within a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if not a mob column family or if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Compact all regions on the region server. Asynchronous operation in that this method requests
|
||||
* that a Compaction run and then it returns. It does not wait on the completion of Compaction (it
|
||||
|
@ -1735,6 +1789,17 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
CompactionState getCompactionState(TableName tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the current compaction state of a table. It could be in a compaction, or none.
|
||||
*
|
||||
* @param tableName table to examine
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @return the current compaction state
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
CompactionState getCompactionState(TableName tableName,
|
||||
CompactType compactType) throws IOException;
|
||||
|
||||
/**
|
||||
* Get the current compaction state of region. It could be in a major compaction, a minor
|
||||
* compaction, both, or none.
|
||||
|
@ -2310,71 +2375,6 @@ public interface Admin extends Abortable, Closeable {
|
|||
return getClusterStatus(EnumSet.of(Option.MASTER_INFO_PORT)).getMasterInfoPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void compact(TableName tableName, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Compact a column family within a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if not a mob column family or if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void compact(TableName tableName, byte[] columnFamily, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Major compact a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void majorCompact(TableName tableName, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Major compact a column family within a table. Asynchronous operation in that this method requests that a
|
||||
* Compaction run and then it returns. It does not wait on the completion of Compaction
|
||||
* (it can take a while).
|
||||
*
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @throws IOException if not a mob column family or if a remote or network exception occurs
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void majorCompact(TableName tableName, byte[] columnFamily, CompactType compactType)
|
||||
throws IOException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Get the current compaction state of a table. It could be in a compaction, or none.
|
||||
*
|
||||
* @param tableName table to examine
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @return the current compaction state
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
CompactionState getCompactionState(TableName tableName,
|
||||
CompactType compactType) throws IOException;
|
||||
|
||||
/**
|
||||
* Return the set of supported security capabilities.
|
||||
* @throws IOException
|
||||
|
|
|
@ -299,7 +299,9 @@ public interface AsyncAdmin {
|
|||
* was sent to HBase and may need some time to finish the compact operation.
|
||||
* @param tableName table to compact
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName);
|
||||
default CompletableFuture<Void> compact(TableName tableName) {
|
||||
return compact(tableName, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a column family within a table. When the returned CompletableFuture is done, it only
|
||||
|
@ -309,7 +311,28 @@ public interface AsyncAdmin {
|
|||
* @param columnFamily column family within a table. If not present, compact the table's all
|
||||
* column families.
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily);
|
||||
default CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) {
|
||||
return compact(tableName, columnFamily, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact a table. When the returned CompletableFuture is done, it only means the compact request
|
||||
* was sent to HBase and may need some time to finish the compact operation.
|
||||
* @param tableName table to compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName, CompactType compactType);
|
||||
|
||||
/**
|
||||
* Compact a column family within a table. When the returned CompletableFuture is done, it only
|
||||
* means the compact request was sent to HBase and may need some time to finish the compact
|
||||
* operation.
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
|
||||
CompactType compactType);
|
||||
|
||||
/**
|
||||
* Compact an individual region. When the returned CompletableFuture is done, it only means the
|
||||
|
@ -333,7 +356,9 @@ public interface AsyncAdmin {
|
|||
* request was sent to HBase and may need some time to finish the compact operation.
|
||||
* @param tableName table to major compact
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName);
|
||||
default CompletableFuture<Void> majorCompact(TableName tableName) {
|
||||
return majorCompact(tableName, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Major compact a column family within a table. When the returned CompletableFuture is done, it
|
||||
|
@ -343,7 +368,29 @@ public interface AsyncAdmin {
|
|||
* @param columnFamily column family within a table. If not present, major compact the table's all
|
||||
* column families.
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily);
|
||||
default CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) {
|
||||
return majorCompact(tableName, columnFamily, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Major compact a table. When the returned CompletableFuture is done, it only means the compact
|
||||
* request was sent to HBase and may need some time to finish the compact operation.
|
||||
* @param tableName table to major compact
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType);
|
||||
|
||||
/**
|
||||
* Major compact a column family within a table. When the returned CompletableFuture is done, it
|
||||
* only means the compact request was sent to HBase and may need some time to finish the compact
|
||||
* operation.
|
||||
* @param tableName table to major compact
|
||||
* @param columnFamily column family within a table. If not present, major compact the table's all
|
||||
* column families.
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
|
||||
CompactType compactType);
|
||||
|
||||
/**
|
||||
* Major compact a region. When the returned CompletableFuture is done, it only means the compact
|
||||
|
@ -960,7 +1007,19 @@ public interface AsyncAdmin {
|
|||
* @param tableName table to examine
|
||||
* @return the current compaction state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<CompactionState> getCompactionState(TableName tableName);
|
||||
default CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
|
||||
return getCompactionState(tableName, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current compaction state of a table. It could be in a major compaction, a minor
|
||||
* compaction, both, or none.
|
||||
* @param tableName table to examine
|
||||
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
|
||||
* @return the current compaction state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<CompactionState> getCompactionState(TableName tableName,
|
||||
CompactType compactType);
|
||||
|
||||
/**
|
||||
* Get the current compaction state of region. It could be in a major compaction, a minor
|
||||
|
|
|
@ -244,13 +244,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName) {
|
||||
return wrap(rawAdmin.compact(tableName));
|
||||
public CompletableFuture<Void> compact(TableName tableName,
|
||||
CompactType compactType) {
|
||||
return wrap(rawAdmin.compact(tableName, compactType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) {
|
||||
return wrap(rawAdmin.compact(tableName, columnFamily));
|
||||
public CompletableFuture<Void> compact(TableName tableName,
|
||||
byte[] columnFamily, CompactType compactType) {
|
||||
return wrap(rawAdmin.compact(tableName, columnFamily, compactType));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -264,13 +266,14 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName) {
|
||||
return wrap(rawAdmin.majorCompact(tableName));
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
|
||||
return wrap(rawAdmin.majorCompact(tableName, compactType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) {
|
||||
return wrap(rawAdmin.majorCompact(tableName, columnFamily));
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
|
||||
CompactType compactType) {
|
||||
return wrap(rawAdmin.majorCompact(tableName, columnFamily, compactType));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -632,8 +635,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
|
||||
return wrap(rawAdmin.getCompactionState(tableName));
|
||||
public CompletableFuture<CompactionState> getCompactionState(
|
||||
TableName tableName, CompactType compactType) {
|
||||
return wrap(rawAdmin.getCompactionState(tableName, compactType));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1282,8 +1282,8 @@ public class HBaseAdmin implements Admin {
|
|||
CompactType compactType) throws IOException {
|
||||
switch (compactType) {
|
||||
case MOB:
|
||||
compact(this.connection.getAdminForMaster(), getMobRegionInfo(tableName), major,
|
||||
columnFamily);
|
||||
compact(this.connection.getAdminForMaster(), RegionInfo.createMobRegionInfo(tableName),
|
||||
major, columnFamily);
|
||||
break;
|
||||
case NORMAL:
|
||||
checkTableExists(tableName);
|
||||
|
@ -3240,7 +3240,7 @@ public class HBaseAdmin implements Admin {
|
|||
new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
|
||||
@Override
|
||||
public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception {
|
||||
RegionInfo info = getMobRegionInfo(tableName);
|
||||
RegionInfo info = RegionInfo.createMobRegionInfo(tableName);
|
||||
GetRegionInfoRequest request =
|
||||
RequestConverter.buildGetRegionInfoRequest(info.getRegionName(), true);
|
||||
GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request);
|
||||
|
@ -3304,7 +3304,7 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknowne compactType: " + compactType);
|
||||
throw new IllegalArgumentException("Unknown compactType: " + compactType);
|
||||
}
|
||||
if (state != null) {
|
||||
return ProtobufUtil.createCompactionState(state);
|
||||
|
@ -3839,11 +3839,6 @@ public class HBaseAdmin implements Admin {
|
|||
});
|
||||
}
|
||||
|
||||
private RegionInfo getMobRegionInfo(TableName tableName) {
|
||||
return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(".mob")).setRegionId(0)
|
||||
.build();
|
||||
}
|
||||
|
||||
private RpcControllerFactory getRpcControllerFactory() {
|
||||
return this.rpcControllerFactory;
|
||||
}
|
||||
|
|
|
@ -842,15 +842,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName) {
|
||||
return compact(tableName, null, false, CompactType.NORMAL);
|
||||
public CompletableFuture<Void> compact(TableName tableName, CompactType compactType) {
|
||||
return compact(tableName, null, false, compactType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) {
|
||||
Preconditions.checkNotNull(columnFamily,
|
||||
"columnFamily is null. If you don't specify a columnFamily, use compact(TableName) instead");
|
||||
return compact(tableName, columnFamily, false, CompactType.NORMAL);
|
||||
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
|
||||
CompactType compactType) {
|
||||
Preconditions.checkNotNull(columnFamily, "columnFamily is null. "
|
||||
+ "If you don't specify a columnFamily, use compact(TableName) instead");
|
||||
return compact(tableName, columnFamily, false, compactType);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -866,15 +867,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName) {
|
||||
return compact(tableName, null, true, CompactType.NORMAL);
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, CompactType compactType) {
|
||||
return compact(tableName, null, true, compactType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) {
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily,
|
||||
CompactType compactType) {
|
||||
Preconditions.checkNotNull(columnFamily, "columnFamily is null."
|
||||
+ " If you don't specify a columnFamily, use majorCompact(TableName) instead");
|
||||
return compact(tableName, columnFamily, true, CompactType.NORMAL);
|
||||
+ "If you don't specify a columnFamily, use compact(TableName) instead");
|
||||
return compact(tableName, columnFamily, true, compactType);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -926,6 +928,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily,
|
||||
boolean major) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
getRegionLocation(regionName).whenComplete(
|
||||
(location, err) -> {
|
||||
if (err != null) {
|
||||
|
@ -981,31 +984,51 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
/**
|
||||
* Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
|
||||
*/
|
||||
private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily, boolean major,
|
||||
CompactType compactType) {
|
||||
if (CompactType.MOB.equals(compactType)) {
|
||||
// TODO support MOB compact.
|
||||
return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
|
||||
}
|
||||
private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily,
|
||||
boolean major, CompactType compactType) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
|
||||
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
|
||||
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
|
||||
.toArray(CompletableFuture<?>[]::new);
|
||||
// future complete unless all of the compact futures are completed.
|
||||
CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
switch (compactType) {
|
||||
case MOB:
|
||||
connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
|
||||
compact(serverName, regionInfo, major, columnFamily)
|
||||
.whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
break;
|
||||
case NORMAL:
|
||||
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
CompletableFuture<?>[] compactFutures = locations.stream().filter(l -> l.getRegion() != null)
|
||||
.filter(l -> !l.getRegion().isOffline()).filter(l -> l.getServerName() != null)
|
||||
.map(l -> compact(l.getServerName(), l.getRegion(), major, columnFamily))
|
||||
.toArray(CompletableFuture<?>[]::new);
|
||||
// future complete unless all of the compact futures are completed.
|
||||
CompletableFuture.allOf(compactFutures).whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown compactType: " + compactType);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -2741,64 +2764,99 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
|
||||
public CompletableFuture<CompactionState> getCompactionState(TableName tableName,
|
||||
CompactType compactType) {
|
||||
CompletableFuture<CompactionState> future = new CompletableFuture<>();
|
||||
getTableHRegionLocations(tableName).whenComplete(
|
||||
(locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
List<CompactionState> regionStates = new ArrayList<>();
|
||||
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
|
||||
locations.stream().filter(loc -> loc.getServerName() != null)
|
||||
.filter(loc -> loc.getRegion() != null)
|
||||
.filter(loc -> !loc.getRegion().isOffline())
|
||||
.map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
|
||||
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
|
||||
// If any region compaction state is MAJOR_AND_MINOR
|
||||
// the table compaction state is MAJOR_AND_MINOR, too.
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
|
||||
|
||||
future.complete(regionState);
|
||||
} else {
|
||||
regionStates.add(regionState);
|
||||
}
|
||||
}));
|
||||
});
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
|
||||
.whenComplete((ret, err3) -> {
|
||||
// If future not completed, check all regions's compaction state
|
||||
if (!future.isCompletedExceptionally() && !future.isDone()) {
|
||||
CompactionState state = CompactionState.NONE;
|
||||
for (CompactionState regionState : regionStates) {
|
||||
switch (regionState) {
|
||||
case MAJOR:
|
||||
if (state == CompactionState.MINOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MAJOR;
|
||||
}
|
||||
break;
|
||||
case MINOR:
|
||||
if (state == CompactionState.MAJOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MINOR;
|
||||
}
|
||||
break;
|
||||
case NONE:
|
||||
default:
|
||||
}
|
||||
if (!future.isDone()) {
|
||||
future.complete(state);
|
||||
}
|
||||
}
|
||||
switch (compactType) {
|
||||
case MOB:
|
||||
connection.registry.getMasterAddress().whenComplete((serverName, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
RegionInfo regionInfo = RegionInfo.createMobRegionInfo(tableName);
|
||||
|
||||
this.<GetRegionInfoResponse> newAdminCaller().serverName(serverName).action(
|
||||
(controller, stub) -> this
|
||||
.<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
|
||||
controller, stub,
|
||||
RequestConverter.buildGetRegionInfoRequest(regionInfo.getRegionName(), true),
|
||||
(s, c, req, done) -> s.getRegionInfo(controller, req, done), resp -> resp)
|
||||
).call().whenComplete((resp2, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
if (resp2.hasCompactionState()) {
|
||||
future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
|
||||
} else {
|
||||
future.complete(CompactionState.NONE);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
break;
|
||||
case NORMAL:
|
||||
getTableHRegionLocations(tableName).whenComplete(
|
||||
(locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
List<CompactionState> regionStates = new ArrayList<>();
|
||||
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
|
||||
locations.stream().filter(loc -> loc.getServerName() != null)
|
||||
.filter(loc -> loc.getRegion() != null)
|
||||
.filter(loc -> !loc.getRegion().isOffline())
|
||||
.map(loc -> loc.getRegion().getRegionName()).forEach(region -> {
|
||||
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
|
||||
// If any region compaction state is MAJOR_AND_MINOR
|
||||
// the table compaction state is MAJOR_AND_MINOR, too.
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
|
||||
future.complete(regionState);
|
||||
} else {
|
||||
regionStates.add(regionState);
|
||||
}
|
||||
}));
|
||||
});
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
|
||||
.whenComplete((ret, err3) -> {
|
||||
// If future not completed, check all regions's compaction state
|
||||
if (!future.isCompletedExceptionally() && !future.isDone()) {
|
||||
CompactionState state = CompactionState.NONE;
|
||||
for (CompactionState regionState : regionStates) {
|
||||
switch (regionState) {
|
||||
case MAJOR:
|
||||
if (state == CompactionState.MINOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MAJOR;
|
||||
}
|
||||
break;
|
||||
case MINOR:
|
||||
if (state == CompactionState.MAJOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MINOR;
|
||||
}
|
||||
break;
|
||||
case NONE:
|
||||
default:
|
||||
}
|
||||
if (!future.isDone()) {
|
||||
future.complete(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown compactType: " + compactType);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
|
|
|
@ -572,6 +572,17 @@ public interface RegionInfo {
|
|||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a RegionInfo object for MOB data.
|
||||
*
|
||||
* @param tableName the name of the table
|
||||
* @return the MOB {@link RegionInfo}.
|
||||
*/
|
||||
static RegionInfo createMobRegionInfo(TableName tableName) {
|
||||
return RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes(".mob")).setRegionId(0).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Separate elements of a regionName.
|
||||
* @param regionName
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
|
|||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
|
|||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Assert;
|
||||
|
@ -422,6 +424,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
|||
assertEquals(count, 2);
|
||||
}
|
||||
|
||||
private void waitUntilMobCompactionFinished(TableName tableName)
|
||||
throws ExecutionException, InterruptedException {
|
||||
long finished = EnvironmentEdgeManager.currentTime() + 60000;
|
||||
CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
|
||||
while (EnvironmentEdgeManager.currentTime() < finished) {
|
||||
if (state == CompactionState.NONE) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
state = admin.getCompactionState(tableName, CompactType.MOB).get();
|
||||
}
|
||||
assertEquals(CompactionState.NONE, state);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactMob() throws Exception {
|
||||
ColumnFamilyDescriptor columnDescriptor =
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
|
||||
.setMobEnabled(true).setMobThreshold(0).build();
|
||||
|
||||
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.addColumnFamily(columnDescriptor).build();
|
||||
|
||||
admin.createTable(tableDescriptor).get();
|
||||
|
||||
byte[][] families = { Bytes.toBytes("mob") };
|
||||
loadData(tableName, families, 3000, 8);
|
||||
|
||||
admin.majorCompact(tableName, CompactType.MOB).get();
|
||||
|
||||
CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
|
||||
assertNotEquals(CompactionState.NONE, state);
|
||||
|
||||
waitUntilMobCompactionFinished(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactRegionServer() throws Exception {
|
||||
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
|
||||
|
|
Loading…
Reference in New Issue