HBASE-24647 Rewrite MetaTableAccessor.multiMutate to remove the deprecated coprocessorService call (#1989)

Signed-off-by: stack <stack@apache.org>
Duo Zhang 2020-07-01 08:06:29 +08:00 committed by GitHub
parent 2c43f6cde7
commit 8c4d7618b7
1 changed file with 94 additions and 121 deletions


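The patch drops the deprecated blocking Table.coprocessorService call and instead invokes the MultiRowMutation endpoint through the AsyncTable coprocessor API, then blocks on the returned CompletableFuture with FutureUtils.get. The sketch below condenses that call pattern into a standalone helper as a reference while reading the hunks; it is illustrative only: the class MultiRowMutateSketch and method mutateMetaRows are made-up names, and the import location of the generated MultiRowMutationProtos classes is an assumption (MetaTableAccessor already imports them in an unchanged part of the file, so they do not appear in the diff). The real change is in MetaTableAccessor.multiMutate below.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
// Assumed package for the generated MultiRowMutation protobuf classes; the unqualified names
// match what the diff below uses.
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.util.FutureUtils;

public final class MultiRowMutateSketch {

  private MultiRowMutateSketch() {
  }

  /** Atomically applies the given Puts via the MultiRowMutation endpoint hosting {@code row}. */
  static void mutateMetaRows(AsyncConnection conn, byte[] row, List<Put> puts) throws IOException {
    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
    for (Put put : puts) {
      // Convert each client-side mutation into the protobuf form the endpoint expects.
      builder.addMutationRequest(
        ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put));
    }
    MutateRowsRequest request = builder.build();
    AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
    // Call the coprocessor endpoint on the region serving `row`; no deprecated
    // Table.coprocessorService, no Batch.Call, no ServerRpcController.
    CompletableFuture<MutateRowsResponse> future =
      table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
        MultiRowMutationService::newStub,
        (stub, controller, done) -> stub.mutateRows(controller, request, done), row);
    // Block for completion; failures are rethrown as IOException.
    FutureUtils.get(future);
  }
}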
@@ -32,10 +32,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell.Type;
 import org.apache.hadoop.hbase.ClientMetaTableAccessor.QueryType;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Consistency;
@@ -52,18 +54,16 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SubstringComparator;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -71,7 +71,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -1031,50 +1030,48 @@ public final class MetaTableAccessor {
    */
   public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
     Map<RegionInfo, Long> parentSeqNum, ServerName sn, int regionReplication) throws IOException {
-    try (Table meta = getMetaHTable(connection)) {
-      long time = HConstants.LATEST_TIMESTAMP;
-      List<Mutation> mutations = new ArrayList<>();
-      List<RegionInfo> replicationParents = new ArrayList<>();
-      for (Map.Entry<RegionInfo, Long> e : parentSeqNum.entrySet()) {
-        RegionInfo ri = e.getKey();
-        long seqNum = e.getValue();
-        // Deletes for merging regions
-        mutations.add(makeDeleteFromRegionInfo(ri, time));
-        if (seqNum > 0) {
-          mutations.add(makePutForReplicationBarrier(ri, seqNum, time));
-          replicationParents.add(ri);
-        }
-      }
-      // Put for parent
-      Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
-      putOfMerged = addMergeRegions(putOfMerged, parentSeqNum.keySet());
-      // Set initial state to CLOSED.
-      // NOTE: If initial state is not set to CLOSED then merged region gets added with the
-      // default OFFLINE state. If Master gets restarted after this step, start up sequence of
-      // master tries to assign this offline region. This is followed by re-assignments of the
-      // merged region from resumed {@link MergeTableRegionsProcedure}
-      addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
-      mutations.add(putOfMerged);
-      // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
-      // if crash after merge happened but before we got to here.. means in-memory
-      // locations of offlined merged, now-closed, regions is lost. Should be ok. We
-      // assign the merged region later.
-      if (sn != null) {
-        addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
-      }
-      // Add empty locations for region replicas of the merged region so that number of replicas
-      // can be cached whenever the primary region is looked up from meta
-      for (int i = 1; i < regionReplication; i++) {
-        addEmptyLocation(putOfMerged, i);
-      }
-      // add parent reference for serial replication
-      if (!replicationParents.isEmpty()) {
-        addReplicationParent(putOfMerged, replicationParents);
-      }
-      byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, mutations);
-    }
+    long time = HConstants.LATEST_TIMESTAMP;
+    List<Mutation> mutations = new ArrayList<>();
+    List<RegionInfo> replicationParents = new ArrayList<>();
+    for (Map.Entry<RegionInfo, Long> e : parentSeqNum.entrySet()) {
+      RegionInfo ri = e.getKey();
+      long seqNum = e.getValue();
+      // Deletes for merging regions
+      mutations.add(makeDeleteFromRegionInfo(ri, time));
+      if (seqNum > 0) {
+        mutations.add(makePutForReplicationBarrier(ri, seqNum, time));
+        replicationParents.add(ri);
+      }
+    }
+    // Put for parent
+    Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
+    putOfMerged = addMergeRegions(putOfMerged, parentSeqNum.keySet());
+    // Set initial state to CLOSED.
+    // NOTE: If initial state is not set to CLOSED then merged region gets added with the
+    // default OFFLINE state. If Master gets restarted after this step, start up sequence of
+    // master tries to assign this offline region. This is followed by re-assignments of the
+    // merged region from resumed {@link MergeTableRegionsProcedure}
+    addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
+    mutations.add(putOfMerged);
+    // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
+    // if crash after merge happened but before we got to here.. means in-memory
+    // locations of offlined merged, now-closed, regions is lost. Should be ok. We
+    // assign the merged region later.
+    if (sn != null) {
+      addLocation(putOfMerged, sn, 1, mergedRegion.getReplicaId());
+    }
+    // Add empty locations for region replicas of the merged region so that number of replicas
+    // can be cached whenever the primary region is looked up from meta
+    for (int i = 1; i < regionReplication; i++) {
+      addEmptyLocation(putOfMerged, i);
+    }
+    // add parent reference for serial replication
+    if (!replicationParents.isEmpty()) {
+      addReplicationParent(putOfMerged, replicationParents);
+    }
+    byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
+    multiMutate(connection, tableRow, mutations);
   }
 
   /**
@@ -1091,42 +1088,40 @@ public final class MetaTableAccessor {
    */
   public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
     RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
-    try (Table meta = getMetaHTable(connection)) {
-      long time = EnvironmentEdgeManager.currentTime();
-      // Put for parent
-      Put putParent = makePutFromRegionInfo(
-        RegionInfoBuilder.newBuilder(parent).setOffline(true).setSplit(true).build(), time);
-      addDaughtersToPut(putParent, splitA, splitB);
-      // Puts for daughters
-      Put putA = makePutFromRegionInfo(splitA, time);
-      Put putB = makePutFromRegionInfo(splitB, time);
-      if (parentOpenSeqNum > 0) {
-        addReplicationBarrier(putParent, parentOpenSeqNum);
-        addReplicationParent(putA, Collections.singletonList(parent));
-        addReplicationParent(putB, Collections.singletonList(parent));
-      }
-      // Set initial state to CLOSED
-      // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
-      // default OFFLINE state. If Master gets restarted after this step, start up sequence of
-      // master tries to assign these offline regions. This is followed by re-assignments of the
-      // daughter regions from resumed {@link SplitTableRegionProcedure}
-      addRegionStateToPut(putA, RegionState.State.CLOSED);
-      addRegionStateToPut(putB, RegionState.State.CLOSED);
-      addSequenceNum(putA, 1, splitA.getReplicaId()); // new regions, openSeqNum = 1 is fine.
-      addSequenceNum(putB, 1, splitB.getReplicaId());
-      // Add empty locations for region replicas of daughters so that number of replicas can be
-      // cached whenever the primary region is looked up from meta
-      for (int i = 1; i < regionReplication; i++) {
-        addEmptyLocation(putA, i);
-        addEmptyLocation(putB, i);
-      }
-      byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, putParent, putA, putB);
-    }
+    long time = EnvironmentEdgeManager.currentTime();
+    // Put for parent
+    Put putParent = makePutFromRegionInfo(
+      RegionInfoBuilder.newBuilder(parent).setOffline(true).setSplit(true).build(), time);
+    addDaughtersToPut(putParent, splitA, splitB);
+    // Puts for daughters
+    Put putA = makePutFromRegionInfo(splitA, time);
+    Put putB = makePutFromRegionInfo(splitB, time);
+    if (parentOpenSeqNum > 0) {
+      addReplicationBarrier(putParent, parentOpenSeqNum);
+      addReplicationParent(putA, Collections.singletonList(parent));
+      addReplicationParent(putB, Collections.singletonList(parent));
+    }
+    // Set initial state to CLOSED
+    // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
+    // default OFFLINE state. If Master gets restarted after this step, start up sequence of
+    // master tries to assign these offline regions. This is followed by re-assignments of the
+    // daughter regions from resumed {@link SplitTableRegionProcedure}
+    addRegionStateToPut(putA, RegionState.State.CLOSED);
+    addRegionStateToPut(putB, RegionState.State.CLOSED);
+    addSequenceNum(putA, 1, splitA.getReplicaId()); // new regions, openSeqNum = 1 is fine.
+    addSequenceNum(putB, 1, splitB.getReplicaId());
+    // Add empty locations for region replicas of daughters so that number of replicas can be
+    // cached whenever the primary region is looked up from meta
+    for (int i = 1; i < regionReplication; i++) {
+      addEmptyLocation(putA, i);
+      addEmptyLocation(putB, i);
+    }
+    byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
+    multiMutate(connection, tableRow, Arrays.asList(putParent, putA, putB));
   }
 
   /**
@@ -1163,56 +1158,34 @@ public final class MetaTableAccessor {
     deleteFromMetaTable(connection, delete);
     LOG.info("Deleted table " + table + " state from META");
   }
 
-  private static void multiMutate(Table table, byte[] row, Mutation... mutations)
-      throws IOException {
-    multiMutate(table, row, Arrays.asList(mutations));
-  }
-
   /**
    * Performs an atomic multi-mutate operation against the given table. Used by the likes of merge
    * and split as these want to make atomic mutations across multiple rows.
    * @throws IOException even if we encounter a RuntimeException, we'll still wrap it in an IOE.
    */
-  @VisibleForTesting
-  static void multiMutate(final Table table, byte[] row, final List<Mutation> mutations)
+  private static void multiMutate(Connection conn, byte[] row, List<Mutation> mutations)
       throws IOException {
     debugLogMutations(mutations);
-    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = instance -> {
-      MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
-      for (Mutation mutation : mutations) {
-        if (mutation instanceof Put) {
-          builder.addMutationRequest(
-            ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
-        } else if (mutation instanceof Delete) {
-          builder.addMutationRequest(
-            ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
-        } else {
-          throw new DoNotRetryIOException(
-            "multi in MetaEditor doesn't support " + mutation.getClass().getName());
-        }
-      }
-      ServerRpcController controller = new ServerRpcController();
-      CoprocessorRpcUtils.BlockingRpcCallback<MutateRowsResponse> rpcCallback =
-        new CoprocessorRpcUtils.BlockingRpcCallback<>();
-      instance.mutateRows(controller, builder.build(), rpcCallback);
-      MutateRowsResponse resp = rpcCallback.get();
-      if (controller.failedOnException()) {
-        throw controller.getFailedOn();
-      }
-      return resp;
-    };
-    try {
-      table.coprocessorService(MultiRowMutationService.class, row, row, callable);
-    } catch (Throwable e) {
-      // Throw if an IOE else wrap in an IOE EVEN IF IT IS a RuntimeException (e.g.
-      // a RejectedExecutionException because the hosting exception is shutting down.
-      // This is old behavior worth reexamining. Procedures doing merge or split
-      // currently don't handle RuntimeExceptions coming up out of meta table edits.
-      // Would have to work on this at least. See HBASE-23904.
-      Throwables.throwIfInstanceOf(e, IOException.class);
-      throw new IOException(e);
-    }
+    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
+    for (Mutation mutation : mutations) {
+      if (mutation instanceof Put) {
+        builder.addMutationRequest(
+          ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
+      } else if (mutation instanceof Delete) {
+        builder.addMutationRequest(
+          ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.DELETE, mutation));
+      } else {
+        throw new DoNotRetryIOException(
+          "multi in MetaEditor doesn't support " + mutation.getClass().getName());
+      }
+    }
+    MutateRowsRequest request = builder.build();
+    AsyncTable<?> table = conn.toAsyncConnection().getTable(TableName.META_TABLE_NAME);
+    CompletableFuture<MutateRowsResponse> future =
+      table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
+        MultiRowMutationService::newStub,
+        (stub, controller, done) -> stub.mutateRows(controller, request, done), row);
+    FutureUtils.get(future);
   }
 
   /**
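With this rewrite the synchronous contract of multiMutate is unchanged for callers such as mergeRegions and splitRegion: per the retained javadoc, failures still surface as IOException rather than leaking RuntimeExceptions out of the coprocessor plumbing, and it is FutureUtils.get that now does the blocking and conversion. The helper below is a simplified, hedged approximation of that blocking bridge, not the actual FutureUtils code; it only illustrates the idea of waiting on the CompletableFuture and mapping every outcome to an IOException.

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class BlockingBridgeSketch {

  private BlockingBridgeSketch() {
  }

  /** Roughly what "block and convert to IOException" means here; illustrative only. */
  static <T> T getBlocking(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report the interruption as an IOException subtype.
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      // Pass IOExceptions through untouched; wrap anything else.
      throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
    }
  }
}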