HBASE-18107: [AMv2] Remove DispatchMergingRegionsRequest & DispatchMergingRegions
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
f391770f47
commit
e9d8a7b6d5
|
@ -1341,13 +1341,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
return stub.mergeTableRegions(controller, request);
|
return stub.mergeTableRegions(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public MasterProtos.DispatchMergingRegionsResponse dispatchMergingRegions(
|
|
||||||
RpcController controller, MasterProtos.DispatchMergingRegionsRequest request)
|
|
||||||
throws ServiceException {
|
|
||||||
return stub.dispatchMergingRegions(controller, request);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
|
public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
|
||||||
MasterProtos.AssignRegionRequest request) throws ServiceException {
|
MasterProtos.AssignRegionRequest request) throws ServiceException {
|
||||||
|
|
|
@ -505,10 +505,4 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
return stub.splitRegion(controller, request);
|
return stub.splitRegion(controller, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
|
|
||||||
DispatchMergingRegionsRequest request) throws ServiceException {
|
|
||||||
return stub.dispatchMergingRegions(controller, request);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.master.RegionState;
|
import org.apache.hadoop.hbase.master.RegionState;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockInfo;
|
import org.apache.hadoop.hbase.procedure2.LockInfo;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
|
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaScope;
|
import org.apache.hadoop.hbase.quotas.QuotaScope;
|
||||||
import org.apache.hadoop.hbase.quotas.QuotaType;
|
import org.apache.hadoop.hbase.quotas.QuotaType;
|
||||||
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
|
||||||
|
@ -2024,46 +2023,6 @@ public final class ProtobufUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A helper to merge regions using admin protocol. Send request to
|
|
||||||
* regionserver.
|
|
||||||
* @param admin
|
|
||||||
* @param region_a
|
|
||||||
* @param region_b
|
|
||||||
* @param forcible true if do a compulsory merge, otherwise we will only merge
|
|
||||||
* two adjacent regions
|
|
||||||
* @param user effective user
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public static void mergeRegions(final RpcController controller,
|
|
||||||
final AdminService.BlockingInterface admin,
|
|
||||||
final HRegionInfo region_a, final HRegionInfo region_b,
|
|
||||||
final boolean forcible, final User user) throws IOException {
|
|
||||||
final MergeRegionsRequest request = ProtobufUtil.buildMergeRegionsRequest(
|
|
||||||
region_a.getRegionName(), region_b.getRegionName(),forcible);
|
|
||||||
if (user != null) {
|
|
||||||
try {
|
|
||||||
user.runAs(new PrivilegedExceptionAction<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void run() throws Exception {
|
|
||||||
admin.mergeRegions(controller, request);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
InterruptedIOException iioe = new InterruptedIOException();
|
|
||||||
iioe.initCause(ie);
|
|
||||||
throw iioe;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
admin.mergeRegions(controller, request);
|
|
||||||
} catch (ServiceException se) {
|
|
||||||
throw ProtobufUtil.getRemoteException(se);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// End helpers for Admin
|
// End helpers for Admin
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -3393,28 +3352,6 @@ public final class ProtobufUtil {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a MergeRegionsRequest for the given regions
|
|
||||||
* @param regionA name of region a
|
|
||||||
* @param regionB name of region b
|
|
||||||
* @param forcible true if it is a compulsory merge
|
|
||||||
* @return a MergeRegionsRequest
|
|
||||||
*/
|
|
||||||
public static MergeRegionsRequest buildMergeRegionsRequest(
|
|
||||||
final byte[] regionA, final byte[] regionB, final boolean forcible) {
|
|
||||||
MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder();
|
|
||||||
RegionSpecifier regionASpecifier = RequestConverter.buildRegionSpecifier(
|
|
||||||
RegionSpecifierType.REGION_NAME, regionA);
|
|
||||||
RegionSpecifier regionBSpecifier = RequestConverter.buildRegionSpecifier(
|
|
||||||
RegionSpecifierType.REGION_NAME, regionB);
|
|
||||||
builder.setRegionA(regionASpecifier);
|
|
||||||
builder.setRegionB(regionBSpecifier);
|
|
||||||
builder.setForcible(forcible);
|
|
||||||
// send the master's wall clock time as well, so that the RS can refer to it
|
|
||||||
builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime());
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a ServerName from the passed in data bytes.
|
* Get a ServerName from the passed in data bytes.
|
||||||
* @param data Data with a serialize server name in it; can handle the old style
|
* @param data Data with a serialize server name in it; can handle the old style
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -273,22 +273,6 @@ message ExecuteProceduresResponse {
|
||||||
repeated CloseRegionResponse close_region = 2;
|
repeated CloseRegionResponse close_region = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Merges the specified regions.
|
|
||||||
* <p>
|
|
||||||
* This method currently closes the regions and then merges them
|
|
||||||
*/
|
|
||||||
message MergeRegionsRequest {
|
|
||||||
required RegionSpecifier region_a = 1;
|
|
||||||
required RegionSpecifier region_b = 2;
|
|
||||||
optional bool forcible = 3 [default = false];
|
|
||||||
// wall clock time from master
|
|
||||||
optional uint64 master_system_time = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
message MergeRegionsResponse {
|
|
||||||
}
|
|
||||||
|
|
||||||
service AdminService {
|
service AdminService {
|
||||||
rpc GetRegionInfo(GetRegionInfoRequest)
|
rpc GetRegionInfo(GetRegionInfoRequest)
|
||||||
returns(GetRegionInfoResponse);
|
returns(GetRegionInfoResponse);
|
||||||
|
@ -350,7 +334,4 @@ service AdminService {
|
||||||
|
|
||||||
rpc ExecuteProcedures(ExecuteProceduresRequest)
|
rpc ExecuteProcedures(ExecuteProceduresRequest)
|
||||||
returns(ExecuteProceduresResponse);
|
returns(ExecuteProceduresResponse);
|
||||||
|
|
||||||
rpc MergeRegions(MergeRegionsRequest)
|
|
||||||
returns(MergeRegionsResponse);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,20 +81,6 @@ message MoveRegionRequest {
|
||||||
message MoveRegionResponse {
|
message MoveRegionResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Dispatch merging the specified regions.
|
|
||||||
*/
|
|
||||||
message DispatchMergingRegionsRequest {
|
|
||||||
required RegionSpecifier region_a = 1;
|
|
||||||
required RegionSpecifier region_b = 2;
|
|
||||||
optional bool forcible = 3 [default = false];
|
|
||||||
optional uint64 nonce_group = 4 [default = 0];
|
|
||||||
optional uint64 nonce = 5 [default = 0];
|
|
||||||
}
|
|
||||||
|
|
||||||
message DispatchMergingRegionsResponse {
|
|
||||||
optional uint64 proc_id = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merging the specified regions in a table.
|
* Merging the specified regions in a table.
|
||||||
|
@ -667,10 +653,6 @@ service MasterService {
|
||||||
rpc ModifyColumn(ModifyColumnRequest)
|
rpc ModifyColumn(ModifyColumnRequest)
|
||||||
returns(ModifyColumnResponse);
|
returns(ModifyColumnResponse);
|
||||||
|
|
||||||
/** Master dispatch merging the regions */
|
|
||||||
rpc DispatchMergingRegions(DispatchMergingRegionsRequest)
|
|
||||||
returns(DispatchMergingRegionsResponse);
|
|
||||||
|
|
||||||
/** Move the region region to the destination server. */
|
/** Move the region region to the destination server. */
|
||||||
rpc MoveRegion(MoveRegionRequest)
|
rpc MoveRegion(MoveRegionRequest)
|
||||||
returns(MoveRegionResponse);
|
returns(MoveRegionResponse);
|
||||||
|
|
|
@ -115,7 +115,6 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
|
import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
|
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
|
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure;
|
|
||||||
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
|
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
@ -1530,59 +1529,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
this.catalogJanitorChore.setEnabled(b);
|
this.catalogJanitorChore.setEnabled(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long dispatchMergingRegions(
|
|
||||||
final HRegionInfo regionInfoA,
|
|
||||||
final HRegionInfo regionInfoB,
|
|
||||||
final boolean forcible,
|
|
||||||
final long nonceGroup,
|
|
||||||
final long nonce) throws IOException {
|
|
||||||
checkInitialized();
|
|
||||||
|
|
||||||
TableName tableName = regionInfoA.getTable();
|
|
||||||
if (tableName == null || regionInfoB.getTable() == null) {
|
|
||||||
throw new UnknownRegionException ("Can't merge regions without table associated");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!tableName.equals(regionInfoB.getTable())) {
|
|
||||||
throw new IOException ("Cannot merge regions from two different tables");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (regionInfoA.compareTo(regionInfoB) == 0) {
|
|
||||||
throw new MergeRegionException(
|
|
||||||
"Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB);
|
|
||||||
}
|
|
||||||
|
|
||||||
final HRegionInfo [] regionsToMerge = new HRegionInfo[2];
|
|
||||||
regionsToMerge [0] = regionInfoA;
|
|
||||||
regionsToMerge [1] = regionInfoB;
|
|
||||||
|
|
||||||
return MasterProcedureUtil.submitProcedure(
|
|
||||||
new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
|
|
||||||
@Override
|
|
||||||
protected void run() throws IOException {
|
|
||||||
MasterCoprocessorHost mcph = getMaster().getMasterCoprocessorHost();
|
|
||||||
if (mcph != null) {
|
|
||||||
mcph.preDispatchMerge(regionInfoA, regionInfoB);
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info(getClientIdAuditPrefix() + " Dispatch merge regions " +
|
|
||||||
regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName());
|
|
||||||
|
|
||||||
submitProcedure(new DispatchMergingRegionsProcedure(
|
|
||||||
procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible));
|
|
||||||
if (mcph != null) {
|
|
||||||
mcph.postDispatchMerge(regionInfoA, regionInfoB);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getDescription() {
|
|
||||||
return "DispatchMergingRegionsProcedure";
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long mergeRegions(
|
public long mergeRegions(
|
||||||
final HRegionInfo[] regionsToMerge,
|
final HRegionInfo[] regionsToMerge,
|
||||||
|
|
|
@ -810,28 +810,6 @@ public class MasterCoprocessorHost
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void preDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
|
|
||||||
throws IOException {
|
|
||||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
|
||||||
@Override
|
|
||||||
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
|
|
||||||
throws IOException {
|
|
||||||
oserver.preDispatchMerge(ctx, regionInfoA, regionInfoB);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void postDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB)
|
|
||||||
throws IOException {
|
|
||||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
|
||||||
@Override
|
|
||||||
public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx)
|
|
||||||
throws IOException {
|
|
||||||
oserver.postDispatchMerge(ctx, regionInfoA, regionInfoB);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public void preMergeRegions(final HRegionInfo[] regionsToMerge)
|
public void preMergeRegions(final HRegionInfo[] regionsToMerge)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() {
|
||||||
|
|
|
@ -2009,34 +2009,4 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller,
|
|
||||||
DispatchMergingRegionsRequest request) throws ServiceException {
|
|
||||||
final byte[] encodedNameOfRegionA = request.getRegionA().getValue().toByteArray();
|
|
||||||
final byte[] encodedNameOfRegionB = request.getRegionB().getValue().toByteArray();
|
|
||||||
if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME ||
|
|
||||||
request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
|
|
||||||
LOG.warn("mergeRegions specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME +
|
|
||||||
" actual: region_a=" +
|
|
||||||
request.getRegionA().getType() + ", region_b=" +
|
|
||||||
request.getRegionB().getType());
|
|
||||||
}
|
|
||||||
RegionStates regionStates = master.getAssignmentManager().getRegionStates();
|
|
||||||
RegionState regionStateA = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionA));
|
|
||||||
RegionState regionStateB = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionB));
|
|
||||||
if (regionStateA == null || regionStateB == null) {
|
|
||||||
throw new ServiceException(new UnknownRegionException(
|
|
||||||
Bytes.toStringBinary(regionStateA == null? encodedNameOfRegionA: encodedNameOfRegionB)));
|
|
||||||
}
|
|
||||||
final HRegionInfo regionInfoA = regionStateA.getRegion();
|
|
||||||
final HRegionInfo regionInfoB = regionStateB.getRegion();
|
|
||||||
try {
|
|
||||||
long procId = master.dispatchMergingRegions(regionInfoA, regionInfoB, request.getForcible(),
|
|
||||||
request.getNonceGroup(), request.getNonce());
|
|
||||||
return DispatchMergingRegionsResponse.newBuilder().setProcId(procId).build();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
throw new ServiceException(ioe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -281,23 +281,6 @@ public interface MasterServices extends Server {
|
||||||
final long nonce)
|
final long nonce)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Merge two regions. The real implementation is on the regionserver, master
|
|
||||||
* just move the regions together and send MERGE RPC to regionserver
|
|
||||||
* @param region_a region to merge
|
|
||||||
* @param region_b region to merge
|
|
||||||
* @param forcible true if do a compulsory merge, otherwise we will only merge
|
|
||||||
* two adjacent regions
|
|
||||||
* @return procedure Id
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
long dispatchMergingRegions(
|
|
||||||
final HRegionInfo region_a,
|
|
||||||
final HRegionInfo region_b,
|
|
||||||
final boolean forcible,
|
|
||||||
final long nonceGroup,
|
|
||||||
final long nonce) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Merge regions in a table.
|
* Merge regions in a table.
|
||||||
* @param regionsToMerge daughter regions to merge
|
* @param regionsToMerge daughter regions to merge
|
||||||
|
|
|
@ -622,37 +622,6 @@ public class ServerManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends an MERGE REGIONS RPC to the specified server to merge the specified
|
|
||||||
* regions.
|
|
||||||
* <p>
|
|
||||||
* A region server could reject the close request because it either does not
|
|
||||||
* have the specified region.
|
|
||||||
* @param server server to merge regions
|
|
||||||
* @param region_a region to merge
|
|
||||||
* @param region_b region to merge
|
|
||||||
* @param forcible true if do a compulsory merge, otherwise we will only merge
|
|
||||||
* two adjacent regions
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
|
|
||||||
HRegionInfo region_b, boolean forcible, final User user) throws IOException {
|
|
||||||
if (server == null)
|
|
||||||
throw new NullPointerException("Passed server is null");
|
|
||||||
if (region_a == null || region_b == null)
|
|
||||||
throw new NullPointerException("Passed region is null");
|
|
||||||
AdminService.BlockingInterface admin = getRsAdmin(server);
|
|
||||||
if (admin == null) {
|
|
||||||
throw new IOException("Attempting to send MERGE REGIONS RPC to server "
|
|
||||||
+ server.toString() + " for region "
|
|
||||||
+ region_a.getRegionNameAsString() + ","
|
|
||||||
+ region_b.getRegionNameAsString()
|
|
||||||
+ " failed because no RPC connection found to this server");
|
|
||||||
}
|
|
||||||
HBaseRpcController controller = newRpcController();
|
|
||||||
ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user);
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
||||||
synchronized (onlineServers) {
|
synchronized (onlineServers) {
|
||||||
|
|
|
@ -1,584 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.RegionLoad;
|
|
||||||
import org.apache.hadoop.hbase.ServerLoad;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
|
||||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
|
||||||
import org.apache.hadoop.hbase.master.assignment.RegionStates;
|
|
||||||
import org.apache.hadoop.hbase.master.CatalogJanitor;
|
|
||||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
|
||||||
import org.apache.hadoop.hbase.master.RegionState;
|
|
||||||
import org.apache.hadoop.hbase.master.ServerManager;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure to Merge a region in a table.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class DispatchMergingRegionsProcedure
|
|
||||||
extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> {
|
|
||||||
private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class);
|
|
||||||
|
|
||||||
private final AtomicBoolean aborted = new AtomicBoolean(false);
|
|
||||||
private Boolean traceEnabled;
|
|
||||||
private AssignmentManager assignmentManager;
|
|
||||||
private int timeout;
|
|
||||||
private ServerName regionLocation;
|
|
||||||
private String regionsToMergeListFullName;
|
|
||||||
private String regionsToMergeListEncodedName;
|
|
||||||
|
|
||||||
private TableName tableName;
|
|
||||||
private HRegionInfo [] regionsToMerge;
|
|
||||||
private boolean forcible;
|
|
||||||
|
|
||||||
public DispatchMergingRegionsProcedure() {
|
|
||||||
this.traceEnabled = isTraceEnabled();
|
|
||||||
this.assignmentManager = null;
|
|
||||||
this.timeout = -1;
|
|
||||||
this.regionLocation = null;
|
|
||||||
this.regionsToMergeListFullName = null;
|
|
||||||
this.regionsToMergeListEncodedName = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DispatchMergingRegionsProcedure(
|
|
||||||
final MasterProcedureEnv env,
|
|
||||||
final TableName tableName,
|
|
||||||
final HRegionInfo [] regionsToMerge,
|
|
||||||
final boolean forcible) {
|
|
||||||
super(env);
|
|
||||||
this.traceEnabled = isTraceEnabled();
|
|
||||||
this.assignmentManager = getAssignmentManager(env);
|
|
||||||
this.tableName = tableName;
|
|
||||||
// For now, we only merge 2 regions. It could be extended to more than 2 regions in
|
|
||||||
// the future.
|
|
||||||
assert(regionsToMerge.length == 2);
|
|
||||||
this.regionsToMerge = regionsToMerge;
|
|
||||||
this.forcible = forcible;
|
|
||||||
|
|
||||||
this.timeout = -1;
|
|
||||||
this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
|
|
||||||
this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Flow executeFromState(
|
|
||||||
final MasterProcedureEnv env,
|
|
||||||
final DispatchMergingRegionsState state) throws InterruptedException {
|
|
||||||
if (isTraceEnabled()) {
|
|
||||||
LOG.trace(this + " execute state=" + state);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
switch (state) {
|
|
||||||
case DISPATCH_MERGING_REGIONS_PREPARE:
|
|
||||||
prepareMergeRegion(env);
|
|
||||||
setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION);
|
|
||||||
break;
|
|
||||||
case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
|
|
||||||
//Unused for now - reserve to add preMerge coprocessor in the future
|
|
||||||
setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS);
|
|
||||||
break;
|
|
||||||
case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
|
|
||||||
if (MoveRegionsToSameRS(env)) {
|
|
||||||
setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS);
|
|
||||||
} else {
|
|
||||||
LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString()
|
|
||||||
+ ", because can't move them to the same RS");
|
|
||||||
setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
|
|
||||||
doMergeInRS(env);
|
|
||||||
setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION);
|
|
||||||
break;
|
|
||||||
case DISPATCH_MERGING_REGIONS_POST_OPERATION:
|
|
||||||
//Unused for now - reserve to add postCompletedMerge coprocessor in the future
|
|
||||||
return Flow.NO_MORE_STATE;
|
|
||||||
default:
|
|
||||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() +
|
|
||||||
" in the table " + tableName + " (in state=" + state + ")", e);
|
|
||||||
|
|
||||||
setFailure("master-merge-regions", e);
|
|
||||||
}
|
|
||||||
return Flow.HAS_MORE_STATE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void rollbackState(
|
|
||||||
final MasterProcedureEnv env,
|
|
||||||
final DispatchMergingRegionsState state) throws IOException, InterruptedException {
|
|
||||||
if (isTraceEnabled()) {
|
|
||||||
LOG.trace(this + " rollback state=" + state);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
switch (state) {
|
|
||||||
case DISPATCH_MERGING_REGIONS_POST_OPERATION:
|
|
||||||
case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
|
|
||||||
String msg = this + " We are in the " + state + " state."
|
|
||||||
+ " It is complicated to rollback the merge operation that region server is working on."
|
|
||||||
+ " Rollback is not supported and we should let the merge operation to complete";
|
|
||||||
LOG.warn(msg);
|
|
||||||
// PONR
|
|
||||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
|
||||||
case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS:
|
|
||||||
break; // nothing to rollback
|
|
||||||
case DISPATCH_MERGING_REGIONS_PRE_OPERATION:
|
|
||||||
break; // nothing to rollback
|
|
||||||
case DISPATCH_MERGING_REGIONS_PREPARE:
|
|
||||||
break; // nothing to rollback
|
|
||||||
default:
|
|
||||||
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
// This will be retried. Unless there is a bug in the code,
|
|
||||||
// this should be just a "temporary error" (e.g. network down)
|
|
||||||
LOG.warn("Failed rollback attempt step " + state + " for merging the regions "
|
|
||||||
+ getRegionsToMergeListFullNameString() + " in table " + tableName, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected DispatchMergingRegionsState getState(final int stateId) {
|
|
||||||
return DispatchMergingRegionsState.valueOf(stateId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected int getStateId(final DispatchMergingRegionsState state) {
|
|
||||||
return state.getNumber();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected DispatchMergingRegionsState getInitialState() {
|
|
||||||
return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check whether we are in the state that can be rollback
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
protected boolean isRollbackSupported(final DispatchMergingRegionsState state) {
|
|
||||||
switch (state) {
|
|
||||||
case DISPATCH_MERGING_REGIONS_POST_OPERATION:
|
|
||||||
case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS:
|
|
||||||
// It is not safe to rollback if we reach to these states.
|
|
||||||
return false;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void serializeStateData(final OutputStream stream) throws IOException {
|
|
||||||
super.serializeStateData(stream);
|
|
||||||
|
|
||||||
MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg =
|
|
||||||
MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder()
|
|
||||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
|
|
||||||
.setTableName(ProtobufUtil.toProtoTableName(tableName))
|
|
||||||
.setForcible(forcible);
|
|
||||||
for (HRegionInfo hri: regionsToMerge) {
|
|
||||||
dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri));
|
|
||||||
}
|
|
||||||
dispatchMergingRegionsMsg.build().writeDelimitedTo(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deserializeStateData(final InputStream stream) throws IOException {
|
|
||||||
super.deserializeStateData(stream);
|
|
||||||
|
|
||||||
MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg =
|
|
||||||
MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream);
|
|
||||||
setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo()));
|
|
||||||
tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName());
|
|
||||||
|
|
||||||
assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2);
|
|
||||||
regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()];
|
|
||||||
for (int i = 0; i < regionsToMerge.length; i++) {
|
|
||||||
regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void toStringClassDetails(StringBuilder sb) {
|
|
||||||
sb.append(getClass().getSimpleName());
|
|
||||||
sb.append(" (table=");
|
|
||||||
sb.append(tableName);
|
|
||||||
sb.append(" regions=");
|
|
||||||
sb.append(getRegionsToMergeListFullNameString());
|
|
||||||
sb.append(" forcible=");
|
|
||||||
sb.append(forcible);
|
|
||||||
sb.append(")");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected LockState acquireLock(final MasterProcedureEnv env) {
|
|
||||||
if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
|
|
||||||
return LockState.LOCK_EVENT_WAIT;
|
|
||||||
}
|
|
||||||
if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) {
|
|
||||||
return LockState.LOCK_EVENT_WAIT;
|
|
||||||
}
|
|
||||||
return LockState.LOCK_ACQUIRED;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void releaseLock(final MasterProcedureEnv env) {
|
|
||||||
env.getProcedureScheduler().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TableName getTableName() {
|
|
||||||
return tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TableOperationType getTableOperationType() {
|
|
||||||
return TableOperationType.REGION_MERGE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Prepare merge and do some check
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
|
|
||||||
// Note: the following logic assumes that we only have 2 regions to merge. In the future,
|
|
||||||
// if we want to extend to more than 2 regions, the code needs to modify a little bit.
|
|
||||||
//
|
|
||||||
CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor();
|
|
||||||
boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]);
|
|
||||||
if (regionAHasMergeQualifier
|
|
||||||
|| !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) {
|
|
||||||
String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString()
|
|
||||||
+ ", " + regionsToMerge[1].getRegionNameAsString() + ", because region "
|
|
||||||
+ (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1]
|
|
||||||
.getEncodedName()) + " has merge qualifier";
|
|
||||||
LOG.info(msg);
|
|
||||||
throw new MergeRegionException(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
RegionStates regionStates = getAssignmentManager(env).getRegionStates();
|
|
||||||
RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName());
|
|
||||||
RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName());
|
|
||||||
if (regionStateA == null || regionStateB == null) {
|
|
||||||
throw new UnknownRegionException(
|
|
||||||
regionStateA == null ?
|
|
||||||
regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
|
|
||||||
throw new MergeRegionException(
|
|
||||||
"Unable to merge regions not online " + regionStateA + ", " + regionStateB);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
|
|
||||||
regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
|
|
||||||
throw new MergeRegionException("Can't merge non-default replicas");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) {
|
|
||||||
throw new MergeRegionException(
|
|
||||||
"Unable to merge not adjacent regions "
|
|
||||||
+ regionsToMerge[0].getRegionNameAsString() + ", "
|
|
||||||
+ regionsToMerge[1].getRegionNameAsString()
|
|
||||||
+ " where forcible = " + forcible);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Move all regions to the same region server
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @return whether target regions hosted by the same RS
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException {
|
|
||||||
// Make sure regions are on the same regionserver before send merge
|
|
||||||
// regions request to region server.
|
|
||||||
//
|
|
||||||
boolean onSameRS = isRegionsOnTheSameServer(env);
|
|
||||||
if (!onSameRS) {
|
|
||||||
// Note: the following logic assumes that we only have 2 regions to merge. In the future,
|
|
||||||
// if we want to extend to more than 2 regions, the code needs to modify a little bit.
|
|
||||||
//
|
|
||||||
RegionStates regionStates = getAssignmentManager(env).getRegionStates();
|
|
||||||
ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
|
|
||||||
|
|
||||||
RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]);
|
|
||||||
RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]);
|
|
||||||
if (loadOfRegionA != null && loadOfRegionB != null
|
|
||||||
&& loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) {
|
|
||||||
// switch regionsToMerge[0] and regionsToMerge[1]
|
|
||||||
HRegionInfo tmpRegion = this.regionsToMerge[0];
|
|
||||||
this.regionsToMerge[0] = this.regionsToMerge[1];
|
|
||||||
this.regionsToMerge[1] = tmpRegion;
|
|
||||||
ServerName tmpLocation = regionLocation;
|
|
||||||
regionLocation = regionLocation2;
|
|
||||||
regionLocation2 = tmpLocation;
|
|
||||||
}
|
|
||||||
|
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
|
|
||||||
RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation);
|
|
||||||
LOG.info("Moving regions to same server for merge: " + regionPlan.toString());
|
|
||||||
getAssignmentManager(env).moveAsync(regionPlan);
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
Thread.sleep(20);
|
|
||||||
// Make sure check RIT first, then get region location, otherwise
|
|
||||||
// we would make a wrong result if region is online between getting
|
|
||||||
// region location and checking RIT
|
|
||||||
boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]);
|
|
||||||
regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]);
|
|
||||||
onSameRS = regionLocation.equals(regionLocation2);
|
|
||||||
if (onSameRS || !isRIT) {
|
|
||||||
// Regions are on the same RS, or regionsToMerge[1] is not in
|
|
||||||
// RegionInTransition any more
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
InterruptedIOException iioe = new InterruptedIOException();
|
|
||||||
iioe.initCause(e);
|
|
||||||
throw iioe;
|
|
||||||
}
|
|
||||||
} while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
|
|
||||||
}
|
|
||||||
return onSameRS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Do the real merge operation in the region server that hosts regions
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private void doMergeInRS(final MasterProcedureEnv env) throws IOException {
|
|
||||||
long duration = 0;
|
|
||||||
long startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
if (getServerName(env) == null) {
|
|
||||||
// The merge probably already happen. Check
|
|
||||||
RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState(
|
|
||||||
regionsToMerge[0].getEncodedName());
|
|
||||||
if (regionState.isMerging() || regionState.isMerged()) {
|
|
||||||
LOG.info("Merge regions " + getRegionsToMergeListEncodedNameString() +
|
|
||||||
" is in progress or completed. No need to send a new request.");
|
|
||||||
} else {
|
|
||||||
LOG.warn("Cannot sending merge to hosting server of the regions " +
|
|
||||||
getRegionsToMergeListEncodedNameString() + " as the server is unknown");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// TODO: the following RPC call is not idempotent. Multiple calls (eg. after master
|
|
||||||
// failover, re-execute this step) could result in some exception thrown that does not
|
|
||||||
// paint the correct picture. This behavior is on-par with old releases. Improvement
|
|
||||||
// could happen in the future.
|
|
||||||
env.getMasterServices().getServerManager().sendRegionsMerge(
|
|
||||||
getServerName(env),
|
|
||||||
regionsToMerge[0],
|
|
||||||
regionsToMerge[1],
|
|
||||||
forcible,
|
|
||||||
getUser());
|
|
||||||
LOG.info("Sent merge to server " + getServerName(env) + " for region " +
|
|
||||||
getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible);
|
|
||||||
return;
|
|
||||||
} catch (RegionOpeningException roe) {
|
|
||||||
// Do a retry since region should be online on RS immediately
|
|
||||||
LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe);
|
|
||||||
} catch (Exception ie) {
|
|
||||||
LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " +
|
|
||||||
getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env));
|
|
||||||
|
|
||||||
// If we reaches here, it means that we get timed out.
|
|
||||||
String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms";
|
|
||||||
LOG.warn(msg);
|
|
||||||
throw new IOException(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
private RegionLoad getRegionLoad(
|
|
||||||
final MasterProcedureEnv env,
|
|
||||||
final ServerName sn,
|
|
||||||
final HRegionInfo hri) {
|
|
||||||
ServerManager serverManager = env.getMasterServices().getServerManager();
|
|
||||||
ServerLoad load = serverManager.getLoad(sn);
|
|
||||||
if (load != null) {
|
|
||||||
Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad();
|
|
||||||
if (regionsLoad != null) {
|
|
||||||
return regionsLoad.get(hri.getRegionName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @return whether target regions hosted by the same RS
|
|
||||||
*/
|
|
||||||
private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{
|
|
||||||
Boolean onSameRS = true;
|
|
||||||
int i = 0;
|
|
||||||
RegionStates regionStates = getAssignmentManager(env).getRegionStates();
|
|
||||||
regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
|
|
||||||
if (regionLocation != null) {
|
|
||||||
for(i = 1; i < regionsToMerge.length; i++) {
|
|
||||||
ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]);
|
|
||||||
if (regionLocation2 != null) {
|
|
||||||
if (onSameRS) {
|
|
||||||
onSameRS = regionLocation.equals(regionLocation2);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// At least one region is not online, merge will fail, no need to continue.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i == regionsToMerge.length) {
|
|
||||||
// Finish checking all regions, return the result;
|
|
||||||
return onSameRS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If reaching here, at least one region is not online.
|
|
||||||
String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() +
|
|
||||||
", because region " + regionsToMerge[i].getEncodedName() + " is not online now.";
|
|
||||||
LOG.warn(msg);
|
|
||||||
throw new IOException(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @return assignmentManager
|
|
||||||
*/
|
|
||||||
private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) {
|
|
||||||
if (assignmentManager == null) {
|
|
||||||
assignmentManager = env.getMasterServices().getAssignmentManager();
|
|
||||||
}
|
|
||||||
return assignmentManager;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @return timeout value
|
|
||||||
*/
|
|
||||||
private int getTimeout(final MasterProcedureEnv env) {
|
|
||||||
if (timeout == -1) {
|
|
||||||
timeout = env.getMasterConfiguration().getInt(
|
|
||||||
"hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000);
|
|
||||||
}
|
|
||||||
return timeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @param env MasterProcedureEnv
|
|
||||||
* @return serverName
|
|
||||||
*/
|
|
||||||
private ServerName getServerName(final MasterProcedureEnv env) {
|
|
||||||
if (regionLocation == null) {
|
|
||||||
regionLocation =
|
|
||||||
getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]);
|
|
||||||
}
|
|
||||||
return regionLocation;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @param fullName whether return only encoded name
|
|
||||||
* @return region names in a list
|
|
||||||
*/
|
|
||||||
private String getRegionsToMergeListFullNameString() {
|
|
||||||
if (regionsToMergeListFullName == null) {
|
|
||||||
StringBuilder sb = new StringBuilder("[");
|
|
||||||
int i = 0;
|
|
||||||
while(i < regionsToMerge.length - 1) {
|
|
||||||
sb.append(regionsToMerge[i].getRegionNameAsString() + ", ");
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
sb.append(regionsToMerge[i].getRegionNameAsString() + " ]");
|
|
||||||
regionsToMergeListFullName = sb.toString();
|
|
||||||
}
|
|
||||||
return regionsToMergeListFullName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @return encoded region names
|
|
||||||
*/
|
|
||||||
private String getRegionsToMergeListEncodedNameString() {
|
|
||||||
if (regionsToMergeListEncodedName == null) {
|
|
||||||
StringBuilder sb = new StringBuilder("[");
|
|
||||||
int i = 0;
|
|
||||||
while(i < regionsToMerge.length - 1) {
|
|
||||||
sb.append(regionsToMerge[i].getEncodedName() + ", ");
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
sb.append(regionsToMerge[i].getEncodedName() + " ]");
|
|
||||||
regionsToMergeListEncodedName = sb.toString();
|
|
||||||
}
|
|
||||||
return regionsToMergeListEncodedName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
|
||||||
* retrieve it.
|
|
||||||
* @return traceEnabled
|
|
||||||
*/
|
|
||||||
private Boolean isTraceEnabled() {
|
|
||||||
if (traceEnabled == null) {
|
|
||||||
traceEnabled = LOG.isTraceEnabled();
|
|
||||||
}
|
|
||||||
return traceEnabled;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -208,20 +208,6 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
|
||||||
return queueLists.toString();
|
return queueLists.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void requestRegionsMerge(final Region a,
|
|
||||||
final Region b, final boolean forcible, long masterSystemTime, User user) {
|
|
||||||
try {
|
|
||||||
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
|
|
||||||
+ forcible + ". " + this);
|
|
||||||
}
|
|
||||||
} catch (RejectedExecutionException ree) {
|
|
||||||
LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
|
|
||||||
+ forcible, ree);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean requestSplit(final Region r) {
|
public synchronized boolean requestSplit(final Region r) {
|
||||||
// don't split regions that are blocking
|
// don't split regions that are blocking
|
||||||
if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
|
if (shouldSplitRegion() && ((HRegion)r).getCompactPriority() >= Store.PRIORITY_USER) {
|
||||||
|
|
|
@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||||
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
||||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
|
@ -150,8 +149,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
||||||
|
@ -3490,45 +3487,4 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Merge regions on the region server.
|
|
||||||
*
|
|
||||||
* @param controller the RPC controller
|
|
||||||
* @param request the request
|
|
||||||
* @return merge regions response
|
|
||||||
* @throws ServiceException
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
@QosPriority(priority = HConstants.ADMIN_QOS)
|
|
||||||
// UNUSED AS OF AMv2 PURGE!
|
|
||||||
public MergeRegionsResponse mergeRegions(final RpcController controller,
|
|
||||||
final MergeRegionsRequest request) throws ServiceException {
|
|
||||||
try {
|
|
||||||
checkOpen();
|
|
||||||
requestCount.increment();
|
|
||||||
Region regionA = getRegion(request.getRegionA());
|
|
||||||
Region regionB = getRegion(request.getRegionB());
|
|
||||||
boolean forcible = request.getForcible();
|
|
||||||
long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
|
|
||||||
regionA.startRegionOperation(Operation.MERGE_REGION);
|
|
||||||
regionB.startRegionOperation(Operation.MERGE_REGION);
|
|
||||||
if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
|
|
||||||
regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
|
|
||||||
throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
|
|
||||||
}
|
|
||||||
LOG.info("Receiving merging request for " + regionA + ", " + regionB
|
|
||||||
+ ",forcible=" + forcible);
|
|
||||||
regionA.flush(true);
|
|
||||||
regionB.flush(true);
|
|
||||||
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
|
|
||||||
masterSystemTime, RpcServer.getRequestUser());
|
|
||||||
return MergeRegionsResponse.newBuilder().build();
|
|
||||||
} catch (DroppedSnapshotException ex) {
|
|
||||||
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
|
|
||||||
throw new ServiceException(ex);
|
|
||||||
} catch (IOException ie) {
|
|
||||||
throw new ServiceException(ie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -451,12 +451,6 @@ public class MockNoopMasterServices implements MasterServices, Server {
|
||||||
public void checkIfShouldMoveSystemRegionAsync() {
|
public void checkIfShouldMoveSystemRegionAsync() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b, boolean forcible, long nonceGroup,
|
|
||||||
long nonce) throws IOException {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProcedureEvent getInitializedEvent() {
|
public ProcedureEvent getInitializedEvent() {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
|
|
|
@ -69,8 +69,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsRequest;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.MergeRegionsResponse;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
|
||||||
|
@ -733,11 +731,4 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public MergeRegionsResponse mergeRegions(RpcController controller, MergeRegionsRequest request)
|
|
||||||
throws ServiceException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -403,13 +403,6 @@ public class TestReplicator extends TestReplicationBase {
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public MergeRegionsResponse mergeRegions(RpcController controller,
|
|
||||||
MergeRegionsRequest request)
|
|
||||||
throws ServiceException {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
|
public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
|
||||||
|
|
Loading…
Reference in New Issue