HBASE-7403 Online Merge (Chunhui shen)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1460306 13f79535-47bb-0310-9956-ffa450edef68
zjushch 2013-03-24 10:26:21 +00:00
parent add3a064c7
commit c7309e82ef
32 changed files with 5378 additions and 470 deletions


@ -1107,4 +1107,27 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
return hris;
}
/**
* Check whether two regions are adjacent
* @param regionA
* @param regionB
* @return true if two regions are adjacent
*/
public static boolean areAdjacent(HRegionInfo regionA, HRegionInfo regionB) {
if (regionA == null || regionB == null) {
throw new IllegalArgumentException(
"Can't check whether adjacent for null region");
}
HRegionInfo a = regionA;
HRegionInfo b = regionB;
if (Bytes.compareTo(a.getStartKey(), b.getStartKey()) > 0) {
a = regionB;
b = regionA;
}
return Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0;
}
}
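A minimal usage sketch of the new adjacency check, assuming the byte[]-based HRegionInfo constructor of this codebase and a hypothetical table t1 whose two regions meet at row5. Argument order does not matter, since the method sorts the pair by start key before comparing:

HRegionInfo a = new HRegionInfo(Bytes.toBytes("t1"),
    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("row5"));
HRegionInfo b = new HRegionInfo(Bytes.toBytes("t1"),
    Bytes.toBytes("row5"), HConstants.EMPTY_BYTE_ARRAY);
boolean adjacent = HRegionInfo.areAdjacent(a, b); // true: a's end key == b's start key
boolean reversed = HRegionInfo.areAdjacent(b, a); // also true after internal reordering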


@ -233,6 +233,38 @@ public class MetaReader {
return (r == null || r.isEmpty())? null: HRegionInfo.getHRegionInfoAndServerName(r);
}
/**
* Gets the result in META for the specified region.
* @param catalogTracker
* @param regionName
* @return result of the specified region
* @throws IOException
*/
public static Result getRegionResult(CatalogTracker catalogTracker,
byte[] regionName) throws IOException {
Get get = new Get(regionName);
get.addFamily(HConstants.CATALOG_FAMILY);
return get(getCatalogHTable(catalogTracker), get);
}
/**
* Gets the regions referenced by the merge qualifiers of the specified merged region
* @return null if the region does not contain merge qualifiers, else the two merging regions
* @throws IOException
*/
public static Pair<HRegionInfo, HRegionInfo> getRegionsFromMergeQualifier(
CatalogTracker catalogTracker, byte[] regionName) throws IOException {
Result result = getRegionResult(catalogTracker, regionName);
HRegionInfo mergeA = HRegionInfo.getHRegionInfo(result,
HConstants.MERGEA_QUALIFIER);
HRegionInfo mergeB = HRegionInfo.getHRegionInfo(result,
HConstants.MERGEB_QUALIFIER);
if (mergeA == null && mergeB == null) {
return null;
}
return new Pair<HRegionInfo, HRegionInfo>(mergeA, mergeB);
}
/**
* Checks if the specified table exists. Looks at the META table hosted on
* the specified server.
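A hedged sketch of how a caller (for example the CatalogJanitor changes below) can use the new reader; catalogTracker and regionName are assumed to be in scope:

Pair<HRegionInfo, HRegionInfo> merging =
    MetaReader.getRegionsFromMergeQualifier(catalogTracker, regionName);
if (merging == null) {
  // No mergeA/mergeB qualifiers: not a freshly merged region.
} else {
  HRegionInfo mergeA = merging.getFirst();  // lower-sorting merging region
  HRegionInfo mergeB = merging.getSecond(); // higher-sorting merging region
}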


@ -18,8 +18,18 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -78,6 +88,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumn
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
@ -109,17 +120,8 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
/**
* Provides an interface to manage HBase database table metadata + general
@ -1689,6 +1691,38 @@ public class HBaseAdmin implements Abortable, Closeable {
}
}
/**
* Merge two regions. Asynchronous operation.
* @param encodedNameOfRegionA encoded name of region a
* @param encodedNameOfRegionB encoded name of region b
* @param forcible true if a compulsory merge should be performed, otherwise
* only two adjacent regions can be merged
* @throws IOException
*/
public void mergeRegions(final byte[] encodedNameOfRegionA,
final byte[] encodedNameOfRegionB, final boolean forcible)
throws IOException {
MasterAdminKeepAliveConnection master = connection
.getKeepAliveMasterAdmin();
try {
DispatchMergingRegionsRequest request = RequestConverter
.buildDispatchMergingRegionsRequest(encodedNameOfRegionA,
encodedNameOfRegionB, forcible);
master.dispatchMergingRegions(null, request);
} catch (ServiceException se) {
IOException ioe = ProtobufUtil.getRemoteException(se);
if (ioe instanceof UnknownRegionException) {
throw (UnknownRegionException) ioe;
}
LOG.error("Unexpected exception: " + se
+ " from calling HMaster.dispatchMergingRegions");
} catch (DeserializationException de) {
LOG.error("Could not parse destination server name: " + de);
} finally {
master.close();
}
}
/**
* Split a table or an individual region.
* Asynchronous operation.
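A hedged usage sketch of the new client API; the encoded region names below are hypothetical placeholders (in practice, the hex strings shown in the master UI), and conf is an assumed Configuration. With forcible=false only adjacent regions are merged:

byte[] encodedNameOfA = Bytes.toBytes("0123456789abcdef0123456789abcdef"); // hypothetical
byte[] encodedNameOfB = Bytes.toBytes("fedcba9876543210fedcba9876543210"); // hypothetical
HBaseAdmin admin = new HBaseAdmin(conf);
try {
  admin.mergeRegions(encodedNameOfA, encodedNameOfB, false); // asynchronous
} finally {
  admin.close();
}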


@ -43,6 +43,8 @@ public enum EventType {
RS_ZK_REGION_SPLITTING (5, null), // RS has started a region split
RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS), // RS split has completed.
RS_ZK_REGION_FAILED_OPEN (7, ExecutorType.MASTER_CLOSE_REGION), // RS failed to open a region
RS_ZK_REGION_MERGING (8, null), // RS has started merging regions
RS_ZK_REGION_MERGE (9, ExecutorType.MASTER_SERVER_OPERATIONS), // RS region merge has completed.
// Messages originating from Master to RS
M_RS_OPEN_REGION (20, ExecutorType.RS_OPEN_REGION), // Master asking RS to open a region
@ -53,6 +55,7 @@ public enum EventType {
M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META), // Master asking RS to close meta
// Messages originating from Client to Master
C_M_MERGE_REGION (30, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to merge regions
C_M_DELETE_TABLE (40, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to delete a table
C_M_DISABLE_TABLE (41, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to disable a table
C_M_ENABLE_TABLE (42, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to enable a table
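For orientation, a hedged sketch of the merge znode lifecycle these event types describe: the regionserver creates an RS_ZK_REGION_MERGING node while the merge transaction runs, then transitions it to RS_ZK_REGION_MERGE, whose payload carries the merged region plus the two merging regions (see the AssignmentManager changes below). A master-side watcher can classify the transition roughly like this, with data being the bytes read from the znode:

RegionTransition rt = RegionTransition.parseFrom(data);
switch (rt.getEventType()) {
case RS_ZK_REGION_MERGING:
  // Merge still in progress on the RS; nothing for the master to do yet.
  break;
case RS_ZK_REGION_MERGE:
  // Merge completed; the payload holds [merged, region_a, region_b].
  break;
default:
  break;
}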


@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
@ -1426,6 +1427,28 @@ public final class ProtobufUtil {
}
}
/**
* A helper to merge regions using the admin protocol. Sends the request to
* the regionserver.
* @param admin
* @param region_a
* @param region_b
* @param forcible true if a compulsory merge should be performed, otherwise
* only two adjacent regions can be merged
* @throws IOException
*/
public static void mergeRegions(final AdminProtocol admin,
final HRegionInfo region_a, final HRegionInfo region_b,
final boolean forcible) throws IOException {
MergeRegionsRequest request = RequestConverter.buildMergeRegionsRequest(
region_a.getRegionName(), region_b.getRegionName(), forcible);
try {
admin.mergeRegions(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
// End helpers for Admin
/*


@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionReq
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableR
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
@ -777,6 +779,26 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a MergeRegionsRequest for the given regions
* @param regionA name of region a
* @param regionB name of region b
* @param forcible true if it is a compulsory merge
* @return a MergeRegionsRequest
*/
public static MergeRegionsRequest buildMergeRegionsRequest(
final byte[] regionA, final byte[] regionB, final boolean forcible) {
MergeRegionsRequest.Builder builder = MergeRegionsRequest.newBuilder();
RegionSpecifier regionASpecifier = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionA);
RegionSpecifier regionBSpecifier = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionB);
builder.setRegionA(regionASpecifier);
builder.setRegionB(regionBSpecifier);
builder.setForcible(forcible);
return builder.build();
}
/**
* Create a CompactRegionRequest for a given region name
*
@ -936,6 +958,18 @@ public final class RequestConverter {
return builder.build();
}
public static DispatchMergingRegionsRequest buildDispatchMergingRegionsRequest(
final byte[] encodedNameOfRegionA, final byte[] encodedNameOfRegionB,
final boolean forcible) throws DeserializationException {
DispatchMergingRegionsRequest.Builder builder = DispatchMergingRegionsRequest.newBuilder();
builder.setRegionA(buildRegionSpecifier(
RegionSpecifierType.ENCODED_REGION_NAME, encodedNameOfRegionA));
builder.setRegionB(buildRegionSpecifier(
RegionSpecifierType.ENCODED_REGION_NAME, encodedNameOfRegionB));
builder.setForcible(forcible);
return builder.build();
}
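// Illustrative sketch (not part of this class): the two merge converters
// serve different hops. The client->master request identifies regions by
// encoded name, while the master->regionserver request uses full region
// names; the byte[] variables are assumed to be in scope:
//
//   DispatchMergingRegionsRequest toMaster = RequestConverter
//       .buildDispatchMergingRegionsRequest(encodedNameOfRegionA,
//           encodedNameOfRegionB, false);
//   MergeRegionsRequest toRegionServer = RequestConverter
//       .buildMergeRegionsRequest(regionAName, regionBName, false);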
/**
* Create a protocol buffer AssignRegionRequest
*


@ -381,6 +381,12 @@ public final class HConstants {
/** The upper-half split region column qualifier */
public static final byte [] SPLITB_QUALIFIER = Bytes.toBytes("splitB");
/** The lower-half merge region column qualifier */
public static final byte[] MERGEA_QUALIFIER = Bytes.toBytes("mergeA");
/** The upper-half merge region column qualifier */
public static final byte[] MERGEB_QUALIFIER = Bytes.toBytes("mergeB");
/**
* The meta table version column qualifier.
* We keep current version of the meta table in this column in <code>-ROOT-</code>


@ -141,6 +141,20 @@ message CompactRegionRequest {
message CompactRegionResponse {
}
/**
* Merges the specified regions.
* <p>
* This method currently closes the regions and then merges them
*/
message MergeRegionsRequest {
required RegionSpecifier regionA = 1;
required RegionSpecifier regionB = 2;
optional bool forcible = 3 [default = false];
}
message MergeRegionsResponse {
}
message UUID {
required uint64 leastSigBits = 1;
required uint64 mostSigBits = 2;
@ -240,6 +254,9 @@ service AdminService {
rpc compactRegion(CompactRegionRequest)
returns(CompactRegionResponse);
rpc mergeRegions(MergeRegionsRequest)
returns(MergeRegionsResponse);
rpc replicateWALEntry(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);


@ -63,6 +63,18 @@ message MoveRegionRequest {
message MoveRegionResponse {
}
/**
* Dispatch merging the specified regions.
*/
message DispatchMergingRegionsRequest {
required RegionSpecifier regionA = 1;
required RegionSpecifier regionB = 2;
optional bool forcible = 3 [default = false];
}
message DispatchMergingRegionsResponse {
}
message AssignRegionRequest {
required RegionSpecifier region = 1;
}
@ -243,6 +255,10 @@ service MasterAdminService {
rpc moveRegion(MoveRegionRequest)
returns(MoveRegionResponse);
/** Master dispatch merging the regions */
rpc dispatchMergingRegions(DispatchMergingRegionsRequest)
returns(DispatchMergingRegionsResponse);
/** Assign a region to a server chosen at random. */
rpc assignRegion(AssignRegionRequest)
returns(AssignRegionResponse);


@ -28,12 +28,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@ -65,6 +66,18 @@ public class MetaEditor {
return put;
}
/**
* Generates and returns a Delete for removing the given region's row from
* the catalog table
*/
public static Delete makeDeleteFromRegionInfo(HRegionInfo regionInfo) {
if (regionInfo == null) {
throw new IllegalArgumentException("Can't make a delete for null region");
}
Delete delete = new Delete(regionInfo.getRegionName());
return delete;
}
/**
* Adds split daughters to the Put
*/
@ -260,6 +273,42 @@ public class MetaEditor {
(sn == null? ", serverName=null": ", serverName=" + sn.toString()));
}
/**
* Merge the two regions into one in an atomic operation. Deletes the two
* merging regions in META and adds the merged region with the information of
* two merging regions.
* @param catalogTracker the catalog tracker
* @param mergedRegion the merged region
* @param regionA
* @param regionB
* @param sn the location of the merged region
* @throws IOException
*/
public static void mergeRegions(final CatalogTracker catalogTracker,
HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
ServerName sn) throws IOException {
HTable meta = MetaReader.getMetaHTable(catalogTracker);
HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
// Put for the merged region
Put putOfMerged = makePutFromRegionInfo(copyOfMerged);
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
regionA.toByteArray());
putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
regionB.toByteArray());
// Deletes for merging regions
Delete deleteA = makeDeleteFromRegionInfo(regionA);
Delete deleteB = makeDeleteFromRegionInfo(regionB);
// The merged region is a new region; openSeqNum = 1 is fine.
addLocation(putOfMerged, sn, 1);
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
}
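// Illustrative sketch (not part of this class): a caller such as the
// regionserver's merge transaction would commit the merge to META in one
// shot, assuming catalogTracker, mergedRegion, regionA, regionB and the
// hosting serverName are in scope:
//
//   MetaEditor.mergeRegions(catalogTracker, mergedRegion, regionA, regionB,
//       serverName);
//
// The one Put and two Deletes go through the multi-row mutation coprocessor
// endpoint below, so either all three mutations apply or none do.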
/**
* Splits the region into two in an atomic operation. Offlines the parent
* region with the information that it is split into two, and also adds
@ -291,17 +340,24 @@ public class MetaEditor {
addLocation(putB, sn, 1);
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
multiPut(meta, tableRow, putParent, putA, putB);
multiMutate(meta, tableRow, putParent, putA, putB);
}
/**
* Performs an atomic multi-Put operation against the given table.
* Performs an atomic multi-Mutate operation against the given table.
*/
private static void multiPut(HTable table, byte[] row, Put... puts) throws IOException {
private static void multiMutate(HTable table, byte[] row, Mutation... mutations) throws IOException {
CoprocessorRpcChannel channel = table.coprocessorService(row);
MultiMutateRequest.Builder mmrBuilder = MultiMutateRequest.newBuilder();
for (Put put : puts) {
mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, put));
for (Mutation mutation : mutations) {
if (mutation instanceof Put) {
mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, mutation));
} else if (mutation instanceof Delete) {
mmrBuilder.addMutationRequest(ProtobufUtil.toMutation(MutationType.DELETE, mutation));
} else {
throw new DoNotRetryIOException("multi in MetaEditor doesn't support "
+ mutation.getClass().getName());
}
}
MultiRowMutationService.BlockingInterface service =
@ -456,6 +512,24 @@ public class MetaEditor {
", from parent " + parent.getRegionNameAsString());
}
/**
* Deletes merge qualifiers for the specified merged region.
* @param catalogTracker
* @param mergedRegion
* @throws IOException
*/
public static void deleteMergeQualifiers(CatalogTracker catalogTracker,
final HRegionInfo mergedRegion) throws IOException {
Delete delete = new Delete(mergedRegion.getRegionName());
delete.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER);
delete.deleteColumns(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER);
deleteFromMetaTable(catalogTracker, delete);
LOG.info("Deleted references in merged region "
+ mergedRegion.getRegionNameAsString() + ", qualifier="
+ Bytes.toStringBinary(HConstants.MERGEA_QUALIFIER) + " and qualifier="
+ Bytes.toStringBinary(HConstants.MERGEB_QUALIFIER));
}
private static Put addRegionInfo(final Put p, final HRegionInfo hri)
throws IOException {
p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,


@ -39,36 +39,36 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.MergedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@ -85,6 +85,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
/**
@ -623,6 +624,24 @@ public class AssignmentManager extends ZooKeeperListener {
// multiple times so if it's still up we will receive an update soon.
}
break;
case RS_ZK_REGION_MERGING:
// nothing to do
LOG.info("Processed region " + regionInfo.getEncodedName()
+ " in state : " + et + " nothing to do.");
break;
case RS_ZK_REGION_MERGE:
if (!serverManager.isServerOnline(sn)) {
// ServerShutdownHandler would handle this region
LOG.warn("Processed region " + regionInfo.getEncodedName()
+ " in state : " + et + " on a dead regionserver: " + sn
+ " doing nothing");
} else {
LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
et + " nothing to do.");
// We don't do anything. The regionserver is supposed to update the znode
// multiple times so if it's still up we will receive an update soon.
}
break;
default:
throw new IllegalStateException("Received region in state :" + et + " is not valid.");
}
@ -783,6 +802,34 @@ public class AssignmentManager extends ZooKeeperListener {
regionState.getRegion(), sn, daughters));
break;
case RS_ZK_REGION_MERGING:
// Merged region is a new region, we can't find it in the region states now.
// Do nothing.
break;
case RS_ZK_REGION_MERGE:
// Assert that we can get a serverinfo for this server.
if (!this.serverManager.isServerOnline(sn)) {
LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
break;
}
// Get merged and merging regions.
byte[] payloadOfMerge = rt.getPayload();
List<HRegionInfo> mergeRegions;
try {
mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
payloadOfMerge.length);
} catch (IOException e) {
LOG.error("Dropped merge! Failed reading merge payload for " +
prettyPrintedRegionName);
break;
}
assert mergeRegions.size() == 3;
// Run handler to do the rest of the MERGE handling.
this.executorService.submit(new MergedRegionHandler(server, this, sn,
mergeRegions));
break;
case M_ZK_REGION_CLOSING:
// Should see CLOSING after we have asked it to CLOSE or additional
// times after already being in state of CLOSING
@ -2056,9 +2103,9 @@ public class AssignmentManager extends ZooKeeperListener {
NodeExistsException nee = (NodeExistsException)e;
String path = nee.getPath();
try {
if (isSplitOrSplitting(path)) {
LOG.debug(path + " is SPLIT or SPLITTING; " +
"skipping unassign because region no longer exists -- its split");
if (isSplitOrSplittingOrMergeOrMerging(path)) {
LOG.debug(path + " is SPLIT or SPLITTING or MERGE or MERGING; " +
"skipping unassign because region no longer exists -- its split or merge");
return;
}
} catch (KeeperException.NoNodeException ke) {
@ -2136,21 +2183,23 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* @param path
* @return True if znode is in SPLIT or SPLITTING state.
* @return True if znode is in SPLIT or SPLITTING or MERGE or MERGING state.
* @throws KeeperException Can happen if the znode went away in meantime.
* @throws DeserializationException
*/
private boolean isSplitOrSplitting(final String path)
private boolean isSplitOrSplittingOrMergeOrMerging(final String path)
throws KeeperException, DeserializationException {
boolean result = false;
// This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
// can get data from it.
// This may fail if the SPLIT or SPLITTING or MERGE or MERGING znode gets
// cleaned up before we can get data from it.
byte [] data = ZKAssign.getData(watcher, path);
if (data == null) return false;
RegionTransition rt = RegionTransition.parseFrom(data);
switch (rt.getEventType()) {
case RS_ZK_REGION_SPLIT:
case RS_ZK_REGION_SPLITTING:
case RS_ZK_REGION_MERGE:
case RS_ZK_REGION_MERGING:
result = true;
break;
default:
@ -2897,10 +2946,32 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
/**
* Update in-memory structures.
* @param sn Server that reported the merge
* @param merged regioninfo of the merged region
* @param a region a
* @param b region b
*/
public void handleRegionsMergeReport(final ServerName sn,
final HRegionInfo merged, final HRegionInfo a, final HRegionInfo b) {
regionOffline(a);
regionOffline(b);
regionOnline(merged, sn);
// There's a possibility that the regions were merging while a user asked
// the master to disable the table; we need to make sure we close those
// regions in that case. This is not racing with the region server itself,
// since the RS report is done after the regions-merge transaction completes.
if (this.zkTable.isDisablingOrDisabledTable(merged.getTableNameAsString())) {
unassign(merged);
}
}
/**
* @param plan Plan to execute.
*/
void balance(final RegionPlan plan) {
public void balance(final RegionPlan plan) {
synchronized (this.regionPlans) {
this.regionPlans.put(plan.getRegionName(), plan);
}


@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
@ -45,13 +46,14 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Triple;
/**
* A janitor for the catalog tables. Scans the <code>.META.</code> catalog
* table on a period looking for unused regions to garbage collect.
*/
@InterfaceAudience.Private
class CatalogJanitor extends Chore {
public class CatalogJanitor extends Chore {
private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
private final Server server;
private final MasterServices services;
@ -102,16 +104,37 @@ class CatalogJanitor extends Chore {
}
/**
* Scans META and returns a number of scanned rows, and
* an ordered map of split parents.
* Scans META and returns the number of scanned rows, a map of merged
* regions, and an ordered map of split parents.
* @return triple of scanned rows, map of merged regions and map of split
* parent regioninfos
* @throws IOException
*/
Pair<Integer, Map<HRegionInfo, Result>> getSplitParents() throws IOException {
Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents()
throws IOException {
return getMergedRegionsAndSplitParents(null);
}
/**
* Scans META and returns the number of scanned rows, a map of merged
* regions, and an ordered map of split parents. If the given table name is
* null, returns merged regions and split parents of all tables, else only
* those of the specified table
* @param tableName null represents all tables
* @return triple of scanned rows, and map of merged regions, and map of split
* parent regioninfos
* @throws IOException
*/
Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> getMergedRegionsAndSplitParents(
final byte[] tableName) throws IOException {
final boolean isTableSpecified = (tableName != null && tableName.length != 0);
// TODO: Only works with single .META. region currently. Fix.
final AtomicInteger count = new AtomicInteger(0);
// Keep Map of found split parents. These are candidates for cleanup.
// Use a comparator that has split parents come before its daughters.
final Map<HRegionInfo, Result> splitParents =
new TreeMap<HRegionInfo, Result>(new SplitParentFirstComparator());
final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
// This visitor collects split parents and counts rows in the .META. table
MetaReader.Visitor visitor = new MetaReader.Visitor() {
@Override
@ -120,20 +143,72 @@ class CatalogJanitor extends Chore {
count.incrementAndGet();
HRegionInfo info = HRegionInfo.getHRegionInfo(r);
if (info == null) return true; // Keep scanning
if (isTableSpecified
&& Bytes.compareTo(info.getTableName(), tableName) > 0) {
// Another table, stop scanning
return false;
}
if (info.isSplitParent()) splitParents.put(info, r);
if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) {
mergedRegions.put(info, r);
}
// Returning true means "keep scanning"
return true;
}
};
// Run full scan of .META. catalog table passing in our custom visitor
MetaReader.fullScan(this.server.getCatalogTracker(), visitor);
return new Pair<Integer, Map<HRegionInfo, Result>>(count.get(), splitParents);
byte[] startRow = (!isTableSpecified) ? HConstants.EMPTY_START_ROW
: HRegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW,
HConstants.ZEROES, false);
// Run full scan of .META. catalog table passing in our custom visitor with
// the start row
MetaReader.fullScan(this.server.getCatalogTracker(), visitor, startRow);
return new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
count.get(), mergedRegions, splitParents);
}
/**
* If the merged region no longer holds references to the merging regions,
* archive the merging regions on HDFS and delete the merge references in .META.
* @param mergedRegion
* @param regionA
* @param regionB
* @return true if we deleted the references in the merged region from .META.
* and archived the files on the file system
* @throws IOException
*/
boolean cleanMergeRegion(final HRegionInfo mergedRegion,
final HRegionInfo regionA, final HRegionInfo regionB) throws IOException {
FileSystem fs = this.services.getMasterFileSystem().getFileSystem();
Path rootdir = this.services.getMasterFileSystem().getRootDir();
Path tabledir = HTableDescriptor.getTableDir(rootdir,
mergedRegion.getTableName());
HTableDescriptor htd = getTableDescriptor(mergedRegion
.getTableNameAsString());
HRegionFileSystem regionFs = null;
try {
regionFs = HRegionFileSystem.openRegionFromFileSystem(
this.services.getConfiguration(), fs, tabledir, mergedRegion, true);
} catch (IOException e) {
LOG.warn("Merged region does not exist: " + mergedRegion.getEncodedName());
}
if (regionFs == null || !regionFs.hasReferences(htd)) {
LOG.debug("Deleting region " + regionA.getRegionNameAsString() + " and "
+ regionB.getRegionNameAsString()
+ " from fs because merged region no longer holds references");
HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA);
HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB);
MetaEditor.deleteMergeQualifiers(server.getCatalogTracker(), mergedRegion);
return true;
}
return false;
}
/**
* Run janitorial scan of catalog <code>.META.</code> table looking for
* garbage to collect.
* @return number of cleaned regions
* @throws IOException
*/
int scan() throws IOException {
@ -141,18 +216,44 @@ class CatalogJanitor extends Chore {
if (!alreadyRunning.compareAndSet(false, true)) {
return 0;
}
Pair<Integer, Map<HRegionInfo, Result>> pair = getSplitParents();
int count = pair.getFirst();
Map<HRegionInfo, Result> splitParents = pair.getSecond();
Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple =
getMergedRegionsAndSplitParents();
int count = scanTriple.getFirst();
/**
* clean merge regions first
*/
int mergeCleaned = 0;
Map<HRegionInfo, Result> mergedRegions = scanTriple.getSecond();
for (Map.Entry<HRegionInfo, Result> e : mergedRegions.entrySet()) {
HRegionInfo regionA = HRegionInfo.getHRegionInfo(e.getValue(),
HConstants.MERGEA_QUALIFIER);
HRegionInfo regionB = HRegionInfo.getHRegionInfo(e.getValue(),
HConstants.MERGEB_QUALIFIER);
if (regionA == null || regionB == null) {
LOG.warn("Unexpected references regionA="
+ (regionA == null ? "null" : regionA.getRegionNameAsString())
+ ",regionB="
+ (regionB == null ? "null" : regionB.getRegionNameAsString())
+ " in merged region " + e.getKey().getRegionNameAsString());
} else {
if (cleanMergeRegion(e.getKey(), regionA, regionB)) {
mergeCleaned++;
}
}
}
/**
* clean split parents
*/
Map<HRegionInfo, Result> splitParents = scanTriple.getThird();
// Now work on our list of found parents. See if any we can clean up.
int cleaned = 0;
//regions whose parents are still around
int splitCleaned = 0;
// regions whose parents are still around
HashSet<String> parentNotCleaned = new HashSet<String>();
for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
if (!parentNotCleaned.contains(e.getKey().getEncodedName()) &&
cleanParent(e.getKey(), e.getValue())) {
cleaned++;
splitCleaned++;
} else {
// We could not clean the parent, so its daughters should not be cleaned either (HBASE-6160)
PairOfSameType<HRegionInfo> daughters = HRegionInfo.getDaughterRegions(e.getValue());
@ -160,14 +261,16 @@ class CatalogJanitor extends Chore {
parentNotCleaned.add(daughters.getSecond().getEncodedName());
}
}
if (cleaned != 0) {
LOG.info("Scanned " + count + " catalog row(s) and gc'd " + cleaned +
" unreferenced parent region(s)");
if ((mergeCleaned + splitCleaned) != 0) {
LOG.info("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned
+ " unreferenced merged region(s) and " + splitCleaned
+ " unreferenced parent region(s)");
} else if (LOG.isDebugEnabled()) {
LOG.debug("Scanned " + count + " catalog row(s) and gc'd " + cleaned +
" unreferenced parent region(s)");
LOG.debug("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned
+ " unreferenced merged region(s) and " + splitCleaned
+ " unreferenced parent region(s)");
}
return cleaned;
return mergeCleaned + splitCleaned;
} finally {
alreadyRunning.set(false);
}
@ -220,6 +323,14 @@ class CatalogJanitor extends Chore {
boolean cleanParent(final HRegionInfo parent, Result rowContent)
throws IOException {
boolean result = false;
// Check whether it is a merged region whose merge references have not been
// cleaned. No need to check MERGEB_QUALIFIER because the two qualifiers are
// inserted/deleted together
if (rowContent.getValue(HConstants.CATALOG_FAMILY,
HConstants.MERGEA_QUALIFIER) != null) {
// wait for the merge-region cleanup to happen first
return result;
}
// Run checks on each daughter split.
PairOfSameType<HRegionInfo> daughters = HRegionInfo.getDaughterRegions(rowContent);
Pair<Boolean, Boolean> a = checkDaughterInFs(parent, daughters.getFirst());
@ -309,4 +420,33 @@ class CatalogJanitor extends Chore {
throws FileNotFoundException, IOException {
return this.services.getTableDescriptors().get(tableName);
}
/**
* Checks if the specified region has merge qualifiers and, if so, tries to
* clean them
* @param region
* @return true if the specified region no longer has merge qualifiers
* @throws IOException
*/
public boolean cleanMergeQualifier(final HRegionInfo region)
throws IOException {
// Get the merging regions if this is a merged region that still carries
// merge qualifiers
Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
.getRegionsFromMergeQualifier(this.services.getCatalogTracker(),
region.getRegionName());
if (mergeRegions == null
|| (mergeRegions.getFirst() == null && mergeRegions.getSecond() == null)) {
// It doesn't have merge qualifier, no need to clean
return true;
}
// This shouldn't happen; we insert/delete these two qualifiers together
if (mergeRegions.getFirst() == null || mergeRegions.getSecond() == null) {
LOG.error("Merged region " + region.getRegionNameAsString()
+ " has only one merge qualifier in META.");
return false;
}
return cleanMergeRegion(region, mergeRegions.getFirst(),
mergeRegions.getSecond());
}
}
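A hedged sketch of how this entry point is used by the merge dispatch handler below: before a new merge is dispatched, both candidate regions must have any leftover merge references cleaned, otherwise the merge is skipped:

// janitor is the master's CatalogJanitor; regionA/regionB are the candidates.
if (janitor.cleanMergeQualifier(regionA)
    && janitor.cleanMergeQualifier(regionB)) {
  // Neither region still references an earlier merge; safe to dispatch.
}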


@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -56,17 +55,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.UnknownRegionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HConnectionManager;
@ -75,13 +68,20 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.exceptions.UnknownRegionException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@ -90,6 +90,7 @@ import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
@ -125,6 +126,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableR
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
@ -1395,6 +1398,57 @@ Server {
this.catalogJanitorChore.setEnabled(b);
}
@Override
public DispatchMergingRegionsResponse dispatchMergingRegions(
RpcController controller, DispatchMergingRegionsRequest request)
throws ServiceException {
final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
.toByteArray();
final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
.toByteArray();
final boolean forcible = request.getForcible();
if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
|| request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
LOG.warn("mergeRegions specifier type: expected: "
+ RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
+ request.getRegionA().getType() + ", region_b="
+ request.getRegionB().getType());
}
RegionState regionStateA = assignmentManager.getRegionStates()
.getRegionState(Bytes.toString(encodedNameOfRegionA));
RegionState regionStateB = assignmentManager.getRegionStates()
.getRegionState(Bytes.toString(encodedNameOfRegionB));
if (regionStateA == null || regionStateB == null) {
throw new ServiceException(new UnknownRegionException(
Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
: encodedNameOfRegionB)));
}
if (!forcible && !HRegionInfo.areAdjacent(regionStateA.getRegion(),
regionStateB.getRegion())) {
throw new ServiceException("Unable to merge not adjacent regions "
+ regionStateA.getRegion().getRegionNameAsString() + ", "
+ regionStateB.getRegion().getRegionNameAsString()
+ " where forcible = " + forcible);
}
try {
dispatchMergingRegions(regionStateA.getRegion(), regionStateB.getRegion(), forcible);
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return DispatchMergingRegionsResponse.newBuilder().build();
}
@Override
public void dispatchMergingRegions(final HRegionInfo region_a,
final HRegionInfo region_b, final boolean forcible) throws IOException {
checkInitialized();
this.executorService.submit(new DispatchMergingRegionHandler(this,
this.catalogJanitorChore, region_a, region_b, forcible));
}
@Override
public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
throws ServiceException {


@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
@ -171,4 +172,16 @@ public interface MasterServices extends Server {
*/
public boolean registerService(Service instance);
/**
* Merge two regions. The real implementation is on the regionserver; the
* master just moves the regions together and sends the MERGE RPC to the
* regionserver
* @param region_a region to merge
* @param region_b region to merge
* @param forcible true if a compulsory merge should be performed, otherwise
* only two adjacent regions can be merged
* @throws IOException
*/
public void dispatchMergingRegions(final HRegionInfo region_a,
final HRegionInfo region_b, final boolean forcible) throws IOException;
}


@ -669,6 +669,36 @@ public class ServerManager {
return sendRegionClose(server, region, versionOfClosingNode, null, true);
}
/**
* Sends a MERGE REGIONS RPC to the specified server to merge the specified
* regions.
* <p>
* A region server could reject the request because it does not host the
* specified regions.
* @param server server to send the merge regions request to
* @param region_a region to merge
* @param region_b region to merge
* @param forcible true if a compulsory merge should be performed, otherwise
* only two adjacent regions can be merged
* @throws IOException
*/
public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
HRegionInfo region_b, boolean forcible) throws IOException {
if (server == null)
throw new NullPointerException("Passed server is null");
if (region_a == null || region_b == null)
throw new NullPointerException("Passed region is null");
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
throw new IOException("Attempting to send MERGE REGIONS RPC to server "
+ server.toString() + " for regions "
+ region_a.getRegionNameAsString() + ","
+ region_b.getRegionNameAsString()
+ " failed because no RPC connection found to this server");
}
ProtobufUtil.mergeRegions(admin, region_a, region_b, forcible);
}
/**
* @param sn
* @return


@ -0,0 +1,164 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.CatalogJanitor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Handles a MERGE regions request on the master: moves the regions together
* (onto the same regionserver) and sends the MERGE RPC to that regionserver.
*
* NOTE: The real merge is executed on the regionserver
*
*/
@InterfaceAudience.Private
public class DispatchMergingRegionHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(DispatchMergingRegionHandler.class);
private final MasterServices masterServices;
private final CatalogJanitor catalogJanitor;
private HRegionInfo region_a;
private HRegionInfo region_b;
private final boolean forcible;
private final int timeout;
public DispatchMergingRegionHandler(final MasterServices services,
final CatalogJanitor catalogJanitor, final HRegionInfo region_a,
final HRegionInfo region_b, final boolean forcible) {
super(services, EventType.C_M_MERGE_REGION);
this.masterServices = services;
this.catalogJanitor = catalogJanitor;
this.region_a = region_a;
this.region_b = region_b;
this.forcible = forcible;
this.timeout = server.getConfiguration().getInt(
"hbase.master.regionmerge.timeout", 30 * 1000);
}
@Override
public void process() throws IOException {
boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(region_a);
if (regionAHasMergeQualifier
|| !catalogJanitor.cleanMergeQualifier(region_b)) {
LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
+ ", " + region_b.getRegionNameAsString() + ", because region "
+ (regionAHasMergeQualifier ? region_a.getEncodedName() : region_b
.getEncodedName()) + " has merge qualifier");
return;
}
RegionStates regionStates = masterServices.getAssignmentManager()
.getRegionStates();
ServerName region_a_location = regionStates.getRegionServerOfRegion(region_a);
ServerName region_b_location = regionStates.getRegionServerOfRegion(region_b);
if (region_a_location == null || region_b_location == null) {
LOG.info("Skip merging regions " + region_a.getRegionNameAsString()
+ ", " + region_b.getRegionNameAsString() + ", because region "
+ (region_a_location == null ? region_a.getEncodedName() : region_b
.getEncodedName()) + " is not online now");
return;
}
long startTime = EnvironmentEdgeManager.currentTimeMillis();
boolean onSameRS = region_a_location.equals(region_b_location);
// Make sure the regions are on the same regionserver before sending the
// merge-regions request to it
if (!onSameRS) {
// Move region_b to region_a's location; swap region_a and region_b if
// region_a's load is lower than region_b's, so that we always move the
// lower-load region
RegionLoad loadOfRegionA = masterServices.getServerManager()
.getLoad(region_a_location).getRegionsLoad()
.get(region_a.getRegionName());
RegionLoad loadOfRegionB = masterServices.getServerManager()
.getLoad(region_b_location).getRegionsLoad()
.get(region_b.getRegionName());
if (loadOfRegionA != null && loadOfRegionB != null
&& loadOfRegionA.getRequestsCount() < loadOfRegionB
.getRequestsCount()) {
// switch region_a and region_b
HRegionInfo tmpRegion = this.region_a;
this.region_a = this.region_b;
this.region_b = tmpRegion;
ServerName tmpLocation = region_a_location;
region_a_location = region_b_location;
region_b_location = tmpLocation;
}
RegionPlan regionPlan = new RegionPlan(region_b, region_b_location,
region_a_location);
masterServices.getAssignmentManager().balance(regionPlan);
while (!masterServices.isStopped()) {
try {
Thread.sleep(20);
region_b_location = masterServices.getAssignmentManager()
.getRegionStates().getRegionServerOfRegion(region_b);
onSameRS = region_a_location.equals(region_b_location);
if (onSameRS || !regionStates.isRegionInTransition(region_b)) {
// Regions are on the same RS, or region_b is not in
// RegionInTransition any more
break;
}
if ((EnvironmentEdgeManager.currentTimeMillis() - startTime) > timeout) break;
} catch (InterruptedException e) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(e);
throw iioe;
}
}
}
if (onSameRS) {
try{
masterServices.getServerManager().sendRegionsMerge(region_a_location,
region_a, region_b, forcible);
LOG.info("Successfully send MERGE REGIONS RPC to server "
+ region_a_location.toString() + " for region "
+ region_a.getRegionNameAsString() + ","
+ region_b.getRegionNameAsString() + ", focible=" + forcible);
} catch (IOException ie) {
LOG.info("Failed send MERGE REGIONS RPC to server "
+ region_a_location.toString() + " for region "
+ region_a.getRegionNameAsString() + ","
+ region_b.getRegionNameAsString() + ", focible=" + forcible + ", "
+ ie.getMessage());
}
} else {
LOG.info("Cancel merging regions " + region_a.getRegionNameAsString()
+ ", " + region_b.getRegionNameAsString()
+ ", because can't move them together after "
+ (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
}
}
}
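The handler waits up to hbase.master.regionmerge.timeout milliseconds (default 30000, per the constructor above) for region_b to finish moving. A hedged configuration sketch for clusters where region moves are slow:

Configuration conf = HBaseConfiguration.create();
// Allow up to a minute for the regions to come together before giving up.
conf.setInt("hbase.master.regionmerge.timeout", 60 * 1000);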


@ -0,0 +1,117 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.master.handler;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
* Handles the MERGE regions event on the master: the master receives the merge
* report from the regionserver, then offlines the merging regions and onlines
* the merged region. Here region_a sorts before region_b.
*/
@InterfaceAudience.Private
public class MergedRegionHandler extends EventHandler implements
TotesHRegionInfo {
private static final Log LOG = LogFactory.getLog(MergedRegionHandler.class);
private final AssignmentManager assignmentManager;
private final HRegionInfo merged;
private final HRegionInfo region_a;
private final HRegionInfo region_b;
private final ServerName sn;
public MergedRegionHandler(Server server,
AssignmentManager assignmentManager, ServerName sn,
final List<HRegionInfo> mergeRegions) {
super(server, EventType.RS_ZK_REGION_MERGE);
assert mergeRegions.size() == 3;
this.assignmentManager = assignmentManager;
this.merged = mergeRegions.get(0);
this.region_a = mergeRegions.get(1);
this.region_b = mergeRegions.get(2);
this.sn = sn;
}
@Override
public HRegionInfo getHRegionInfo() {
return this.merged;
}
@Override
public String toString() {
String name = "UnknownServerName";
if (server != null && server.getServerName() != null) {
name = server.getServerName().toString();
}
String mergedRegion = "UnknownRegion";
if (merged != null) {
mergedRegion = merged.getRegionNameAsString();
}
return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-"
+ mergedRegion;
}
@Override
public void process() {
String encodedRegionName = this.merged.getEncodedName();
LOG.debug("Handling MERGE event for " + encodedRegionName
+ "; deleting node");
this.assignmentManager.handleRegionsMergeReport(this.sn, this.merged,
this.region_a, this.region_b);
// Remove region from ZK
try {
boolean successful = false;
while (!successful) {
// It's possible that the RS tickles in between the reading of the
// znode and the deleting, so it's safe to retry.
successful = ZKAssign.deleteNode(this.server.getZooKeeper(),
encodedRegionName, EventType.RS_ZK_REGION_MERGE);
}
} catch (KeeperException e) {
if (e instanceof NoNodeException) {
String znodePath = ZKUtil.joinZNode(
this.server.getZooKeeper().splitLogZNode, encodedRegionName);
LOG.debug("The znode " + znodePath
+ " does not exist. May be deleted already.");
} else {
server.abort("Error deleting MERGE node in ZK for transition ZK node ("
+ merged.getEncodedName() + ")", e);
}
}
LOG.info("Handled MERGE event; merged="
+ this.merged.getRegionNameAsString() + ", region_a="
+ this.region_a.getRegionNameAsString() + ", region_b="
+ this.region_b.getRegionNameAsString());
}
}


@ -57,6 +57,7 @@ public class CompactSplitThread implements CompactionRequestor {
private final ThreadPoolExecutor largeCompactions;
private final ThreadPoolExecutor smallCompactions;
private final ThreadPoolExecutor splits;
private final ThreadPoolExecutor mergePool;
/**
* Splitting should not take place if the total number of regions exceed this.
@ -118,6 +119,16 @@ public class CompactSplitThread implements CompactionRequestor {
return t;
}
});
int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
mergeThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(n + "-merges-" + System.currentTimeMillis());
return t;
}
});
}
@Override
@ -125,7 +136,8 @@ public class CompactSplitThread implements CompactionRequestor {
return "compaction_queue=("
+ largeCompactions.getQueue().size() + ":"
+ smallCompactions.getQueue().size() + ")"
+ ", split_queue=" + splits.getQueue().size();
+ ", split_queue=" + splits.getQueue().size()
+ ", merge_queue=" + mergePool.getQueue().size();
}
public String dumpQueue() {
@ -159,9 +171,32 @@ public class CompactSplitThread implements CompactionRequestor {
queueLists.append("\n");
}
queueLists.append("\n");
queueLists.append(" Region Merge Queue:\n");
lq = mergePool.getQueue();
it = lq.iterator();
while (it.hasNext()) {
queueLists.append(" " + it.next().toString());
queueLists.append("\n");
}
return queueLists.toString();
}
public synchronized void requestRegionsMerge(final HRegion a,
final HRegion b, final boolean forcible) {
try {
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
if (LOG.isDebugEnabled()) {
LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+ forcible + ". " + this);
}
} catch (RejectedExecutionException ree) {
LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
+ forcible, ree);
}
}
public synchronized boolean requestSplit(final HRegion r) {
// don't split regions that are blocking
if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
@@ -270,6 +305,7 @@ public class CompactSplitThread implements CompactionRequestor {
*/
void interruptIfNecessary() {
splits.shutdown();
mergePool.shutdown();
largeCompactions.shutdown();
smallCompactions.shutdown();
}
@@ -291,6 +327,7 @@ public class CompactSplitThread implements CompactionRequestor {
void join() {
waitFor(splits, "Split Thread");
waitFor(mergePool, "Merge Thread");
waitFor(largeCompactions, "Large Compaction Thread");
waitFor(smallCompactions, "Small Compaction Thread");
}
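The merge pool above is sized by the new hbase.regionserver.thread.merge knob, defaulting to one merge thread per regionserver. A minimal tuning sketch, assuming only stock Configuration APIs; the value 2 is an arbitrary example:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
Configuration conf = HBaseConfiguration.create();
// Let two region merges run concurrently on a regionserver
// (this patch defaults hbase.regionserver.thread.merge to 1).
conf.setInt("hbase.regionserver.thread.merge", 2);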

View File

@@ -66,6 +66,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -77,6 +78,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -783,6 +785,24 @@ public class HRegion implements HeapSize { // , Writable{
return isAvailable() && !hasReferences();
}
/**
* @return true if region is mergeable
*/
public boolean isMergeable() {
if (!isAvailable()) {
LOG.debug("Region " + this.getRegionNameAsString()
+ " is not mergeable because it is closing or closed");
return false;
}
if (hasReferences()) {
LOG.debug("Region " + this.getRegionNameAsString()
+ " is not mergeable because it has references");
return false;
}
return true;
}
public boolean areWritesEnabled() {
synchronized(this.writestate) {
return this.writestate.writesEnabled;
@@ -4060,6 +4080,26 @@ public class HRegion implements HeapSize { // , Writable{
return r;
}
/**
* Create a merged region given a temp directory with the region data.
* @param mergedRegionInfo merged region info
* @param region_b another merging region
* @return merged hregion
* @throws IOException
*/
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
final HRegion region_b) throws IOException {
HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
this.getTableDesc(), this.rsServices);
r.readRequestsCount.set(this.getReadRequestsCount()
+ region_b.getReadRequestsCount());
r.writeRequestsCount.set(this.getWriteRequestsCount()
+ region_b.getWriteRequestsCount());
this.fs.commitMergedRegion(mergedRegionInfo);
return r;
}
/**
* Inserts a new region's meta information into the passed
* <code>meta</code> region. Used by the HMaster bootstrap code adding

View File

@@ -40,8 +40,10 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference;
@@ -196,6 +198,21 @@ public class HRegionFileSystem {
return files != null && files.length > 0;
}
/**
* Check whether the region has Reference files
* @param htd table descriptor of the region
* @return true if the region has reference files
* @throws IOException
*/
public boolean hasReferences(final HTableDescriptor htd) throws IOException {
for (HColumnDescriptor family : htd.getFamilies()) {
if (hasReferences(family.getNameAsString())) {
return true;
}
}
return false;
}
/**
* @return the set of families present on disk
* @throws IOException
@@ -509,6 +526,10 @@ public class HRegionFileSystem {
return new Path(getRegionDir(), REGION_MERGES_DIR);
}
Path getMergesDir(final HRegionInfo hri) {
return new Path(getMergesDir(), hri.getEncodedName());
}
/**
* Clean up any merge detritus that may have been left around from previous merge attempts.
*/
@@ -516,6 +537,84 @@ public class HRegionFileSystem {
FSUtils.deleteDirectory(fs, getMergesDir());
}
/**
* Remove merged region
* @param mergedRegion {@link HRegionInfo}
* @throws IOException
*/
void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException {
Path regionDir = new Path(this.tableDir, mergedRegion.getEncodedName());
if (this.fs.exists(regionDir) && !this.fs.delete(regionDir, true)) {
throw new IOException("Failed delete of " + regionDir);
}
}
/**
* Create the region merges directory.
* @throws IOException If merges dir already exists or we fail to create it.
* @see HRegionFileSystem#cleanupMergesDir()
*/
void createMergesDir() throws IOException {
Path mergesdir = getMergesDir();
if (fs.exists(mergesdir)) {
LOG.info("The " + mergesdir
+ " directory exists. Hence deleting it to recreate it");
if (!fs.delete(mergesdir, true)) {
throw new IOException("Failed deletion of " + mergesdir
+ " before creating them again.");
}
}
if (!fs.mkdirs(mergesdir))
throw new IOException("Failed create of " + mergesdir);
}
/**
* Write out a merge reference under the given merges directory. Package local
* so it doesn't leak out of the regionserver.
* @param mergedRegion {@link HRegionInfo} of the merged region
* @param familyName Column Family Name
* @param f File to create reference.
* @param mergedDir merges directory
* @return Path to created reference.
* @throws IOException
*/
Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName,
final StoreFile f, final Path mergedDir)
throws IOException {
Path referenceDir = new Path(new Path(mergedDir,
mergedRegion.getEncodedName()), familyName);
// A whole reference to the store file.
Reference r = Reference.createTopReference(regionInfo.getStartKey());
// Add the referred-to region's name as a dot-separated suffix.
// See REF_NAME_REGEX regex above. The referred-to region's name is
// up in the path of the passed-in <code>f</code> -- parentdir is family,
// then the directory above is the region name.
String mergingRegionName = regionInfo.getEncodedName();
// Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family).
Path p = new Path(referenceDir, f.getPath().getName() + "."
+ mergingRegionName);
return r.write(fs, p);
}
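To make the reference naming concrete, a sketch with purely hypothetical encoded region names and store file id:
// Illustration only, hypothetical names: merging region "a1b2c3" (the region
// backing this HRegionFileSystem) contributes store file "f7e8d9" in family
// "cf" to merged region "d4e5f6". The whole-file ("top") reference is
// written to:
//
//   <merges dir>/d4e5f6/cf/f7e8d9.a1b2c3
//
// i.e. the merging region's encoded name becomes the dot suffix, matching
// REF_NAME_REGEX once the merged region dir is committed into place.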
/**
* Commit a merged region, moving it from the merges temporary directory to
* the proper location in the filesystem.
* @param mergedRegionInfo merged region {@link HRegionInfo}
* @throws IOException
*/
void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
// Move the tmp dir in the expected location
if (mergedRegionTmpDir != null && fs.exists(mergedRegionTmpDir)) {
if (!fs.rename(mergedRegionTmpDir, regionDir)) {
throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
+ regionDir);
}
}
}
// ===========================================================================
// Create/Open/Delete Helpers
// ===========================================================================

View File

@@ -55,35 +55,21 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.LeaseException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
@@ -101,6 +87,21 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.LeaseException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.RegionServerRunningException;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
import org.apache.hadoop.hbase.exceptions.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.YouAreDeadException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -114,7 +115,6 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -135,6 +135,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
@@ -203,11 +205,11 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.net.DNS;
@@ -3502,6 +3504,35 @@ public class HRegionServer implements ClientProtocol,
}
}
/**
* Merge regions on the region server.
*
* @param controller the RPC controller
* @param request the request
* @return merge regions response
* @throws ServiceException
*/
@Override
@QosPriority(priority = HConstants.HIGH_QOS)
public MergeRegionsResponse mergeRegions(final RpcController controller,
final MergeRegionsRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
HRegion regionA = getRegion(request.getRegionA());
HRegion regionB = getRegion(request.getRegionB());
boolean forcible = request.getForcible();
LOG.info("Receiving merging request for " + regionA + ", " + regionB
+ ",forcible=" + forcible);
regionA.flushcache();
regionB.flushcache();
compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
return MergeRegionsResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
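For context, a client reaches this endpoint through the admin API. A minimal client-side sketch, assuming the HBaseAdmin#mergeRegions(byte[], byte[], boolean) method added elsewhere in this patch; the encoded region names below are hypothetical:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
// Ask for a non-forcible merge of two regions by their encoded names
// (hypothetical values); the master dispatches it to the hosting RS.
admin.mergeRegions(Bytes.toBytes("a1b2c3"), Bytes.toBytes("d4e5f6"), false);
admin.close();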
/**
* Compact a region on the region server.
*

View File

@@ -0,0 +1,112 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
/**
* Handles processing region merges. Put in a queue, owned by HRegionServer.
*/
@InterfaceAudience.Private
class RegionMergeRequest implements Runnable {
static final Log LOG = LogFactory.getLog(RegionMergeRequest.class);
private final HRegion region_a;
private final HRegion region_b;
private final HRegionServer server;
private final boolean forcible;
RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible) {
Preconditions.checkNotNull(hrs);
this.region_a = a;
this.region_b = b;
this.server = hrs;
this.forcible = forcible;
}
@Override
public String toString() {
return "MergeRequest,regions:" + region_a + ", " + region_b + ", forcible="
+ forcible;
}
@Override
public void run() {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.debug("Skipping merge because server is stopping="
+ this.server.isStopping() + " or stopped=" + this.server.isStopped());
return;
}
try {
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
RegionMergeTransaction mt = new RegionMergeTransaction(region_a,
region_b, forcible);
// If prepare does not return true -- the reason is logged inside
// the prepare call -- we are not ready to merge just now. Just return.
if (!mt.prepare(this.server)) return;
try {
mt.execute(this.server, this.server);
} catch (Exception e) {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.info(
"Skip rollback/cleanup of failed merge of " + region_a + " and "
+ region_b + " because server is"
+ (this.server.isStopping() ? " stopping" : " stopped"), e);
return;
}
try {
LOG.warn("Running rollback/cleanup of failed merge of "
+ region_a +" and "+ region_b + "; " + e.getMessage(), e);
if (mt.rollback(this.server, this.server)) {
LOG.info("Successful rollback of failed merge of "
+ region_a +" and "+ region_b);
} else {
this.server.abort("Abort; we got an error after point-of-no-return"
+ "when merging " + region_a + " and " + region_b);
}
} catch (RuntimeException ee) {
String msg = "Failed rollback of failed merge of "
+ region_a +" and "+ region_b + " -- aborting server";
// If failed rollback, kill this server to avoid having a hole in
// table.
LOG.info(msg, ee);
this.server.abort(msg);
}
return;
}
LOG.info("Regions merged, META updated, and report to master. region_a="
+ region_a + ", region_b=" + region_b + ",merged region="
+ mt.getMergedRegionInfo().getRegionNameAsString()
+ ". Region merge took "
+ StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(), startTime));
} catch (IOException ex) {
LOG.error("Merge failed " + this,
RemoteExceptionHandler.checkIOException(ex));
server.checkFileSystem();
}
}
}

View File

@@ -0,0 +1,770 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
/**
* Executes region merge as a "transaction". It is similar to
* SplitTransaction. Call {@link #prepare(RegionServerServices)} to set up the
* transaction, {@link #execute(Server, RegionServerServices)} to run the
* transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if
* execute fails.
*
* <p>
* Here is an example of how you would use this class:
*
* <pre>
* RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b, false);
* if (!mt.prepare(services)) return;
* try {
* mt.execute(server, services);
* } catch (IOException ioe) {
* try {
* mt.rollback(server, services);
* return;
* } catch (RuntimeException e) {
* myAbortable.abort("Failed merge, abort");
* }
* }
* </pre>
* <p>
* This class is not thread safe. The caller needs to ensure the merge is run
* by one thread only.
*/
@InterfaceAudience.Private
public class RegionMergeTransaction {
private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
// Merged region info
private HRegionInfo mergedRegionInfo;
// region_a sorts before region_b
private final HRegion region_a;
private final HRegion region_b;
// merges dir is under region_a
private final Path mergesdir;
private int znodeVersion = -1;
// We only merge adjacent regions if forcible is false
private final boolean forcible;
/**
* Types to add to the transaction journal. Each enum is a step in the merge
* transaction. Used to figure how much we need to roll back.
*/
enum JournalEntry {
/**
* Set region as in transition, set it into MERGING state.
*/
SET_MERGING_IN_ZK,
/**
* We created the temporary merge data directory.
*/
CREATED_MERGE_DIR,
/**
* Closed the merging region A.
*/
CLOSED_REGION_A,
/**
* The merging region A has been taken out of the server's online regions list.
*/
OFFLINED_REGION_A,
/**
* Closed the merging region B.
*/
CLOSED_REGION_B,
/**
* The merging region B has been taken out of the server's online regions list.
*/
OFFLINED_REGION_B,
/**
* Started in on creation of the merged region.
*/
STARTED_MERGED_REGION_CREATION,
/**
* Point of no return. If we got here, then transaction is not recoverable
* other than by crashing out the regionserver.
*/
PONR
}
/*
* Journal of how far the merge transaction has progressed.
*/
private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
private static IOException closedByOtherException = new IOException(
"Failed to close region: already closed by another thread");
/**
* Constructor
* @param a region a to merge
* @param b region b to merge
* @param forcible if false, we will only merge adjacent regions
*/
public RegionMergeTransaction(final HRegion a, final HRegion b,
final boolean forcible) {
if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
this.region_a = a;
this.region_b = b;
} else {
this.region_a = b;
this.region_b = a;
}
this.forcible = forcible;
this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
}
/**
* Does checks on merge inputs.
* @param services regionserver services
* @return <code>true</code> if the regions are mergeable else
* <code>false</code> if they are not (e.g. a region is already closed).
*/
public boolean prepare(final RegionServerServices services) {
if (!region_a.getTableDesc().getNameAsString()
.equals(region_b.getTableDesc().getNameAsString())) {
LOG.info("Can't merge regions " + region_a + "," + region_b
+ " because they do not belong to the same table");
return false;
}
if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
LOG.info("Can't merge the same region " + region_a);
return false;
}
if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
region_b.getRegionInfo())) {
String msg = "Skip merging " + this.region_a.getRegionNameAsString()
+ " and " + this.region_b.getRegionNameAsString()
+ ", because they are not adjacent.";
LOG.info(msg);
return false;
}
if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
return false;
}
try {
boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
region_a.getRegionName());
if (regionAHasMergeQualifier ||
hasMergeQualifierInMeta(services, region_b.getRegionName())) {
LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
: region_b.getRegionNameAsString())
+ " is not mergeable because it has merge qualifier in META");
return false;
}
} catch (IOException e) {
LOG.warn("Failed judging whether merge transaction is available for "
+ region_a.getRegionNameAsString() + " and "
+ region_b.getRegionNameAsString(), e);
return false;
}
// WARN: make sure there is no parent region of the two merging regions in
// .META. If one existed, fixing up daughters would bring the daughter
// regions (one of which we have merged) back online when the master
// restarts, so we would have to clear the parent region to prevent that.
// Since HBASE-7721, daughters no longer need fixing up, so there is
// nothing to do here.
this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
region_b.getRegionInfo());
return true;
}
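For reference, the adjacency rule applied above reduces to: after ordering the two regions by start key, the first region's end key must equal the second's start key. A small sketch, assuming the three-argument HRegionInfo constructor; the table and keys are hypothetical:
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
byte[] tableName = Bytes.toBytes("t1"); // hypothetical table
HRegionInfo left = new HRegionInfo(tableName, Bytes.toBytes("aaa"),
Bytes.toBytes("ggg"));
HRegionInfo right = new HRegionInfo(tableName, Bytes.toBytes("ggg"),
Bytes.toBytes("www"));
// true: left ends exactly where right starts; argument order is irrelevant
boolean adjacent = HRegionInfo.areAdjacent(left, right);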
/**
* Run the transaction.
* @param server Hosting server instance. Can be null when testing (won't try
* and update in zk if a null server)
* @param services Used to online/offline regions.
* @return merged region
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
* @see #rollback(Server, RegionServerServices)
*/
public HRegion execute(final Server server,
final RegionServerServices services) throws IOException {
HRegion mergedRegion = createMergedRegion(server, services);
openMergedRegion(server, services, mergedRegion);
transitionZKNode(server, services);
return mergedRegion;
}
/**
* Prepare the merged region and region files.
* @param server Hosting server instance. Can be null when testing (won't try
* and update in zk if a null server)
* @param services Used to online/offline regions.
* @return merged region
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
*/
HRegion createMergedRegion(final Server server,
final RegionServerServices services) throws IOException {
LOG.info("Starting merge of " + region_a + " and "
+ region_b.getRegionNameAsString() + ", forcible=" + forcible);
if ((server != null && server.isStopped())
|| (services != null && services.isStopping())) {
throw new IOException("Server is stopped or stopping");
}
// If true, no cluster to write meta edits to or to update znodes in.
boolean testing = server == null ? true : server.getConfiguration()
.getBoolean("hbase.testing.nocluster", false);
// Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
// have zookeeper so don't do zk stuff if server or zookeeper is null
if (server != null && server.getZooKeeper() != null) {
try {
createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
server.getServerName());
} catch (KeeperException e) {
throw new IOException("Failed creating MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
}
this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
if (server != null && server.getZooKeeper() != null) {
try {
// Transition node from MERGING to MERGING after creating the merge
// node. Master will get the callback for node change only if the
// transition is successful.
// Note that if the transition fails then the rollback will delete the
// created znode as the journal entry SET_MERGING_IN_ZK is added.
this.znodeVersion = transitionNodeMerging(server.getZooKeeper(),
this.mergedRegionInfo, server.getServerName(), -1);
} catch (KeeperException e) {
throw new IOException("Failed setting MERGING znode on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
}
this.region_a.getRegionFileSystem().createMergesDir();
this.journal.add(JournalEntry.CREATED_MERGE_DIR);
Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
services, this.region_a, true, testing);
Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
services, this.region_b, false, testing);
assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
// mergeStoreFiles creates merged region dirs under the region_a merges dir.
// Nothing to unroll here on failure -- cleanup of CREATED_MERGE_DIR will
// clean this up.
mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
// Log to the journal that we are creating merged region. We could fail
// halfway through. If we do, we could have left
// stuff in fs that needs cleanup -- a storefile or two. That's why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
this.region_b, this.mergedRegionInfo);
// This is the point of no return. Similar with SplitTransaction.
// IF we reach the PONR then subsequent failures need to crash out this
// regionserver
this.journal.add(JournalEntry.PONR);
// Add merged region and delete region_a and region_b
// as an atomic update. See HBASE-7721. This update to META is what
// determines whether the region was merged or not in case of failures.
// If the update succeeds, the master will roll forward; if not, the
// master will roll back.
if (!testing) {
MetaEditor.mergeRegions(server.getCatalogTracker(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(),
region_b.getRegionInfo(), server.getServerName());
}
return mergedRegion;
}
/**
* Create a merged region from the merges directory under region a. Pulled
* out into its own method so it can be mocked for tests.
* @param a hri of region a
* @param b hri of region b
* @param mergedRegion hri of merged region
* @return merged HRegion.
* @throws IOException
*/
HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
final HRegionInfo mergedRegion) throws IOException {
return a.createMergedRegionFromMerges(mergedRegion, b);
}
/**
* Close the merging region and offline it in regionserver
* @param services
* @param region
* @param isRegionA true if it is merging region a, false if it is region b
* @param testing true if it is testing
* @return a map of family name to list of store files
* @throws IOException
*/
private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
final RegionServerServices services, final HRegion region,
final boolean isRegionA, final boolean testing) throws IOException {
Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
Exception exceptionToThrow = null;
try {
hstoreFilesToMerge = region.close(false);
} catch (Exception e) {
exceptionToThrow = e;
}
if (exceptionToThrow == null && hstoreFilesToMerge == null) {
// The region was closed by a concurrent thread. We can't continue
// with the merge, instead we must just abandon the merge. If we
// reopen or merge this could cause problems because the region has
// probably already been moved to a different server, or is in the
// process of moving to a different server.
exceptionToThrow = closedByOtherException;
}
if (exceptionToThrow != closedByOtherException) {
this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
: JournalEntry.CLOSED_REGION_B);
}
if (exceptionToThrow != null) {
if (exceptionToThrow instanceof IOException)
throw (IOException) exceptionToThrow;
throw new IOException(exceptionToThrow);
}
if (!testing) {
services.removeFromOnlineRegions(region, null);
}
this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
: JournalEntry.OFFLINED_REGION_B);
return hstoreFilesToMerge;
}
/**
* Get merged region info for the two specified merging regions
* @param a merging region A
* @param b merging region B
* @return the merged region info
*/
public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
final HRegionInfo b) {
long rid = EnvironmentEdgeManager.currentTimeMillis();
// Region id is a timestamp. The merged region's id can't be less than that
// of the merging regions, else it would sort to the wrong location in .META.
if (rid < a.getRegionId() || rid < b.getRegionId()) {
LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
+ " and " + b.getRegionId() + ", but current time here is " + rid);
rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
}
byte[] startKey = null;
byte[] endKey = null;
if (a.compareTo(b) <= 0) {
startKey = a.getStartKey();
endKey = b.getEndKey();
} else {
startKey = b.getStartKey();
endKey = a.getEndKey();
}
// Merged region is sorted between two merging regions in META
HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTableName(), startKey,
endKey, false, rid);
return mergedRegionInfo;
}
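Reusing the hypothetical left ([aaa, ggg)) and right ([ggg, www)) regions from the earlier sketch, the rules above work out as:
HRegionInfo merged = RegionMergeTransaction.getMergedRegionInfo(left, right);
// merged spans [aaa, www): start key of the lower region, end key of the
// higher one (ordering by HRegionInfo#compareTo).
// merged.getRegionId() is the current time, bumped to
// max(left.getRegionId(), right.getRegionId()) + 1 if the local clock lags.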
/**
* Perform time consuming opening of the merged region.
* @param server Hosting server instance. Can be null when testing (won't try
* and update in zk if a null server)
* @param services Used to online/offline regions.
* @param merged the merged region
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
*/
void openMergedRegion(final Server server,
final RegionServerServices services, HRegion merged) throws IOException {
boolean stopped = server != null && server.isStopped();
boolean stopping = services != null && services.isStopping();
if (stopped || stopping) {
LOG.info("Not opening merged region " + merged.getRegionNameAsString()
+ " because stopping=" + stopping + ", stopped=" + stopped);
return;
}
HRegionInfo hri = merged.getRegionInfo();
LoggingProgressable reporter = server == null ? null
: new LoggingProgressable(hri, server.getConfiguration().getLong(
"hbase.regionserver.regionmerge.open.log.interval", 10000));
merged.openHRegion(reporter);
if (services != null) {
try {
services.postOpenDeployTasks(merged, server.getCatalogTracker());
services.addToOnlineRegions(merged);
} catch (KeeperException ke) {
throw new IOException(ke);
}
}
}
/**
* Finish off merge transaction, transition the zknode
* @param server Hosting server instance. Can be null when testing (won't try
* and update in zk if a null server)
* @param services Used to online/offline regions.
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
*/
void transitionZKNode(final Server server, final RegionServerServices services)
throws IOException {
if (server == null || server.getZooKeeper() == null) {
return;
}
// Tell master about merge by updating zk. If we fail, abort.
try {
this.znodeVersion = transitionNodeMerge(server.getZooKeeper(),
this.mergedRegionInfo, region_a.getRegionInfo(),
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion);
long startTime = EnvironmentEdgeManager.currentTimeMillis();
int spins = 0;
// Now wait for the master to process the merge. We know it's done
// when the znode is deleted. The reason we keep tickling the znode is
// that it's possible for the master to miss an event.
do {
if (spins % 10 == 0) {
LOG.debug("Still waiting on the master to process the merge for "
+ this.mergedRegionInfo.getEncodedName() + ", waited "
+ (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
}
Thread.sleep(100);
// When this returns -1 it means the znode doesn't exist
this.znodeVersion = tickleNodeMerge(server.getZooKeeper(),
this.mergedRegionInfo, region_a.getRegionInfo(),
region_b.getRegionInfo(), server.getServerName(), this.znodeVersion);
spins++;
} while (this.znodeVersion != -1 && !server.isStopped()
&& !services.isStopping());
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IOException("Failed telling master about merge "
+ mergedRegionInfo.getEncodedName(), e);
}
// Leaving here, the mergedir with its dross will be in place but since the
// merge was successful, just leave it; it'll be cleaned when region_a is
// cleaned up by the CatalogJanitor on the master.
}
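Summarizing the znode handshake implemented by the helpers below, together with the master-side MergedRegionHandler shown earlier:
// RS: createNodeMerging     -> create ephemeral RS_ZK_REGION_MERGING znode
// RS: transitionNodeMerging -> MERGING to MERGING, picking up the version
// RS: transitionNodeMerge   -> MERGING to MERGE, payload = merged + a + b
// RS: tickleNodeMerge       -> MERGE to MERGE until the master deletes it
// Master: MergedRegionHandler handles RS_ZK_REGION_MERGE, deletes the znode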
/**
* Create reference file(s) of merging regions under the region_a merges dir
* @param hstoreFilesOfRegionA
* @param hstoreFilesOfRegionB
* @throws IOException
*/
private void mergeStoreFiles(
Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
throws IOException {
// Create reference file(s) of region A in the merges dir
HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
.entrySet()) {
String familyName = Bytes.toString(entry.getKey());
for (StoreFile storeFile : entry.getValue()) {
fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
this.mergesdir);
}
}
// Create reference file(s) of region B in the merges dir
HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
.entrySet()) {
String familyName = Bytes.toString(entry.getKey());
for (StoreFile storeFile : entry.getValue()) {
fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
this.mergesdir);
}
}
}
/**
* @param server Hosting server instance (May be null when testing).
* @param services Services of regionserver, used to online regions.
* @throws IOException If thrown, rollback failed. Take drastic action.
* @return True if we successfully rolled back, false if we got to the point
* of no return and so now need to abort the server to minimize
* damage.
*/
public boolean rollback(final Server server,
final RegionServerServices services) throws IOException {
assert this.mergedRegionInfo != null;
boolean result = true;
ListIterator<JournalEntry> iterator = this.journal
.listIterator(this.journal.size());
// Iterate in reverse.
while (iterator.hasPrevious()) {
JournalEntry je = iterator.previous();
switch (je) {
case SET_MERGING_IN_ZK:
if (server != null && server.getZooKeeper() != null) {
cleanZK(server, this.mergedRegionInfo);
}
break;
case CREATED_MERGE_DIR:
this.region_a.writestate.writesEnabled = true;
this.region_b.writestate.writesEnabled = true;
this.region_a.getRegionFileSystem().cleanupMergesDir();
break;
case CLOSED_REGION_A:
try {
// So, this returns a seqid but if we just closed and then reopened,
// we should be ok. On close, we flushed using sequenceid obtained
// from hosting regionserver so no need to propagate the sequenceid
// returned out of initialize below up into regionserver as we
// normally do.
this.region_a.initialize();
} catch (IOException e) {
LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+ this.region_a.getRegionNameAsString(), e);
throw new RuntimeException(e);
}
break;
case OFFLINED_REGION_A:
if (services != null)
services.addToOnlineRegions(this.region_a);
break;
case CLOSED_REGION_B:
try {
this.region_b.initialize();
} catch (IOException e) {
LOG.error("Failed rollbacking CLOSED_REGION_A of region "
+ this.region_b.getRegionNameAsString(), e);
throw new RuntimeException(e);
}
break;
case OFFLINED_REGION_B:
if (services != null)
services.addToOnlineRegions(this.region_b);
break;
case STARTED_MERGED_REGION_CREATION:
this.region_a.getRegionFileSystem().cleanupMergedRegion(
this.mergedRegionInfo);
break;
case PONR:
// We got to the point-of-no-return so we need to just abort. Return
// immediately. Do not clean up created merged regions.
return false;
default:
throw new RuntimeException("Unhandled journal entry: " + je);
}
}
return result;
}
HRegionInfo getMergedRegionInfo() {
return this.mergedRegionInfo;
}
// For unit testing.
Path getMergesDir() {
return this.mergesdir;
}
private static void cleanZK(final Server server, final HRegionInfo hri) {
try {
// Only delete if it's in the expected state; could have been hijacked.
ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
EventType.RS_ZK_REGION_MERGING);
} catch (KeeperException.NoNodeException e) {
LOG.warn("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
} catch (KeeperException e) {
server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
}
}
/**
* Creates a new ephemeral node in the MERGING state for the merged region.
* Create it ephemeral in case regionserver dies mid-merge.
*
* <p>
* Does not transition nodes from other states. If a node already exists for
* this region, a {@link NodeExistsException} will be thrown.
*
* @param zkw zk reference
* @param region region to be created as offline
* @param serverName server event originates from
* @return Version of znode created.
* @throws KeeperException
* @throws IOException
*/
int createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
final ServerName serverName) throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for "
+ region.getEncodedName() + " in MERGING state"));
RegionTransition rt = RegionTransition.createRegionTransition(
EventType.RS_ZK_REGION_MERGING, region.getRegionName(), serverName);
String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
throw new IOException("Failed create of ephemeral " + node);
}
// Transition node from MERGING to MERGING and pick up version so we
// can be sure this znode is ours; the version is needed for deleting.
return transitionNodeMerging(zkw, region, serverName, -1);
}
/**
* Transitions an existing node for the specified region which is currently in
* the MERGING state to be in the MERGE state. Converts the ephemeral MERGING
* znode to an ephemeral MERGE node. Master cleans up MERGE znode when it
* reads it (or if we crash, zk will clean it up).
*
* <p>
* Does not transition nodes from other states. If for some reason the node
* could not be transitioned, the method returns -1. If the transition is
* successful, the version of the node after transition is returned.
*
* <p>
* This method can fail and return -1 for three different reasons:
* <ul>
* <li>Node for this region does not exist</li>
* <li>Node for this region is not in MERGING state</li>
* <li>After verifying MERGING state, update fails because of wrong version
* (this should never actually happen since an RS only does this transition
* following a transition to MERGING; if two RS are conflicting, one would
* fail the original transition to MERGING and not this transition)</li>
* </ul>
*
* <p>
* Does not set any watches.
*
* <p>
* This method should only be used by a RegionServer when completing the open
* of a merged region.
*
* @param zkw zk reference
* @param merged region to be transitioned to opened
* @param a merging region A
* @param b merging region B
* @param serverName server event originates from
* @param znodeVersion expected version of data before modification
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
* @throws IOException
*/
private static int transitionNodeMerge(ZooKeeperWatcher zkw,
HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGE,
znodeVersion, payload);
}
/**
* Transitions the region's znode from MERGING to MERGING, picking up the
* znode version in the process.
* @param zkw zk reference
* @param parent region to be transitioned to merging
* @param serverName server event originates from
* @param version znode version
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException
* @throws IOException
*/
int transitionNodeMerging(final ZooKeeperWatcher zkw,
final HRegionInfo parent, final ServerName serverName, final int version)
throws KeeperException, IOException {
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_MERGING, EventType.RS_ZK_REGION_MERGING,
version);
}
private static int tickleNodeMerge(ZooKeeperWatcher zkw, HRegionInfo merged,
HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion) throws KeeperException, IOException {
byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
return ZKAssign.transitionNode(zkw, merged, serverName,
EventType.RS_ZK_REGION_MERGE, EventType.RS_ZK_REGION_MERGE,
znodeVersion, payload);
}
/**
* Checks if the given region has merge qualifier in .META.
* @param services
* @param regionName name of specified region
* @return true if the given region has a merge qualifier in .META. (it will
* be cleaned by the CatalogJanitor)
* @throws IOException
*/
boolean hasMergeQualifierInMeta(final RegionServerServices services,
final byte[] regionName) throws IOException {
// Get merge regions if it is a merged region and already has merge
// qualifier
Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
.getRegionsFromMergeQualifier(services.getCatalogTracker(), regionName);
if (mergeRegions != null &&
(mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
// It has merge qualifier
return true;
}
return false;
}
}
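The same META probe can be made standalone through MetaReader; a sketch, with catalogTracker and regionName assumed to be in scope:
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.util.Pair;
Pair<HRegionInfo, HRegionInfo> merges =
MetaReader.getRegionsFromMergeQualifier(catalogTracker, regionName);
// Non-null means the MERGEA/MERGEB qualifiers are still in .META., i.e. the
// CatalogJanitor has not yet cleaned up a previous merge of this region.
boolean pendingCleanup = (merges != null);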

View File

@@ -102,4 +102,9 @@ public interface RegionServerServices extends OnlineRegions {
* @return hbase executor service
*/
public ExecutorService getExecutorService();
/**
* @return The RegionServer's CatalogTracker
*/
public CatalogTracker getCatalogTracker();
}

View File

@@ -514,8 +514,9 @@ public class SplitTransaction {
void openDaughterRegion(final Server server, final HRegion daughter)
throws IOException, KeeperException {
HRegionInfo hri = daughter.getRegionInfo();
LoggingProgressable reporter = server == null? null:
new LoggingProgressable(hri, server.getConfiguration());
LoggingProgressable reporter = server == null ? null
: new LoggingProgressable(hri, server.getConfiguration().getLong(
"hbase.regionserver.split.daughter.open.log.interval", 10000));
daughter.openHRegion(reporter);
}
@@ -524,10 +525,9 @@ public class SplitTransaction {
private long lastLog = -1;
private final long interval;
LoggingProgressable(final HRegionInfo hri, final Configuration c) {
LoggingProgressable(final HRegionInfo hri, final long interval) {
this.hri = hri;
this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
10000);
this.interval = interval;
}
@Override

View File

@@ -1168,6 +1168,23 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return new HTable(new Configuration(getConfiguration()), tableName);
}
/**
* Create a table.
* @param tableName
* @param family
* @param splitRows
* @return An HTable instance for the created table.
* @throws IOException
*/
public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
throws IOException {
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(family);
desc.addFamily(hcd);
getHBaseAdmin().createTable(desc, splitRows);
return new HTable(getConfiguration(), tableName);
}
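A usage sketch for this helper; table name, family, and split points are hypothetical, pre-splitting into three regions the way the merge tests do:
byte[][] splitRows = new byte[][] { Bytes.toBytes("ggg"), Bytes.toBytes("www") };
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testMergeTable"),
Bytes.toBytes("cf"), splitRows);
// Yields three regions: [ , ggg), [ggg, www), [www, )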
/**
* Drop an existing table
* @param tableName existing table

View File

@@ -29,13 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
@@ -453,6 +455,13 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
return null;
}
@Override
public MergeRegionsResponse mergeRegions(RpcController controller,
MergeRegionsRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public CompactRegionResponse compactRegion(RpcController controller,
CompactRegionRequest request) throws ServiceException {

View File

@@ -65,7 +65,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -325,6 +325,11 @@ public class TestCatalogJanitor {
public TableLockManager getTableLockManager() {
return null;
}
@Override
public void dispatchMergingRegions(HRegionInfo region_a, HRegionInfo region_b,
boolean forcible) throws IOException {
}
}
@Test
@@ -546,9 +551,11 @@ public class TestCatalogJanitor {
splita.setOffline(true); //simulate that splita goes offline when it is split
splitParents.put(splita, createResult(splita, splitaa,splitab));
final Map<HRegionInfo, Result> mergedRegions = new TreeMap<HRegionInfo, Result>();
CatalogJanitor janitor = spy(new CatalogJanitor(server, services));
doReturn(new Pair<Integer, Map<HRegionInfo, Result>>(
10, splitParents)).when(janitor).getSplitParents();
doReturn(new Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>>(
10, mergedRegions, splitParents)).when(janitor)
.getMergedRegionsAndSplitParents();
//create ref from splita to parent
Path splitaRef =

View File

@@ -0,0 +1,426 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableList;
/**
* Test the {@link RegionMergeTransaction} class against two HRegions (as
* opposed to a running cluster).
*/
@Category(SmallTests.class)
public class TestRegionMergeTransaction {
private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final Path testdir = TEST_UTIL.getDataTestDir(this.getClass()
.getName());
private HRegion region_a;
private HRegion region_b;
private HRegion region_c;
private HLog wal;
private FileSystem fs;
// Start rows of region_a,region_b,region_c
private static final byte[] STARTROW_A = new byte[] { 'a', 'a', 'a' };
private static final byte[] STARTROW_B = new byte[] { 'g', 'g', 'g' };
private static final byte[] STARTROW_C = new byte[] { 'w', 'w', 'w' };
private static final byte[] ENDROW = new byte[] { '{', '{', '{' };
private static final byte[] CF = HConstants.CATALOG_FAMILY;
@Before
public void setup() throws IOException {
this.fs = FileSystem.get(TEST_UTIL.getConfiguration());
this.fs.delete(this.testdir, true);
this.wal = HLogFactory.createHLog(fs, this.testdir, "logs",
TEST_UTIL.getConfiguration());
this.region_a = createRegion(this.testdir, this.wal, STARTROW_A, STARTROW_B);
this.region_b = createRegion(this.testdir, this.wal, STARTROW_B, STARTROW_C);
this.region_c = createRegion(this.testdir, this.wal, STARTROW_C, ENDROW);
assert region_a != null && region_b != null && region_c != null;
TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster", true);
}
@After
public void teardown() throws IOException {
for (HRegion region : new HRegion[] { region_a, region_b, region_c }) {
if (region != null && !region.isClosed()) region.close();
if (this.fs.exists(region.getRegionFileSystem().getRegionDir())
&& !this.fs.delete(region.getRegionFileSystem().getRegionDir(), true)) {
throw new IOException("Failed deleting of "
+ region.getRegionFileSystem().getRegionDir());
}
}
if (this.wal != null)
this.wal.closeAndDelete();
this.fs.delete(this.testdir, true);
}
/**
* Test straight prepare works. Tries to merge on {@link #region_a} and
* {@link #region_b}
* @throws IOException
*/
@Test
public void testPrepare() throws IOException {
prepareOnGoodRegions();
}
private RegionMergeTransaction prepareOnGoodRegions() throws IOException {
RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
false);
RegionMergeTransaction spyMT = Mockito.spy(mt);
doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
region_a.getRegionName());
doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
region_b.getRegionName());
assertTrue(spyMT.prepare(null));
return spyMT;
}
/**
* Test merging the same region
*/
@Test
public void testPrepareWithSameRegion() throws IOException {
RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
this.region_a, true);
assertFalse("should not merge the same region even if it is forcible ",
mt.prepare(null));
}
/**
* Test merging two non-adjacent regions with a non-forcible merge
*/
@Test
public void testPrepareWithRegionsNotAdjacent() throws IOException {
RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
this.region_c, false);
assertFalse("should not merge two regions if they are adjacent except it is forcible",
mt.prepare(null));
}
/**
* Test merging two non-adjacent regions with a forcible merge
*/
@Test
public void testPrepareWithRegionsNotAdjacentUnderCompulsory()
throws IOException {
RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_c,
true);
RegionMergeTransaction spyMT = Mockito.spy(mt);
doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
region_a.getRegionName());
doReturn(false).when(spyMT).hasMergeQualifierInMeta(null,
region_c.getRegionName());
assertTrue("Since focible is true, should merge two regions even if they are not adjacent",
spyMT.prepare(null));
}
/**
* Pass a reference store
*/
@Test
public void testPrepareWithRegionsWithReference() throws IOException {
HStore storeMock = Mockito.mock(HStore.class);
when(storeMock.hasReferences()).thenReturn(true);
when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
this.region_a.stores.put(Bytes.toBytes(""), storeMock);
RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
this.region_b, false);
assertFalse(
"a region should not be mergeable if it has instances of store file references",
mt.prepare(null));
}
@Test
public void testPrepareWithClosedRegion() throws IOException {
this.region_a.close();
RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a,
this.region_b, false);
assertFalse(mt.prepare(null));
}
/**
* Test merging regions that are themselves the product of a merge and still
* carry merge references in META
*/
@Test
public void testPrepareWithRegionsWithMergeReference() throws IOException {
RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b,
false);
RegionMergeTransaction spyMT = Mockito.spy(mt);
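// This time both regions claim a merge qualifier in META; prepare() must
// refuse to merge them again.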
doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
region_a.getRegionName());
doReturn(true).when(spyMT).hasMergeQualifierInMeta(null,
region_b.getRegionName());
assertFalse(spyMT.prepare(null));
}
@Test
public void testWholesomeMerge() throws IOException {
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
assertEquals(rowCountOfRegionA, countRows(this.region_a));
assertEquals(rowCountOfRegionB, countRows(this.region_b));
// Start transaction.
RegionMergeTransaction mt = prepareOnGoodRegions();
// Run the execute. Look at what it returns.
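// A mocked Server is enough here; execute() presumably needs only its Configuration.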
Server mockServer = Mockito.mock(Server.class);
when(mockServer.getConfiguration())
.thenReturn(TEST_UTIL.getConfiguration());
HRegion mergedRegion = mt.execute(mockServer, null);
// Do some assertions about execution.
assertTrue(this.fs.exists(mt.getMergesDir()));
// Assert region_a and region_b are closed.
assertTrue(region_a.isClosed());
assertTrue(region_b.isClosed());
// Assert the merges dir is empty -- its content will have been moved out
// to live under the merged region's dir.
assertEquals(0, this.fs.listStatus(mt.getMergesDir()).length);
// Check the merged region has the correct key span.
assertTrue(Bytes.equals(this.region_a.getStartKey(),
mergedRegion.getStartKey()));
assertTrue(Bytes.equals(this.region_b.getEndKey(),
mergedRegion.getEndKey()));
// Count rows; the merged region is already open
try {
int mergedRegionRowCount = countRows(mergedRegion);
assertEquals((rowCountOfRegionA + rowCountOfRegionB),
mergedRegionRowCount);
} finally {
HRegion.closeHRegion(mergedRegion);
}
// Assert the write lock is no longer held on region_a and region_b
assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
}
@Test
public void testRollback() throws IOException {
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
assertEquals(rowCountOfRegionA, countRows(this.region_a));
assertEquals(rowCountOfRegionB, countRows(this.region_b));
// Start transaction.
RegionMergeTransaction mt = prepareOnGoodRegions();
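// Make creation of the merged region fail so execute() aborts and rollback
// can be exercised.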
when(mt.createMergedRegionFromMerges(region_a, region_b,
mt.getMergedRegionInfo())).thenThrow(
new MockedFailedMergedRegionCreation());
// Run the execute. Look at what it returns.
boolean expectedException = false;
Server mockServer = Mockito.mock(Server.class);
when(mockServer.getConfiguration())
.thenReturn(TEST_UTIL.getConfiguration());
try {
mt.execute(mockServer, null);
} catch (MockedFailedMergedRegionCreation e) {
expectedException = true;
}
assertTrue(expectedException);
// Run rollback
assertTrue(mt.rollback(null, null));
// Assert I can scan region_a and region_b.
int rowCountOfRegionA2 = countRows(this.region_a);
assertEquals(rowCountOfRegionA, rowCountOfRegionA2);
int rowCountOfRegionB2 = countRows(this.region_b);
assertEquals(rowCountOfRegionB, rowCountOfRegionB2);
// Assert rollback cleaned up stuff in fs
assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir,
mt.getMergedRegionInfo())));
assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
// Now retry the merge but do not throw an exception this time.
assertTrue(mt.prepare(null));
HRegion mergedRegion = mt.execute(mockServer, null);
// Count rows; the merged region is already open
try {
int mergedRegionRowCount = countRows(mergedRegion);
assertEquals((rowCountOfRegionA + rowCountOfRegionB),
mergedRegionRowCount);
} finally {
HRegion.closeHRegion(mergedRegion);
}
// Assert the write lock is no longer held on region_a and region_b
assertTrue(!this.region_a.lock.writeLock().isHeldByCurrentThread());
assertTrue(!this.region_b.lock.writeLock().isHeldByCurrentThread());
}
@Test
public void testFailAfterPONR() throws IOException, KeeperException {
final int rowCountOfRegionA = loadRegion(this.region_a, CF, true);
final int rowCountOfRegionB = loadRegion(this.region_b, CF, true);
assertTrue(rowCountOfRegionA > 0 && rowCountOfRegionB > 0);
assertEquals(rowCountOfRegionA, countRows(this.region_a));
assertEquals(rowCountOfRegionB, countRows(this.region_b));
// Start transaction.
RegionMergeTransaction mt = prepareOnGoodRegions();
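// Fail the open of the merged region, a step that happens after the point
// of no return (PONR).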
Mockito.doThrow(new MockedFailedMergedRegionOpen())
.when(mt)
.openMergedRegion((Server) Mockito.anyObject(),
(RegionServerServices) Mockito.anyObject(),
(HRegion) Mockito.anyObject());
// Run the execute. Look at what it returns.
boolean expectedException = false;
Server mockServer = Mockito.mock(Server.class);
when(mockServer.getConfiguration())
.thenReturn(TEST_UTIL.getConfiguration());
try {
mt.execute(mockServer, null);
} catch (MockedFailedMergedRegionOpen e) {
expectedException = true;
}
assertTrue(expectedException);
// Rollback returns false, signalling that we cannot undo and must restart.
assertFalse(mt.rollback(null, null));
// Make sure the merged region is still in the filesystem and has not been
// removed; this is expected once we are past the point of no return.
Path tableDir = this.region_a.getRegionFileSystem().getRegionDir()
.getParent();
Path mergedRegionDir = new Path(tableDir, mt.getMergedRegionInfo()
.getEncodedName());
assertTrue(TEST_UTIL.getTestFileSystem().exists(mergedRegionDir));
}
/**
* Exception used in this class only.
*/
@SuppressWarnings("serial")
private class MockedFailedMergedRegionCreation extends IOException {
}
@SuppressWarnings("serial")
private class MockedFailedMergedRegionOpen extends IOException {
}
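// Create, close, then reopen the region so it starts the way a deployed
// region would.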
private HRegion createRegion(final Path testdir, final HLog wal,
final byte[] startrow, final byte[] endrow)
throws IOException {
// Make a region with start and end keys.
HTableDescriptor htd = new HTableDescriptor("table");
HColumnDescriptor hcd = new HColumnDescriptor(CF);
htd.addFamily(hcd);
HRegionInfo hri = new HRegionInfo(htd.getName(), startrow, endrow);
HRegion a = HRegion.createHRegion(hri, testdir,
TEST_UTIL.getConfiguration(), htd);
HRegion.closeHRegion(a);
return HRegion.openHRegion(testdir, hri, htd, wal,
TEST_UTIL.getConfiguration());
}
private int countRows(final HRegion r) throws IOException {
int rowcount = 0;
InternalScanner scanner = r.getScanner(new Scan());
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasNext = true;
while (hasNext) {
hasNext = scanner.next(kvs);
if (!kvs.isEmpty())
rowcount++;
}
} finally {
scanner.close();
}
return rowcount;
}
/**
* Load the region with rows from 'aaa' to 'zzz', skipping rows that fall
* outside the region's key range
* @param r Region
* @param f Family
* @param flush flush the cache if true
* @return Count of rows loaded.
* @throws IOException
*/
private int loadRegion(final HRegion r, final byte[] f, final boolean flush)
throws IOException {
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
for (byte b2 = 'a'; b2 <= 'z'; b2++) {
for (byte b3 = 'a'; b3 <= 'z'; b3++) {
k[0] = b1;
k[1] = b2;
k[2] = b3;
if (!HRegion.rowIsInRange(r.getRegionInfo(), k)) {
continue;
}
Put put = new Put(k);
put.add(f, null, k);
if (r.getLog() == null)
put.setWriteToWAL(false);
r.put(put);
rowCount++;
}
}
if (flush) {
r.flushcache();
}
}
return rowCount;
}
}

View File

@ -0,0 +1,297 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.base.Joiner;
/**
* Like {@link TestRegionMergeTransaction} in that we're testing
* {@link RegionMergeTransaction}, only the tests below run against a live
* cluster, whereas {@link TestRegionMergeTransaction} tests against bare
* {@link HRegion} instances.
*/
@Category(LargeTests.class)
public class TestRegionMergeTransactionOnCluster {
private static final Log LOG = LogFactory
.getLog(TestRegionMergeTransactionOnCluster.class);
private static final int NB_SERVERS = 3;
private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static byte[] ROW = Bytes.toBytes("testRow");
private static final int INITIAL_REGION_NUM = 10;
private static final int ROWSIZE = 200;
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
private static int waitTime = 60 * 1000;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static HMaster master;
private static HBaseAdmin admin;
@BeforeClass
public static void beforeAllTests() throws Exception {
// Start a cluster
TEST_UTIL.startMiniCluster(NB_SERVERS);
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
master = cluster.getMaster();
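// Switch off the balancer so regions stay where the test expects them.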
master.balanceSwitch(false);
admin = TEST_UTIL.getHBaseAdmin();
}
@AfterClass
public static void afterAllTests() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testWholesomeMerge() throws Exception {
LOG.info("Starting testWholesomeMerge");
final byte[] tableName = Bytes.toBytes("testWholesomeMerge");
// Create table and load data.
HTable table = createTableAndLoadData(master, tableName);
// Merge 1st and 2nd region
mergeRegionsAndVerifyRegionNum(master, tableName, 0, 1,
INITIAL_REGION_NUM - 1);
// Merge 2nd and 3rd region
mergeRegionsAndVerifyRegionNum(master, tableName, 1, 2,
INITIAL_REGION_NUM - 2);
verifyRowCount(table, ROWSIZE);
table.close();
}
@Test
public void testCleanMergeReference() throws Exception {
LOG.info("Starting testCleanMergeReference");
admin.enableCatalogJanitor(false);
try {
final byte[] tableName = Bytes.toBytes("testCleanMergeReference");
// Create table and load data.
HTable table = createTableAndLoadData(master, tableName);
// Merge 1st and 2nd region
mergeRegionsAndVerifyRegionNum(master, tableName, 0, 1,
INITIAL_REGION_NUM - 1);
verifyRowCount(table, ROWSIZE);
table.close();
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
.getTableRegionsAndLocations(master.getCatalogTracker(),
Bytes.toString(tableName));
HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
Bytes.toString(tableName));
Result mergedRegionResult = MetaReader.getRegionResult(
master.getCatalogTracker(), mergedRegionInfo.getRegionName());
// META should still contain the merge references
assertTrue(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
HConstants.MERGEA_QUALIFIER) != null);
assertTrue(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
HConstants.MERGEB_QUALIFIER) != null);
// the merging regions' directories should still exist in the filesystem
HRegionInfo regionA = HRegionInfo.getHRegionInfo(mergedRegionResult,
HConstants.MERGEA_QUALIFIER);
HRegionInfo regionB = HRegionInfo.getHRegionInfo(mergedRegionResult,
HConstants.MERGEB_QUALIFIER);
FileSystem fs = master.getMasterFileSystem().getFileSystem();
Path rootDir = master.getMasterFileSystem().getRootDir();
Path tabledir = new Path(rootDir, mergedRegionInfo.getTableNameAsString());
Path regionAdir = new Path(tabledir, regionA.getEncodedName());
Path regionBdir = new Path(tabledir, regionB.getEncodedName());
assertTrue(fs.exists(regionAdir));
assertTrue(fs.exists(regionBdir));
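// Compact the merged region so its reference files to the old regions get
// rewritten as real store files.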
admin.compact(mergedRegionInfo.getRegionName());
// wait until the merged region no longer has reference files
long timeout = System.currentTimeMillis() + waitTime;
HRegionFileSystem hrfs = new HRegionFileSystem(
TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
while (System.currentTimeMillis() < timeout) {
if (!hrfs.hasReferences(tableDescriptor)) {
break;
}
Thread.sleep(50);
}
assertFalse(hrfs.hasReferences(tableDescriptor));
// run CatalogJanitor to clean merge references in META and archive the
// files of merging regions
int cleaned = admin.runCatalogScan();
assertTrue(cleaned > 0);
assertFalse(fs.exists(regionAdir));
assertFalse(fs.exists(regionBdir));
mergedRegionResult = MetaReader.getRegionResult(
master.getCatalogTracker(), mergedRegionInfo.getRegionName());
assertFalse(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
HConstants.MERGEA_QUALIFIER) != null);
assertFalse(mergedRegionResult.getValue(HConstants.CATALOG_FAMILY,
HConstants.MERGEB_QUALIFIER) != null);
} finally {
admin.enableCatalogJanitor(true);
}
}
private void mergeRegionsAndVerifyRegionNum(HMaster master, byte[] tablename,
int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
requestMergeRegion(master, tablename, regionAnum, regionBnum);
waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
}
private void requestMergeRegion(HMaster master, byte[] tablename,
int regionAnum, int regionBnum) throws Exception {
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaReader
.getTableRegionsAndLocations(master.getCatalogTracker(),
Bytes.toString(tablename));
TEST_UTIL.getHBaseAdmin().mergeRegions(
tableRegions.get(regionAnum).getFirst().getEncodedNameAsBytes(),
tableRegions.get(regionBnum).getFirst().getEncodedNameAsBytes(), false);
}
private void waitAndVerifyRegionNum(HMaster master, byte[] tablename,
int expectedRegionNum) throws Exception {
List<Pair<HRegionInfo, ServerName>> tableRegionsInMeta;
List<HRegionInfo> tableRegionsInMaster;
long timeout = System.currentTimeMillis() + waitTime;
while (System.currentTimeMillis() < timeout) {
tableRegionsInMeta = MetaReader.getTableRegionsAndLocations(
master.getCatalogTracker(), Bytes.toString(tablename));
tableRegionsInMaster = master.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tablename);
if (tableRegionsInMeta.size() == expectedRegionNum
&& tableRegionsInMaster.size() == expectedRegionNum) {
break;
}
Thread.sleep(250);
}
tableRegionsInMeta = MetaReader.getTableRegionsAndLocations(
master.getCatalogTracker(), Bytes.toString(tablename));
LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta));
assertEquals(expectedRegionNum, tableRegionsInMeta.size());
}
private HTable createTableAndLoadData(HMaster master, byte[] tablename)
throws Exception {
return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM);
}
private HTable createTableAndLoadData(HMaster master, byte[] tablename,
int numRegions) throws Exception {
assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
byte[][] splitRows = new byte[numRegions - 1][];
for (int i = 0; i < splitRows.length; i++) {
splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
}
HTable table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
loadData(table);
verifyRowCount(table, ROWSIZE);
// polling here is an ugly hack to allow region transitions to finish
long timeout = System.currentTimeMillis() + waitTime;
List<Pair<HRegionInfo, ServerName>> tableRegions;
while (System.currentTimeMillis() < timeout) {
tableRegions = MetaReader.getTableRegionsAndLocations(
master.getCatalogTracker(), Bytes.toString(tablename));
if (tableRegions.size() == numRegions)
break;
Thread.sleep(250);
}
tableRegions = MetaReader.getTableRegionsAndLocations(
master.getCatalogTracker(), Bytes.toString(tablename));
LOG.info("Regions after load: " + Joiner.on(',').join(tableRegions));
assertEquals(numRegions, tableRegions.size());
return table;
}
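/** Make n row keys by suffixing base with a zero-padded sequence number. */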
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
}
return ret;
}
private void loadData(HTable table) throws IOException {
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
}
private void verifyRowCount(HTable table, int expectedRowCount)
throws IOException {
ResultScanner scanner = table.getScanner(new Scan());
int rowCount = 0;
try {
while (scanner.next() != null) {
rowCount++;
}
} finally {
scanner.close();
}
assertEquals(expectedRowCount, rowCount);
}
}