HBASE-6230 Restore Snapshots for HBase 0.96 (Matteo Bertozzi)

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445786 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 18:10:58 +00:00
parent eb4346f144
commit 0bcb524581
20 changed files with 2019 additions and 239 deletions

View File

@ -17357,10 +17357,6 @@ public final class MasterAdminProtos {
public interface RestoreSnapshotResponseOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required int64 expectedTimeout = 1;
boolean hasExpectedTimeout();
long getExpectedTimeout();
}
public static final class RestoreSnapshotResponse extends
com.google.protobuf.GeneratedMessage
@ -17390,29 +17386,13 @@ public final class MasterAdminProtos {
return org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.internal_static_RestoreSnapshotResponse_fieldAccessorTable;
}
private int bitField0_;
// required int64 expectedTimeout = 1;
public static final int EXPECTEDTIMEOUT_FIELD_NUMBER = 1;
private long expectedTimeout_;
public boolean hasExpectedTimeout() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public long getExpectedTimeout() {
return expectedTimeout_;
}
private void initFields() {
expectedTimeout_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasExpectedTimeout()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
@ -17420,9 +17400,6 @@ public final class MasterAdminProtos {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt64(1, expectedTimeout_);
}
getUnknownFields().writeTo(output);
}
@ -17432,10 +17409,6 @@ public final class MasterAdminProtos {
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(1, expectedTimeout_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -17459,11 +17432,6 @@ public final class MasterAdminProtos {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse other = (org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse) obj;
boolean result = true;
result = result && (hasExpectedTimeout() == other.hasExpectedTimeout());
if (hasExpectedTimeout()) {
result = result && (getExpectedTimeout()
== other.getExpectedTimeout());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -17473,10 +17441,6 @@ public final class MasterAdminProtos {
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasExpectedTimeout()) {
hash = (37 * hash) + EXPECTEDTIMEOUT_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getExpectedTimeout());
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@ -17593,8 +17557,6 @@ public final class MasterAdminProtos {
public Builder clear() {
super.clear();
expectedTimeout_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@ -17631,13 +17593,6 @@ public final class MasterAdminProtos {
public org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse result = new org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.expectedTimeout_ = expectedTimeout_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@ -17653,18 +17608,11 @@ public final class MasterAdminProtos {
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse.getDefaultInstance()) return this;
if (other.hasExpectedTimeout()) {
setExpectedTimeout(other.getExpectedTimeout());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasExpectedTimeout()) {
return false;
}
return true;
}
@ -17691,37 +17639,10 @@ public final class MasterAdminProtos {
}
break;
}
case 8: {
bitField0_ |= 0x00000001;
expectedTimeout_ = input.readInt64();
break;
}
}
}
}
private int bitField0_;
// required int64 expectedTimeout = 1;
private long expectedTimeout_ ;
public boolean hasExpectedTimeout() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public long getExpectedTimeout() {
return expectedTimeout_;
}
public Builder setExpectedTimeout(long value) {
bitField0_ |= 0x00000001;
expectedTimeout_ = value;
onChanged();
return this;
}
public Builder clearExpectedTimeout() {
bitField0_ = (bitField0_ & ~0x00000001);
expectedTimeout_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:RestoreSnapshotResponse)
}
@ -19210,7 +19131,7 @@ public final class MasterAdminProtos {
public interface IsRestoreSnapshotDoneResponseOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// optional bool done = 1 [default = false];
// optional bool done = 1 [default = true];
boolean hasDone();
boolean getDone();
}
@ -19243,7 +19164,7 @@ public final class MasterAdminProtos {
}
private int bitField0_;
// optional bool done = 1 [default = false];
// optional bool done = 1 [default = true];
public static final int DONE_FIELD_NUMBER = 1;
private boolean done_;
public boolean hasDone() {
@ -19254,7 +19175,7 @@ public final class MasterAdminProtos {
}
private void initFields() {
done_ = false;
done_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -19441,7 +19362,7 @@ public final class MasterAdminProtos {
public Builder clear() {
super.clear();
done_ = false;
done_ = true;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@ -19546,8 +19467,8 @@ public final class MasterAdminProtos {
private int bitField0_;
// optional bool done = 1 [default = false];
private boolean done_ ;
// optional bool done = 1 [default = true];
private boolean done_ = true;
public boolean hasDone() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
@ -19562,7 +19483,7 @@ public final class MasterAdminProtos {
}
public Builder clearDone() {
bitField0_ = (bitField0_ & ~0x00000001);
done_ = false;
done_ = true;
onChanged();
return this;
}
@ -21732,60 +21653,59 @@ public final class MasterAdminProtos {
"SnapshotRequest\022&\n\010snapshot\030\001 \002(\0132\024.Snap" +
"shotDescription\"\030\n\026DeleteSnapshotRespons" +
"e\"@\n\026RestoreSnapshotRequest\022&\n\010snapshot\030" +
"\001 \002(\0132\024.SnapshotDescription\"2\n\027RestoreSn",
"apshotResponse\022\027\n\017expectedTimeout\030\001 \002(\003\"" +
"?\n\025IsSnapshotDoneRequest\022&\n\010snapshot\030\001 \001" +
"(\0132\024.SnapshotDescription\"U\n\026IsSnapshotDo" +
"neResponse\022\023\n\004done\030\001 \001(\010:\005false\022&\n\010snaps" +
"hot\030\002 \001(\0132\024.SnapshotDescription\"F\n\034IsRes" +
"toreSnapshotDoneRequest\022&\n\010snapshot\030\001 \001(" +
"\0132\024.SnapshotDescription\"4\n\035IsRestoreSnap" +
"shotDoneResponse\022\023\n\004done\030\001 \001(\010:\005false2\234\r" +
"\n\022MasterAdminService\0222\n\taddColumn\022\021.AddC" +
"olumnRequest\032\022.AddColumnResponse\022;\n\014dele",
"teColumn\022\024.DeleteColumnRequest\032\025.DeleteC" +
"olumnResponse\022;\n\014modifyColumn\022\024.ModifyCo" +
"lumnRequest\032\025.ModifyColumnResponse\0225\n\nmo" +
"veRegion\022\022.MoveRegionRequest\032\023.MoveRegio" +
"nResponse\022;\n\014assignRegion\022\024.AssignRegion" +
"Request\032\025.AssignRegionResponse\022A\n\016unassi" +
"gnRegion\022\026.UnassignRegionRequest\032\027.Unass" +
"ignRegionResponse\022>\n\rofflineRegion\022\025.Off" +
"lineRegionRequest\032\026.OfflineRegionRespons" +
"e\0228\n\013deleteTable\022\023.DeleteTableRequest\032\024.",
"DeleteTableResponse\0228\n\013enableTable\022\023.Ena" +
"bleTableRequest\032\024.EnableTableResponse\022;\n" +
"\014disableTable\022\024.DisableTableRequest\032\025.Di" +
"sableTableResponse\0228\n\013modifyTable\022\023.Modi" +
"fyTableRequest\032\024.ModifyTableResponse\0228\n\013" +
"createTable\022\023.CreateTableRequest\032\024.Creat" +
"eTableResponse\022/\n\010shutdown\022\020.ShutdownReq" +
"uest\032\021.ShutdownResponse\0225\n\nstopMaster\022\022." +
"StopMasterRequest\032\023.StopMasterResponse\022," +
"\n\007balance\022\017.BalanceRequest\032\020.BalanceResp",
"onse\022M\n\022setBalancerRunning\022\032.SetBalancer" +
"RunningRequest\032\033.SetBalancerRunningRespo" +
"nse\022;\n\016runCatalogScan\022\023.CatalogScanReque" +
"st\032\024.CatalogScanResponse\022S\n\024enableCatalo" +
"gJanitor\022\034.EnableCatalogJanitorRequest\032\035" +
".EnableCatalogJanitorResponse\022\\\n\027isCatal" +
"ogJanitorEnabled\022\037.IsCatalogJanitorEnabl" +
"edRequest\032 .IsCatalogJanitorEnabledRespo" +
"nse\022L\n\021execMasterService\022\032.CoprocessorSe" +
"rviceRequest\032\033.CoprocessorServiceRespons",
"e\0227\n\010snapshot\022\024.TakeSnapshotRequest\032\025.Ta" +
"keSnapshotResponse\022<\n\rlistSnapshots\022\024.Li" +
"stSnapshotRequest\032\025.ListSnapshotResponse" +
"\022A\n\016deleteSnapshot\022\026.DeleteSnapshotReque" +
"st\032\027.DeleteSnapshotResponse\022A\n\016isSnapsho" +
"tDone\022\026.IsSnapshotDoneRequest\032\027.IsSnapsh" +
"otDoneResponse\022D\n\017restoreSnapshot\022\027.Rest" +
"oreSnapshotRequest\032\030.RestoreSnapshotResp" +
"onse\022V\n\025isRestoreSnapshotDone\022\035.IsRestor" +
"eSnapshotDoneRequest\032\036.IsRestoreSnapshot",
"DoneResponseBG\n*org.apache.hadoop.hbase." +
"protobuf.generatedB\021MasterAdminProtosH\001\210" +
"\001\001\240\001\001"
"\001 \002(\0132\024.SnapshotDescription\"\031\n\027RestoreSn",
"apshotResponse\"?\n\025IsSnapshotDoneRequest\022" +
"&\n\010snapshot\030\001 \001(\0132\024.SnapshotDescription\"" +
"U\n\026IsSnapshotDoneResponse\022\023\n\004done\030\001 \001(\010:" +
"\005false\022&\n\010snapshot\030\002 \001(\0132\024.SnapshotDescr" +
"iption\"F\n\034IsRestoreSnapshotDoneRequest\022&" +
"\n\010snapshot\030\001 \001(\0132\024.SnapshotDescription\"3" +
"\n\035IsRestoreSnapshotDoneResponse\022\022\n\004done\030" +
"\001 \001(\010:\004true2\234\r\n\022MasterAdminService\0222\n\tad" +
"dColumn\022\021.AddColumnRequest\032\022.AddColumnRe" +
"sponse\022;\n\014deleteColumn\022\024.DeleteColumnReq",
"uest\032\025.DeleteColumnResponse\022;\n\014modifyCol" +
"umn\022\024.ModifyColumnRequest\032\025.ModifyColumn" +
"Response\0225\n\nmoveRegion\022\022.MoveRegionReque" +
"st\032\023.MoveRegionResponse\022;\n\014assignRegion\022" +
"\024.AssignRegionRequest\032\025.AssignRegionResp" +
"onse\022A\n\016unassignRegion\022\026.UnassignRegionR" +
"equest\032\027.UnassignRegionResponse\022>\n\roffli" +
"neRegion\022\025.OfflineRegionRequest\032\026.Offlin" +
"eRegionResponse\0228\n\013deleteTable\022\023.DeleteT" +
"ableRequest\032\024.DeleteTableResponse\0228\n\013ena",
"bleTable\022\023.EnableTableRequest\032\024.EnableTa" +
"bleResponse\022;\n\014disableTable\022\024.DisableTab" +
"leRequest\032\025.DisableTableResponse\0228\n\013modi" +
"fyTable\022\023.ModifyTableRequest\032\024.ModifyTab" +
"leResponse\0228\n\013createTable\022\023.CreateTableR" +
"equest\032\024.CreateTableResponse\022/\n\010shutdown" +
"\022\020.ShutdownRequest\032\021.ShutdownResponse\0225\n" +
"\nstopMaster\022\022.StopMasterRequest\032\023.StopMa" +
"sterResponse\022,\n\007balance\022\017.BalanceRequest" +
"\032\020.BalanceResponse\022M\n\022setBalancerRunning",
"\022\032.SetBalancerRunningRequest\032\033.SetBalanc" +
"erRunningResponse\022;\n\016runCatalogScan\022\023.Ca" +
"talogScanRequest\032\024.CatalogScanResponse\022S" +
"\n\024enableCatalogJanitor\022\034.EnableCatalogJa" +
"nitorRequest\032\035.EnableCatalogJanitorRespo" +
"nse\022\\\n\027isCatalogJanitorEnabled\022\037.IsCatal" +
"ogJanitorEnabledRequest\032 .IsCatalogJanit" +
"orEnabledResponse\022L\n\021execMasterService\022\032" +
".CoprocessorServiceRequest\032\033.Coprocessor" +
"ServiceResponse\0227\n\010snapshot\022\024.TakeSnapsh",
"otRequest\032\025.TakeSnapshotResponse\022<\n\rlist" +
"Snapshots\022\024.ListSnapshotRequest\032\025.ListSn" +
"apshotResponse\022A\n\016deleteSnapshot\022\026.Delet" +
"eSnapshotRequest\032\027.DeleteSnapshotRespons" +
"e\022A\n\016isSnapshotDone\022\026.IsSnapshotDoneRequ" +
"est\032\027.IsSnapshotDoneResponse\022D\n\017restoreS" +
"napshot\022\027.RestoreSnapshotRequest\032\030.Resto" +
"reSnapshotResponse\022V\n\025isRestoreSnapshotD" +
"one\022\035.IsRestoreSnapshotDoneRequest\032\036.IsR" +
"estoreSnapshotDoneResponseBG\n*org.apache",
".hadoop.hbase.protobuf.generatedB\021Master" +
"AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -22157,7 +22077,7 @@ public final class MasterAdminProtos {
internal_static_RestoreSnapshotResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RestoreSnapshotResponse_descriptor,
new java.lang.String[] { "ExpectedTimeout", },
new java.lang.String[] { },
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse.class,
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse.Builder.class);
internal_static_IsSnapshotDoneRequest_descriptor =

View File

@ -204,7 +204,6 @@ message RestoreSnapshotRequest {
}
message RestoreSnapshotResponse {
required int64 expectedTimeout = 1;
}
/* if you don't send the snapshot, then you will get it back
@ -224,7 +223,7 @@ message IsRestoreSnapshotDoneRequest {
}
message IsRestoreSnapshotDoneResponse {
optional bool done = 1 [default = false];
optional bool done = 1 [default = true];
}
service MasterAdminService {
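
For reference, a minimal hedged sketch of what the proto change above means for callers of the regenerated MasterAdminProtos classes (class and method names are taken from this patch; nothing else is assumed):

// Hedged sketch of the regenerated client-facing API after this proto change.
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse;

public class RestoreSnapshotProtoSketch {
  public static void main(String[] args) {
    // RestoreSnapshotResponse is now an empty message: no expectedTimeout to set or read.
    RestoreSnapshotResponse restoreResponse = RestoreSnapshotResponse.newBuilder().build();
    System.out.println(restoreResponse.isInitialized());   // true, no field is required anymore

    // The done flag now defaults to true, so a response without an explicit
    // setDone(false) reads as "restore completed".
    IsRestoreSnapshotDoneResponse doneResponse = IsRestoreSnapshotDoneResponse.getDefaultInstance();
    System.out.println(doneResponse.getDone());             // true
  }
}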

View File

@ -232,6 +232,28 @@ public class HFileArchiver {
}
}
/**
* Archive the store file
* @param fs the filesystem where the store files live
* @param regionInfo region hosting the store files
* @param conf {@link Configuration} to examine to determine the archive directory
* @param tableDir {@link Path} to where the table is being stored (for building the archive path)
* @param family the family hosting the store files
* @param storeFile file to be archived
* @throws IOException if the files could not be correctly disposed.
*/
public static void archiveStoreFile(FileSystem fs, HRegionInfo regionInfo,
Configuration conf, Path tableDir, byte[] family, Path storeFile) throws IOException {
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
// make sure we don't archive if we can't and that the archive dir exists
if (!fs.mkdirs(storeArchiveDir)) {
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+ Bytes.toString(family) + ", deleting compacted files instead.");
}
fs.rename(storeFile, new Path(storeArchiveDir, storeFile.getName()));
}
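
A short, hedged usage sketch of the new helper above; the configuration lookup, column family, and paths below are illustrative placeholders, not values from this patch:

// Illustrative sketch only: archive a single store file through the new helper.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.util.Bytes;

public class ArchiveStoreFileSketch {
  public static void archiveOne(HRegionInfo regionInfo, Path tableDir, Path storeFile)
      throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    byte[] family = Bytes.toBytes("cf");   // placeholder column family name
    // Creates the store's archive directory (if needed) and moves the file into it.
    HFileArchiver.archiveStoreFile(fs, regionInfo, conf, tableDir, family, storeFile);
  }
}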
/**
* Archive the given files and resolve any conflicts with existing files via appending the time
* archiving started (so all conflicts in the same group have the same timestamp appended).

View File

@ -136,6 +136,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
C_M_MODIFY_FAMILY (46, null), // Client asking Master to modify family of table
C_M_CREATE_TABLE (47, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to create a table
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to snapshot an offline table
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), // Client asking Master to restore a snapshot
// Updates from master to ZK. This is done by the master and there is
// nothing to process by either Master or RS

View File

@ -271,7 +271,7 @@ public class HFileLink extends FileLink {
/**
* Create a new HFileLink
*
* <p>It also add a back-reference to the hfile back-reference directory
* <p>It also adds a back-reference to the hfile back-reference directory
* to simplify the reference-count and the cleaning process.
*
* @param conf {@link Configuration} to read for the archive directory name
@ -285,11 +285,34 @@ public class HFileLink extends FileLink {
public static boolean create(final Configuration conf, final FileSystem fs,
final Path dstFamilyPath, final HRegionInfo hfileRegionInfo,
final String hfileName) throws IOException {
String linkedTable = hfileRegionInfo.getTableNameAsString();
String linkedRegion = hfileRegionInfo.getEncodedName();
return create(conf, fs, dstFamilyPath, linkedTable, linkedRegion, hfileName);
}
/**
* Create a new HFileLink
*
* <p>It also adds a back-reference to the hfile back-reference directory
* to simplify the reference-count and the cleaning process.
*
* @param conf {@link Configuration} to read for the archive directory name
* @param fs {@link FileSystem} on which to write the HFileLink
* @param dstFamilyPath - Destination path (table/region/cf/)
* @param linkedTable - Linked Table Name
* @param linkedRegion - Linked Region Name
* @param hfileName - Linked HFile name
* @return true if the file is created, false if the file already exists.
* @throws IOException on file or parent directory creation failure
*/
public static boolean create(final Configuration conf, final FileSystem fs,
final Path dstFamilyPath, final String linkedTable, final String linkedRegion,
final String hfileName) throws IOException {
String familyName = dstFamilyPath.getName();
String regionName = dstFamilyPath.getParent().getName();
String tableName = dstFamilyPath.getParent().getParent().getName();
String name = createHFileLinkName(hfileRegionInfo, hfileName);
String name = createHFileLinkName(linkedTable, linkedRegion, hfileName);
String refName = createBackReferenceName(tableName, regionName);
// Make sure the destination directory exists
@ -297,7 +320,7 @@ public class HFileLink extends FileLink {
// Make sure the FileLink reference directory exists
Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
hfileRegionInfo.getTableNameAsString(), hfileRegionInfo.getEncodedName(), familyName);
linkedTable, linkedRegion, familyName);
Path backRefssDir = getBackReferencesDir(archiveStoreDir, hfileName);
fs.mkdirs(backRefssDir);
@ -315,6 +338,28 @@ public class HFileLink extends FileLink {
}
}
/**
* Create a new HFileLink starting from a hfileLink name
*
* <p>It also adds a back-reference to the hfile back-reference directory
* to simplify the reference-count and the cleaning process.
*
* @param conf {@link Configuration} to read for the archive directory name
* @param fs {@link FileSystem} on which to write the HFileLink
* @param dstFamilyPath - Destination path (table/region/cf/)
* @param hfileLinkName - HFileLink name (it contains hfile-region-table)
* @return true if the file is created, false if the file already exists.
* @throws IOException on file or parent directory creation failure
*/
public static boolean createFromHFileLink(final Configuration conf, final FileSystem fs,
final Path dstFamilyPath, final String hfileLinkName) throws IOException {
Matcher m = LINK_NAME_PARSER.matcher(hfileLinkName);
if (!m.matches()) {
throw new IllegalArgumentException(hfileLinkName + " is not a valid HFileLink name!");
}
return create(conf, fs, dstFamilyPath, m.group(3), m.group(2), m.group(1));
}
/**
* Create the back reference name
*/
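
A hedged sketch of how the table/region/hfile overload added above might be called during a restore; every path and name below is a placeholder, not a value from this patch:

// Illustrative sketch: create an HFileLink in a destination family directory that
// points back to an hfile of the source (snapshotted) table. All names are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HFileLink;

public class HFileLinkSketch {
  public static void linkOne() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);

    // Destination: table/region/cf/ of the table being restored or cloned.
    Path dstFamilyPath = new Path("/hbase/clonedTable/1234567890abcdef/cf");

    // Source coordinates of the referenced hfile: table name, encoded region, hfile name.
    boolean created = HFileLink.create(conf, fs, dstFamilyPath,
        "sourceTable", "abcdef1234567890", "0123456789abcdef0123456789abcdef");
    if (!created) {
      // create() returns false when the link file already exists.
    }
  }
}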

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* HLogLink describes a link to a WAL.
*
* An hlog can be in /hbase/.logs/<server>/<hlog>
* or it can be in /hbase/.oldlogs/<hlog>
*
* The link first checks the original path;
* if the file is not present there, it falls back to the archived path.
*/
@InterfaceAudience.Private
public class HLogLink extends FileLink {
/**
* @param conf {@link Configuration} from which to extract specific archive locations
* @param serverName Region Server owner of the log
* @param logName WAL file name
* @throws IOException on unexpected error.
*/
public HLogLink(final Configuration conf,
final String serverName, final String logName) throws IOException {
this(FSUtils.getRootDir(conf), serverName, logName);
}
/**
* @param rootDir Path to the root directory where HBase files are stored
* @param serverName Region Server owner of the log
* @param logName WAL file name
*/
public HLogLink(final Path rootDir, final String serverName, final String logName) {
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
}
/**
* @param originPath Path to the WAL in the log directory
* @param archivePath Path to the WAL in the archived log directory
*/
public HLogLink(final Path originPath, final Path archivePath) {
setLocations(originPath, archivePath);
}
}
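
A brief hedged example of building a WAL link with the new class; the server and log file names are placeholders:

// Illustrative sketch: construct an HLogLink, which knows both candidate WAL locations.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HLogLink;

public class HLogLinkSketch {
  public static void resolveWal() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Placeholder server and log file names.
    HLogLink link = new HLogLink(conf, "regionserver-1,60020,1360000000000",
        "regionserver-1%2C60020%2C1360000000000.1360000000001");
    // The link now covers both candidate locations:
    //   <rootdir>/.logs/<server>/<log>   and   <rootdir>/.oldlogs/<log>
    // and the inherited FileLink machinery picks whichever one exists when the
    // file is actually read.
    System.out.println(link);
  }
}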

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
@ -190,11 +191,13 @@ import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotExistsException;
import org.apache.hadoop.hbase.snapshot.exception.TablePartiallyOpenException;
import org.apache.hadoop.hbase.snapshot.exception.UnknownSnapshotException;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
@ -2657,18 +2660,98 @@ Server {
}
}
/**
* Execute Restore/Clone snapshot operation.
*
* <p>If the specified table exists, a "Restore" is executed, replacing the table
* schema and directory data with the content of the snapshot.
* The table must be disabled, or an UnsupportedOperationException will be thrown.
*
* <p>If the table doesn't exist, a "Clone" is executed: a new table is created
* using the schema at the time of the snapshot, and the content of the snapshot.
*
* <p>The restore/clone operation does not require copying HFiles. Since HFiles
* are immutable, the table can point to and use the same files as the original one.
*/
@Override
public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
RestoreSnapshotRequest request) throws ServiceException {
throw new ServiceException(new UnsupportedOperationException(
"Snapshots restore is not implemented yet."));
SnapshotDescription reqSnapshot = request.getSnapshot();
FileSystem fs = this.getMasterFileSystem().getFileSystem();
Path rootDir = this.getMasterFileSystem().getRootDir();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(request.getSnapshot(), rootDir);
try {
// check if the snapshot exists
if (!fs.exists(snapshotDir)) {
LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
throw new SnapshotDoesNotExistException(reqSnapshot);
}
// read snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
HTableDescriptor snapshotTableDesc = FSTableDescriptors.getTableDescriptor(fs, snapshotDir);
String tableName = reqSnapshot.getTable();
// Execute the restore/clone operation
if (MetaReader.tableExists(catalogTracker, tableName)) {
if (this.assignmentManager.getZKTable().isEnabledTable(snapshot.getTable())) {
throw new ServiceException(new UnsupportedOperationException("Table '" +
snapshot.getTable() + "' must be disabled in order to perform a restore operation."));
}
snapshotManager.restoreSnapshot(snapshot, snapshotTableDesc);
LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
} else {
HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc,
Bytes.toBytes(tableName));
snapshotManager.cloneSnapshot(snapshot, htd);
LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
}
return RestoreSnapshotResponse.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Returns the status of the requested snapshot restore/clone operation.
* This method is not exposed to the user; it is used internally by HBaseAdmin
* to verify whether the restore has completed.
*
* No exception is thrown if the restore is not running; the result will simply be "done".
*
* @return done <tt>true</tt> if the restore/clone operation is completed.
* @throws RestoreSnapshotException if the operation failed.
*/
@Override
public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
IsRestoreSnapshotDoneRequest request) throws ServiceException {
throw new ServiceException(new UnsupportedOperationException(
"Snapshots restore is not implemented yet."));
try {
SnapshotDescription snapshot = request.getSnapshot();
SnapshotSentinel sentinel = this.snapshotManager.getRestoreSnapshotSentinel(snapshot.getTable());
IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
LOG.debug("Verify snapshot=" + snapshot.getName() + " against=" + sentinel.getSnapshot().getName() +
" table=" + snapshot.getTable());
if (sentinel != null && sentinel.getSnapshot().getName().equals(snapshot.getName())) {
HBaseSnapshotException e = sentinel.getExceptionIfFailed();
if (e != null) throw e;
// check to see if we are done
if (sentinel.isFinished()) {
LOG.debug("Restore snapshot=" + snapshot + " has completed. Notifying the client.");
} else {
builder.setDone(false);
if (LOG.isDebugEnabled()) {
LOG.debug("Sentinel is not yet finished with restoring snapshot=" + snapshot);
}
}
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
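
To show how a client is expected to drive these two new master RPCs, a hedged sketch built only from the request/response messages in this patch; the generated blocking stub, the null RpcController, and the polling interval are assumptions (in practice HBaseAdmin performs the equivalent steps):

// Hedged client-side sketch: issue a restore and poll until the master reports done.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest;

public class RestoreSnapshotClientSketch {
  public static void restoreAndWait(MasterAdminService.BlockingInterface masterAdmin,
      String snapshotName, String tableName) throws Exception {
    SnapshotDescription snapshot = SnapshotDescription.newBuilder()
        .setName(snapshotName)
        .setTable(tableName)
        .build();

    // Kick off the restore (or a clone, if the table does not exist).
    masterAdmin.restoreSnapshot(null,
        RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot).build());

    // Poll until done; the done flag defaults to true, so a master with no matching
    // restore sentinel also reports completion.
    IsRestoreSnapshotDoneRequest doneRequest =
        IsRestoreSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build();
    IsRestoreSnapshotDoneResponse done;
    do {
      Thread.sleep(500);   // arbitrary polling interval for this sketch
      done = masterAdmin.isRestoreSnapshotDone(null, doneRequest);
    } while (!done.getDone());
  }
}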

View File

@ -19,18 +19,7 @@
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -49,9 +38,8 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.zookeeper.KeeperException;
/**
@ -60,11 +48,11 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class CreateTableHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(CreateTableHandler.class);
private MasterFileSystem fileSystemManager;
private final HTableDescriptor hTableDescriptor;
private Configuration conf;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
protected final MasterFileSystem fileSystemManager;
protected final HTableDescriptor hTableDescriptor;
protected final Configuration conf;
protected final AssignmentManager assignmentManager;
protected final CatalogTracker catalogTracker;
private final HRegionInfo [] newRegions;
public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
@ -145,61 +133,21 @@ public class CreateTableHandler extends EventHandler {
private void handleCreateTable(String tableName) throws IOException,
KeeperException {
int regionNumber = newRegions.length;
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(
"RegionOpenAndInitThread-" + tableName, regionNumber);
CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>(
regionOpenAndInitThreadPool);
// 1. Create table descriptor on disk
// TODO: Currently we make the table descriptor and as side-effect the
// tableDir is created. Should we change below method to be createTable
// where we create table in tmp dir with its table descriptor file and then
// do rename to move it into place?
FSTableDescriptors.createTableDescriptor(this.hTableDescriptor, this.conf);
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegion>() {
public HRegion call() throws IOException {
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
fileSystemManager.getRootDir(), conf, hTableDescriptor, null,
false, true);
// 2. Close the new region to flush to disk. Close log file too.
region.close();
return region;
}
});
}
try {
// 3. wait for all regions to finish creation
for (int i = 0; i < regionNumber; i++) {
Future<HRegion> future = completionService.take();
HRegion region = future.get();
regionInfos.add(region.getRegionInfo());
}
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
regionOpenAndInitThreadPool.shutdownNow();
}
if (regionInfos.size() > 0) {
MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
// 2. Create regions
List<HRegionInfo> regions = handleCreateRegions(tableName);
if (regions != null && regions.size() > 0) {
// 3. Trigger immediate assignment of the regions in round-robin fashion
ModifyRegionUtils.assignRegions(assignmentManager, regions);
}
// 4. Trigger immediate assignment of the regions in round-robin fashion
try {
List<HRegionInfo> regions = Arrays.asList(newRegions);
assignmentManager.getRegionStates().createRegionStates(regions);
assignmentManager.assign(regions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);
}
// 5. Set table enabled flag up in zk.
// 4. Set table enabled flag up in zk.
try {
assignmentManager.getZKTable().
setEnabledTable(this.hTableDescriptor.getNameAsString());
@ -209,20 +157,14 @@ public class CreateTableHandler extends EventHandler {
}
}
protected ThreadPoolExecutor getRegionOpenAndInitThreadPool(
final String threadNamePrefix, int regionNumber) {
int maxThreads = Math.min(regionNumber, conf.getInt(
"hbase.hregion.open.and.init.threads.max", 10));
ThreadPoolExecutor openAndInitializeThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() {
private int count = 1;
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;
}
});
return openAndInitializeThreadPool;
protected List<HRegionInfo> handleCreateRegions(String tableName) throws IOException {
// 1. create regions
List<HRegionInfo> regions = ModifyRegionUtils.createRegions(conf, fileSystemManager.getRootDir(),
hTableDescriptor, newRegions, catalogTracker);
if (regions != null && regions.size() > 0) {
// 2. add regions to .META.
MetaEditor.addRegionsToMeta(catalogTracker, regions);
}
return regions;
}
}

View File

@ -195,7 +195,7 @@ public abstract class TableEventHandler extends EventHandler {
* @throws FileNotFoundException
* @throws IOException
*/
HTableDescriptor getTableDescriptor()
protected HTableDescriptor getTableDescriptor()
throws FileNotFoundException, IOException {
final String name = Bytes.toString(tableName);
HTableDescriptor htd =

View File

@ -0,0 +1,142 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to Clone a snapshot.
*
* <p>Uses {@link RestoreSnapshotHelper} to create a new table with the same
* content as the specified snapshot.
*/
@InterfaceAudience.Private
public class CloneSnapshotHandler extends CreateTableHandler implements SnapshotSentinel {
private static final Log LOG = LogFactory.getLog(CloneSnapshotHandler.class);
private final SnapshotDescription snapshot;
private final SnapshotExceptionSnare monitor;
private volatile boolean stopped = false;
public CloneSnapshotHandler(final MasterServices masterServices,
final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
super(masterServices, masterServices.getMasterFileSystem(), hTableDescriptor,
masterServices.getConfiguration(), null, masterServices.getCatalogTracker(),
masterServices.getAssignmentManager());
// Snapshot information
this.snapshot = snapshot;
// Monitor
this.monitor = new SnapshotExceptionSnare(snapshot);
}
@Override
protected List<HRegionInfo> handleCreateRegions(String tableName) throws IOException {
FileSystem fs = fileSystemManager.getFileSystem();
Path rootDir = fileSystemManager.getRootDir();
Path tableDir = HTableDescriptor.getTableDir(rootDir, Bytes.toBytes(tableName));
try {
// Execute the Clone
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(conf, fs,
catalogTracker, snapshot, snapshotDir, hTableDescriptor, tableDir, monitor);
restoreHelper.restore();
// At this point the clone is complete. Next step is enabling the table.
LOG.info("Clone snapshot=" + snapshot.getName() + " on table=" + tableName + " completed!");
return MetaReader.getTableRegions(catalogTracker, Bytes.toBytes(tableName));
} catch (Exception e) {
String msg = "clone snapshot=" + snapshot + " failed";
LOG.error(msg, e);
monitor.snapshotFailure("Failed due to exception:" + e.getMessage(), snapshot, e);
throw new RestoreSnapshotException(msg, e);
} finally {
this.stopped = true;
}
}
@Override
public boolean isFinished() {
return this.stopped;
}
@Override
public SnapshotDescription getSnapshot() {
return snapshot;
}
@Override
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
LOG.info("Stopping clone snapshot=" + snapshot + " because: " + why);
this.monitor.snapshotFailure("Failing clone snapshot because server is stopping.", snapshot);
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public HBaseSnapshotException getExceptionIfFailed() {
try {
this.monitor.failOnError();
} catch (HBaseSnapshotException e) {
return e;
}
return null;
}
}

View File

@ -0,0 +1,156 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.handler.TableEventHandler;
import org.apache.hadoop.hbase.master.snapshot.manage.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to Restore a snapshot.
*
* <p>Uses {@link RestoreSnapshotHelper} to replace the table content with the
* data available in the snapshot.
*/
@InterfaceAudience.Private
public class RestoreSnapshotHandler extends TableEventHandler implements SnapshotSentinel {
private static final Log LOG = LogFactory.getLog(RestoreSnapshotHandler.class);
private final HTableDescriptor hTableDescriptor;
private final SnapshotDescription snapshot;
private final SnapshotExceptionSnare monitor;
private volatile boolean stopped = false;
public RestoreSnapshotHandler(final MasterServices masterServices,
final SnapshotDescription snapshot, final HTableDescriptor htd)
throws IOException {
super(EventType.C_M_RESTORE_SNAPSHOT, htd.getName(), masterServices, masterServices);
// Snapshot information
this.snapshot = snapshot;
// Monitor
this.monitor = new SnapshotExceptionSnare(snapshot);
// Check table exists.
getTableDescriptor();
// This is the new schema we are going to write out as this modification.
this.hTableDescriptor = htd;
}
@Override
protected void handleTableOperation(List<HRegionInfo> hris) throws IOException {
MasterFileSystem fileSystemManager = masterServices.getMasterFileSystem();
CatalogTracker catalogTracker = masterServices.getCatalogTracker();
FileSystem fs = fileSystemManager.getFileSystem();
Path rootDir = fileSystemManager.getRootDir();
byte[] tableName = hTableDescriptor.getName();
Path tableDir = HTableDescriptor.getTableDir(rootDir, tableName);
try {
// Update descriptor
this.masterServices.getTableDescriptors().add(hTableDescriptor);
// Execute the Restore
LOG.debug("Starting restore snapshot=" + snapshot);
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(
masterServices.getConfiguration(), fs, catalogTracker,
snapshot, snapshotDir, hTableDescriptor, tableDir, monitor);
restoreHelper.restore();
// At this point the restore is complete. Next step is enabling the table.
LOG.info("Restore snapshot=" + snapshot.getName() + " on table=" +
Bytes.toString(tableName) + " completed!");
hris.clear();
hris.addAll(MetaReader.getTableRegions(catalogTracker, tableName));
} catch (IOException e) {
String msg = "restore snapshot=" + snapshot + " failed";
LOG.error(msg, e);
monitor.snapshotFailure("Failed due to exception:" + e.getMessage(), snapshot, e);
throw new RestoreSnapshotException(msg, e);
} finally {
this.stopped = true;
}
}
@Override
public boolean isFinished() {
return this.stopped;
}
@Override
public SnapshotDescription getSnapshot() {
return snapshot;
}
@Override
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
LOG.info("Stopping restore snapshot=" + snapshot + " because: " + why);
this.monitor.snapshotFailure("Failing restore because server is stopping.", snapshot);
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public HBaseSnapshotException getExceptionIfFailed() {
try {
this.monitor.failOnError();
} catch (HBaseSnapshotException e) {
return e;
}
return null;
}
}

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.master.snapshot.manage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -25,16 +28,20 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.snapshot.CloneSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.DisabledTableSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.RestoreSnapshotHandler;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -56,6 +63,9 @@ public class SnapshotManager implements Stoppable {
// TODO - enable having multiple snapshots with multiple monitors
// Restore Sentinels map, with table name as key
private Map<String, SnapshotSentinel> restoreHandlers = new HashMap<String, SnapshotSentinel>();
private final MasterServices master;
private SnapshotSentinel handler;
private ExecutorService pool;
@ -77,6 +87,16 @@ public class SnapshotManager implements Stoppable {
return handler != null && !handler.isFinished();
}
/**
* @return <tt>true</tt> if there is a snapshot in progress on the specified table.
*/
public boolean isTakingSnapshot(final String tableName) {
if (handler != null && handler.getSnapshot().getTable().equals(tableName)) {
return !handler.isFinished();
}
return false;
}
/**
* Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
* aren't already running a snapshot.
@ -94,6 +114,12 @@ public class SnapshotManager implements Stoppable {
+ this.handler.getSnapshot(), snapshot);
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(snapshot.getTable())) {
throw new SnapshotCreationException("Restore in progress on the same table snapshot:"
+ this.handler.getSnapshot(), snapshot);
}
try {
// delete the working directory, since we aren't running the snapshot
fs.delete(workingDir, true);
@ -155,6 +181,114 @@ public class SnapshotManager implements Stoppable {
return this.handler;
}
/**
* Clone the specified snapshot into a new table.
* The clone will fail if the destination table has a snapshot or restore in progress.
*
* @param snapshot Snapshot Descriptor
* @param hTableDescriptor Table Descriptor of the table to create
*/
public synchronized void cloneSnapshot(final SnapshotDescription snapshot,
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
String tableName = hTableDescriptor.getNameAsString();
cleanupRestoreSentinels();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(tableName)) {
throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
}
try {
CloneSnapshotHandler handler =
new CloneSnapshotHandler(master, snapshot, hTableDescriptor);
this.pool.submit(handler);
restoreHandlers.put(tableName, handler);
} catch (Exception e) {
String msg = "Couldn't clone the snapshot=" + snapshot + " on table=" + tableName;
LOG.error(msg, e);
throw new RestoreSnapshotException(msg, e);
}
}
/**
* Restore the specified snapshot.
* The restore will fail if the destination table has a snapshot or restore in progress.
*
* @param snapshot Snapshot Descriptor
* @param hTableDescriptor Table Descriptor of the table to restore
*/
public synchronized void restoreSnapshot(final SnapshotDescription snapshot,
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
String tableName = hTableDescriptor.getNameAsString();
cleanupRestoreSentinels();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(tableName)) {
throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
}
try {
RestoreSnapshotHandler handler =
new RestoreSnapshotHandler(master, snapshot, hTableDescriptor);
this.pool.submit(handler);
restoreHandlers.put(hTableDescriptor.getNameAsString(), handler);
} catch (Exception e) {
String msg = "Couldn't restore the snapshot=" + snapshot + " on table=" + tableName;
LOG.error(msg, e);
throw new RestoreSnapshotException(msg, e);
}
}
/**
* Verify whether a restore of the specified table is in progress.
*
* @param tableName table under restore
* @return <tt>true</tt> if there is a restore in progress of the specified table.
*/
public boolean isRestoringTable(final String tableName) {
SnapshotSentinel sentinel = restoreHandlers.get(tableName);
return(sentinel != null && !sentinel.isFinished());
}
/**
* Get the restore snapshot sentinel for the specified table
* @param tableName table under restore
* @return the restore snapshot handler
*/
public synchronized SnapshotSentinel getRestoreSnapshotSentinel(final String tableName) {
try {
return restoreHandlers.get(tableName);
} finally {
cleanupRestoreSentinels();
}
}
/**
* Scan the restore handlers and remove the finished ones.
*/
private void cleanupRestoreSentinels() {
Iterator<Map.Entry<String, SnapshotSentinel>> it = restoreHandlers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
if (sentinel.isFinished()) {
it.remove();
}
}
}
@Override
public void stop(String why) {
// short circuit
@ -163,6 +297,10 @@ public class SnapshotManager implements Stoppable {
this.stopped = true;
// pass the stop onto all the listeners
if (this.handler != null) this.handler.stop(why);
// pass the stop onto all the restore handlers
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
restoreHandler.stop(why);
}
}
@Override
@ -179,4 +317,4 @@ public class SnapshotManager implements Stoppable {
public void setSnapshotHandlerForTesting(SnapshotSentinel handler) {
this.handler = handler;
}
}
}

View File

@ -57,7 +57,7 @@ public interface HLog {
*/
static final String RECOVERED_EDITS_DIR = "recovered.edits";
static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.snapshot;
import java.io.IOException;
import java.io.FileNotFoundException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,16 +41,16 @@ import org.apache.hadoop.hbase.util.FSUtils;
* Utility class to help manage {@link SnapshotDescription SnapshotDesriptions}.
* <p>
* Snapshots are laid out on disk like this:
*
*
* <pre>
* /hbase/.snapshots
* /.tmp <---- working directory
* /[snapshot name] <----- completed snapshot
* </pre>
*
*
* A completed snapshot named 'completed' then looks like (multiple regions, servers, files, etc.
* signified by '...' on the same directory depth).
*
*
* <pre>
* /hbase/.snapshots/completed
* .snapshotinfo <--- Description of the snapshot
@ -66,7 +67,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
* ...
* ...
* </pre>
*
*
* Utility methods in this class are useful for getting the correct locations for different parts of
* the snapshot, as well as moving completed snapshots into place (see
* {@link #completeSnapshot(SnapshotDescription, Path, Path, FileSystem)}, and writing the
@ -370,4 +371,4 @@ public class SnapshotDescriptionUtils {
+ ") to completed directory(" + finishedDir + ").", snapshot);
}
}
}
}
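
Tying the layout documented above to the API used throughout this patch, a minimal hedged sketch of resolving a completed snapshot's directory (the snapshot and table names are placeholders):

// Illustrative sketch: resolve the completed-snapshot directory for a named snapshot,
// mirroring the /hbase/.snapshots layout documented above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;

public class SnapshotDirSketch {
  public static void printDirs() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path rootDir = FSUtils.getRootDir(conf);   // e.g. hdfs://.../hbase

    SnapshotDescription snapshot = SnapshotDescription.newBuilder()
        .setName("mySnapshot")                 // placeholder snapshot name
        .setTable("myTable")                   // placeholder table name
        .build();

    // /hbase/.snapshots/<snapshot name>  -- the completed snapshot
    Path completed = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
    System.out.println("completed snapshot dir: " + completed);
  }
}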

View File

@ -29,6 +29,12 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class SnapshotDoesNotExistException extends HBaseSnapshotException {
/**
* @param msg full description of the failure
*/
public SnapshotDoesNotExistException(String msg) {
super(msg);
}
/**
* @param desc expected snapshot to find
@ -36,5 +42,4 @@ public class SnapshotDoesNotExistException extends HBaseSnapshotException {
public SnapshotDoesNotExistException(SnapshotDescription desc) {
super("Snapshot doesn't exist on the filesystem", desc);
}
}

View File

@ -0,0 +1,436 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.snapshot.restore;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TreeMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
/**
* Helper to Restore/Clone a Snapshot
*
* <p>The helper assumes that a table is already created, and by calling restore()
* the content present in the snapshot will be restored as the new content of the table.
*
* <p>Clone from Snapshot: If the target table is empty, the restore operation
* is just a "clone operation", where the only operations are:
* <ul>
* <li>for each region in the snapshot create a new region
* (note that the region will have a different name, since the encoding contains the table name)
* <li>for each file in the region create a new HFileLink to point to the original file.
* <li>restore the logs, if any
* </ul>
*
* <p>Restore from Snapshot:
* <ul>
* <li>for each region in the table, check whether it is present in the snapshot
* <ul>
* <li>if the region is not present in the snapshot, remove it.
* <li>if the region is present in the snapshot
* <ul>
* <li>for each file in the table region, check whether it is present in the snapshot
* <ul>
* <li>if the hfile is not present in the snapshot, remove it
* <li>if the hfile is present, keep it (nothing to do)
* </ul>
* <li>for each file in the snapshot region but not in the table
* <ul>
* <li>create a new HFileLink that points to the original file
* </ul>
* </ul>
* </ul>
* <li>for each region in the snapshot not present in the current table state
* <ul>
* <li>create a new region and for each file in the region create a new HFileLink
* (This is the same as the clone operation)
* </ul>
* <li>restore the logs, if any
* </ul>
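*
* <p>A minimal usage sketch (the names below are placeholders; the caller is
* expected to have already created the target table and to provide the snapshot
* directory and error monitor):
* <pre>
*   RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, catalogTracker,
*       snapshotDesc, snapshotDir, tableDesc, tableDir, monitor);
*   helper.restore();
* </pre>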
*/
@InterfaceAudience.Private
public class RestoreSnapshotHelper {
private static final Log LOG = LogFactory.getLog(RestoreSnapshotHelper.class);
private final Map<byte[], byte[]> regionsMap =
new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
private final SnapshotExceptionSnare monitor;
private final SnapshotDescription snapshotDesc;
private final Path snapshotDir;
private final HTableDescriptor tableDesc;
private final Path tableDir;
private final CatalogTracker catalogTracker;
private final Configuration conf;
private final FileSystem fs;
public RestoreSnapshotHelper(final Configuration conf, final FileSystem fs,
final CatalogTracker catalogTracker,
final SnapshotDescription snapshotDescription, final Path snapshotDir,
final HTableDescriptor tableDescriptor, final Path tableDir,
final SnapshotExceptionSnare monitor)
{
this.fs = fs;
this.conf = conf;
this.catalogTracker = catalogTracker;
this.snapshotDesc = snapshotDescription;
this.snapshotDir = snapshotDir;
this.tableDesc = tableDescriptor;
this.tableDir = tableDir;
this.monitor = monitor;
}
/**
* Restore table to a specified snapshot state.
*/
public void restore() throws IOException {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
LOG.debug("starting restore");
Set<String> snapshotRegionNames = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
if (snapshotRegionNames == null) {
LOG.warn("Nothing to restore. Snapshot " + snapshotDesc + " looks empty");
return;
}
// Identify which regions are still available and which are not.
// NOTE: we rely upon the region name as: "table name, start key, end key"
List<HRegionInfo> tableRegions = getTableRegions();
if (tableRegions != null) {
monitor.failOnError();
List<HRegionInfo> regionsToRestore = new LinkedList<HRegionInfo>();
List<HRegionInfo> regionsToRemove = new LinkedList<HRegionInfo>();
for (HRegionInfo regionInfo: tableRegions) {
String regionName = regionInfo.getEncodedName();
if (snapshotRegionNames.contains(regionName)) {
LOG.info("region to restore: " + regionName);
snapshotRegionNames.remove(regionName);
regionsToRestore.add(regionInfo);
} else {
LOG.info("region to remove: " + regionName);
regionsToRemove.add(regionInfo);
}
}
// Restore regions using the snapshot data
monitor.failOnError();
restoreRegions(regionsToRestore);
// Remove regions from the current table
monitor.failOnError();
ModifyRegionUtils.deleteRegions(fs, catalogTracker, regionsToRemove);
}
// Regions to Add: present in the snapshot but not in the current table
if (snapshotRegionNames.size() > 0) {
List<HRegionInfo> regionsToAdd = new LinkedList<HRegionInfo>();
monitor.failOnError();
for (String regionName: snapshotRegionNames) {
LOG.info("region to add: " + regionName);
Path regionDir = new Path(snapshotDir, regionName);
regionsToAdd.add(HRegion.loadDotRegionInfoFileContent(fs, regionDir));
}
// Create new regions cloning from the snapshot
monitor.failOnError();
cloneRegions(regionsToAdd);
}
// Restore WALs
monitor.failOnError();
restoreWALs();
}
/**
* Restore specified regions by restoring content to the snapshot state.
*/
private void restoreRegions(final List<HRegionInfo> regions) throws IOException {
if (regions == null || regions.size() == 0) return;
for (HRegionInfo hri: regions) restoreRegion(hri);
}
/**
* Restore a region by removing the files that are not in the snapshot
* and adding the missing ones from the snapshot.
*/
private void restoreRegion(HRegionInfo regionInfo) throws IOException {
Path snapshotRegionDir = new Path(snapshotDir, regionInfo.getEncodedName());
Map<String, List<String>> snapshotFiles =
SnapshotReferenceUtil.getRegionHFileReferences(fs, snapshotRegionDir);
Path regionDir = new Path(tableDir, regionInfo.getEncodedName());
String tableName = tableDesc.getNameAsString();
for (Map.Entry<String, List<String>> familyEntry: snapshotFiles.entrySet()) {
byte[] family = Bytes.toBytes(familyEntry.getKey());
Path familyDir = new Path(regionDir, familyEntry.getKey());
Set<String> familyFiles = getTableRegionFamilyFiles(familyDir);
List<String> hfilesToAdd = new LinkedList<String>();
for (String hfileName: familyEntry.getValue()) {
if (familyFiles.contains(hfileName)) {
// HFile already present
familyFiles.remove(hfileName);
} else {
// HFile missing
hfilesToAdd.add(hfileName);
}
}
// Remove hfiles not present in the snapshot
for (String hfileName: familyFiles) {
Path hfile = new Path(familyDir, hfileName);
LOG.trace("Removing hfile=" + hfile + " from table=" + tableName);
HFileArchiver.archiveStoreFile(fs, regionInfo, conf, tableDir, family, hfile);
}
// Restore Missing files
for (String hfileName: hfilesToAdd) {
LOG.trace("Adding HFileLink " + hfileName + " to table=" + tableName);
restoreStoreFile(familyDir, regionInfo, hfileName);
}
}
}
/**
* @return The set of files in the specified family directory.
*/
private Set<String> getTableRegionFamilyFiles(final Path familyDir) throws IOException {
Set<String> familyFiles = new HashSet<String>();
FileStatus[] hfiles = FSUtils.listStatus(fs, familyDir);
if (hfiles == null) return familyFiles;
for (FileStatus hfileRef: hfiles) {
String hfileName = hfileRef.getPath().getName();
familyFiles.add(hfileName);
}
return familyFiles;
}
/**
* Clone the specified regions. For each region create a new region
* and an HFileLink for each hfile.
*/
private void cloneRegions(final List<HRegionInfo> regions) throws IOException {
if (regions == null || regions.size() == 0) return;
final Map<String, HRegionInfo> snapshotRegions =
new HashMap<String, HRegionInfo>(regions.size());
// clone the region info (replacing the embedded table name with the new one)
HRegionInfo[] clonedRegionsInfo = new HRegionInfo[regions.size()];
for (int i = 0; i < clonedRegionsInfo.length; ++i) {
// clone the region info from the snapshot region info
HRegionInfo snapshotRegionInfo = regions.get(i);
clonedRegionsInfo[i] = cloneRegionInfo(snapshotRegionInfo);
// add the region name mapping between snapshot and cloned
String snapshotRegionName = snapshotRegionInfo.getEncodedName();
String clonedRegionName = clonedRegionsInfo[i].getEncodedName();
regionsMap.put(Bytes.toBytes(snapshotRegionName), Bytes.toBytes(clonedRegionName));
LOG.info("clone region=" + snapshotRegionName + " as " + clonedRegionName);
// Add mapping between cloned region name and snapshot region info
snapshotRegions.put(clonedRegionName, snapshotRegionInfo);
}
// create the regions on disk
List<HRegionInfo> clonedRegions = ModifyRegionUtils.createRegions(conf, FSUtils.getRootDir(conf),
tableDesc, clonedRegionsInfo, catalogTracker, new ModifyRegionUtils.RegionFillTask() {
public void fillRegion(final HRegion region) throws IOException {
cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
}
});
if (regions != null && regions.size() > 0) {
// add regions to .META.
MetaEditor.addRegionsToMeta(catalogTracker, clonedRegions);
}
}
/**
* Clone region directory content from the snapshot info.
*
* Each region is encoded with the table name, so the cloned region will have
* a different region name.
*
* Instead of copying the hfiles, an HFileLink is created.
*
* @param region the cloned (destination) {@link HRegion}
* @param snapshotRegionInfo {@link HRegionInfo} of the snapshot region to clone
*/
private void cloneRegion(final HRegion region, final HRegionInfo snapshotRegionInfo)
throws IOException {
final Path snapshotRegionDir = new Path(snapshotDir, snapshotRegionInfo.getEncodedName());
final Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
final String tableName = tableDesc.getNameAsString();
SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
new FSVisitor.StoreFileVisitor() {
public void storeFile (final String region, final String family, final String hfile)
throws IOException {
LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
Path familyDir = new Path(regionDir, family);
restoreStoreFile(familyDir, snapshotRegionInfo, hfile);
}
});
}
/**
* Create a new {@link HFileLink} to reference the store file.
*
* @param familyDir destination directory for the store file
* @param regionInfo destination region info for the table
* @param hfileName store file name (can be a Reference, HFileLink or simple HFile)
*/
private void restoreStoreFile(final Path familyDir, final HRegionInfo regionInfo,
final String hfileName) throws IOException {
if (HFileLink.isHFileLink(hfileName)) {
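// the snapshot file is already a link: re-create an equivalent link to the same referenced file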
HFileLink.createFromHFileLink(conf, fs, familyDir, hfileName);
} else {
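// plain hfile (or Reference): create a new HFileLink in the family dir pointing back to the original file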
HFileLink.create(conf, fs, familyDir, regionInfo, hfileName);
}
}
/**
* Create a new {@link HRegionInfo} from the snapshot region info.
* Keep the same startKey, endKey, regionId and split information but change
* the table name.
*
* @param snapshotRegionInfo Info for region to clone.
* @return the new HRegionInfo instance
*/
public HRegionInfo cloneRegionInfo(final HRegionInfo snapshotRegionInfo) {
return new HRegionInfo(tableDesc.getName(),
snapshotRegionInfo.getStartKey(), snapshotRegionInfo.getEndKey(),
snapshotRegionInfo.isSplit(), snapshotRegionInfo.getRegionId());
}
/**
* Restore snapshot WALs.
*
* A global snapshot keeps a reference to the region server logs present at the time of the snapshot.
* (/hbase/.snapshot/snapshotName/.logs/hostName/logName)
*
* Since each log may contain data for several tables, the logs must be split to
* extract the entries of the table that we are interested in.
*/
private void restoreWALs() throws IOException {
final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
Bytes.toBytes(snapshotDesc.getTable()), regionsMap);
try {
// Recover.Edits
SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
new FSVisitor.RecoveredEditsVisitor() {
public void recoveredEdits (final String region, final String logfile) throws IOException {
Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
logSplitter.splitRecoveredEdit(path);
}
});
// Region Server Logs
SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
public void logFile (final String server, final String logfile) throws IOException {
logSplitter.splitLog(server, logfile);
}
});
} finally {
logSplitter.close();
}
}
/**
* @return the list of regions contained in the table
*/
private List<HRegionInfo> getTableRegions() throws IOException {
LOG.debug("get table regions: " + tableDir);
FileStatus[] regionDirs = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
if (regionDirs == null) return null;
List<HRegionInfo> regions = new LinkedList<HRegionInfo>();
for (FileStatus regionDir: regionDirs) {
HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir.getPath());
regions.add(hri);
}
LOG.debug("found " + regions.size() + " regions for table=" + tableDesc.getNameAsString());
return regions;
}
/**
* Create a new table descriptor cloning the snapshot table schema.
*
* @param snapshotTableDescriptor descriptor of the snapshotted table
* @param tableName name of the table to create
* @return cloned table descriptor
* @throws IOException
*/
public static HTableDescriptor cloneTableSchema(final HTableDescriptor snapshotTableDescriptor,
final byte[] tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for (HColumnDescriptor hcd: snapshotTableDescriptor.getColumnFamilies()) {
htd.addFamily(hcd);
}
return htd;
}
}

View File

@@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.snapshot.restore;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.TreeMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.HLogLink;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
* If the snapshot has references to one or more log files,
* those must be split (each log may contain entries for multiple tables and regions)
* and the resulting edits must be placed in the region's recovered.edits folder.
* (recovered.edits files will be replayed on region startup)
*
* In case of Restore: the log can just be split in the recovered.edits folder.
* In case of Clone: each entry in the log must be modified to use the new region name.
* (region names are encoded with: tableName, startKey, regionIdTimeStamp)
*
* We can't use the normal split code, because the HLogKey contains the
* table name and the region name, and in case of "clone from snapshot"
* region name and table name will be different and must be replaced in
* the recovered.edits.
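*
* <p>A minimal usage sketch (the arguments are placeholders for the values
* prepared by the restore code):
* <pre>
*   SnapshotLogSplitter splitter = new SnapshotLogSplitter(conf, fs, tableDir,
*       snapshotTableName, regionsMap);
*   try {
*     splitter.splitLog(serverName, logfile);
*   } finally {
*     splitter.close();
*   }
* </pre>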
*/
@InterfaceAudience.Private
class SnapshotLogSplitter implements Closeable {
static final Log LOG = LogFactory.getLog(SnapshotLogSplitter.class);
private final class LogWriter implements Closeable {
private HLog.Writer writer;
private Path logFile;
private long seqId;
public LogWriter(final Configuration conf, final FileSystem fs,
final Path logDir, long seqId) throws IOException {
logFile = new Path(logDir, logFileName(seqId, true));
this.writer = HLogFactory.createWriter(fs, logFile, conf);
this.seqId = seqId;
}
public void close() throws IOException {
writer.close();
Path finalFile = new Path(logFile.getParent(), logFileName(seqId, false));
LOG.debug("LogWriter tmpLogFile=" + logFile + " -> logFile=" + finalFile);
fs.rename(logFile, finalFile);
}
public void append(final HLog.Entry entry) throws IOException {
writer.append(entry);
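// keep track of the highest sequence id seen; close() uses it to name the final file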
if (seqId < entry.getKey().getLogSeqNum()) {
seqId = entry.getKey().getLogSeqNum();
}
}
private String logFileName(long seqId, boolean temp) {
String fileName = String.format("%019d", seqId);
if (temp) fileName += HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
return fileName;
}
}
private final Map<byte[], LogWriter> regionLogWriters =
new TreeMap<byte[], LogWriter>(Bytes.BYTES_COMPARATOR);
private final Map<byte[], byte[]> regionsMap;
private final Configuration conf;
private final byte[] snapshotTableName;
private final byte[] tableName;
private final Path tableDir;
private final FileSystem fs;
/**
* @param snapshotTableName snapshot table name
* @param regionsMap maps original region names to the new ones.
*/
public SnapshotLogSplitter(final Configuration conf, final FileSystem fs,
final Path tableDir, final byte[] snapshotTableName,
final Map<byte[], byte[]> regionsMap) {
this.regionsMap = regionsMap;
this.snapshotTableName = snapshotTableName;
this.tableName = Bytes.toBytes(tableDir.getName());
this.tableDir = tableDir;
this.conf = conf;
this.fs = fs;
}
public void close() throws IOException {
for (LogWriter writer: regionLogWriters.values()) {
writer.close();
}
}
public void splitLog(final String serverName, final String logfile) throws IOException {
LOG.debug("Restore log=" + logfile + " server=" + serverName +
" for snapshotTable=" + Bytes.toString(snapshotTableName) +
" to table=" + Bytes.toString(tableName));
splitLog(new HLogLink(conf, serverName, logfile).getAvailablePath(fs));
}
public void splitRecoveredEdit(final Path editPath) throws IOException {
LOG.debug("Restore recover.edits=" + editPath +
" for snapshotTable=" + Bytes.toString(snapshotTableName) +
" to table=" + Bytes.toString(tableName));
splitLog(editPath);
}
/**
* Split the snapshot HLog reference into the regions' recovered.edits files.
*
* The HLogKey contains the table name and the region name,
* and both must be rewritten to the restored table and region names.
*
* @param logPath Snapshot HLog reference path
*/
public void splitLog(final Path logPath) throws IOException {
HLog.Reader log = HLogFactory.createReader(fs, logPath, conf);
try {
HLog.Entry entry;
LogWriter writer = null;
byte[] regionName = null;
byte[] newRegionName = null;
while ((entry = log.next()) != null) {
HLogKey key = entry.getKey();
// We're interested only in the snapshot table that we're restoring
if (!Bytes.equals(key.getTablename(), snapshotTableName)) continue;
// Writer for region.
if (!Bytes.equals(regionName, key.getEncodedRegionName())) {
regionName = key.getEncodedRegionName().clone();
// Get the new region name in case of clone, or use the original one
newRegionName = regionsMap.get(regionName);
if (newRegionName == null) newRegionName = regionName;
writer = getOrCreateWriter(newRegionName, key.getLogSeqNum());
LOG.debug("+ regionName=" + Bytes.toString(regionName));
}
// Rewrite the key with the new table/region name and append the entry
key = new HLogKey(newRegionName, tableName,
key.getLogSeqNum(), key.getWriteTime(), key.getClusterId());
writer.append(new HLog.Entry(key, entry.getEdit()));
}
} catch (IOException e) {
LOG.warn("Something wrong during the log split", e);
} finally {
log.close();
}
}
/**
* Create a LogWriter for the specified region if one does not exist yet.
*/
private LogWriter getOrCreateWriter(final byte[] regionName, long seqId) throws IOException {
LogWriter writer = regionLogWriters.get(regionName);
if (writer == null) {
Path regionDir = HRegion.getRegionDir(tableDir, Bytes.toString(regionName));
Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regionDir);
fs.mkdirs(dir);
writer = new LogWriter(conf, fs, dir, seqId);
regionLogWriters.put(regionName, writer);
}
return writer;
}
}

View File

@@ -0,0 +1,213 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Utility methods for interacting with the regions.
*/
@InterfaceAudience.Private
public abstract class ModifyRegionUtils {
private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
private ModifyRegionUtils() {
}
public interface RegionFillTask {
public void fillRegion(final HRegion region) throws IOException;
}
/**
* Create a new set of regions on the specified file-system.
* NOTE: you should add the regions to .META. after this operation.
*
* @param conf {@link Configuration}
* @param rootDir Root directory for HBase instance
* @param hTableDescriptor description of the table
* @param newRegions {@link HRegionInfo} that describes the regions to create
* @param catalogTracker the catalog tracker
* @throws IOException
*/
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final CatalogTracker catalogTracker) throws IOException {
return createRegions(conf, rootDir, hTableDescriptor, newRegions, catalogTracker, null);
}
/**
* Create a new set of regions on the specified file-system.
* NOTE: you should add the regions to .META. after this operation.
*
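* <p>A minimal usage sketch (the descriptor, region infos and fill task are
* assumed to be prepared by the caller, as the snapshot clone code does):
* <pre>
*   List regions = ModifyRegionUtils.createRegions(conf, rootDir, hTableDescriptor,
*       newRegions, catalogTracker, new ModifyRegionUtils.RegionFillTask() {
*         public void fillRegion(HRegion region) throws IOException {
*           // populate the newly created region here
*         }
*       });
*   MetaEditor.addRegionsToMeta(catalogTracker, regions);
* </pre>
*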
* @param conf {@link Configuration}
* @param rootDir Root directory for HBase instance
* @param hTableDescriptor description of the table
* @param newRegions {@link HRegionInfo} that describes the regions to create
* @param catalogTracker the catalog tracker
* @param task {@link RegionFillTask} custom code to populate region after creation
* @throws IOException
*/
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final CatalogTracker catalogTracker, final RegionFillTask task) throws IOException {
if (newRegions == null) return null;
int regionNumber = newRegions.length;
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
"RegionOpenAndInitThread-" + hTableDescriptor.getNameAsString(), regionNumber);
CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
regionOpenAndInitThreadPool);
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegionInfo>() {
public HRegionInfo call() throws IOException {
// 1. Create HRegion
HRegion region = HRegion.createHRegion(newRegion,
rootDir, conf, hTableDescriptor, null,
false, true);
try {
// 2. Custom user code to interact with the created region
if (task != null) {
task.fillRegion(region);
}
} finally {
// 3. Close the new region to flush to disk. Close log file too.
region.close();
}
return region.getRegionInfo();
}
});
}
try {
// 4. wait for all regions to finish creation
for (int i = 0; i < regionNumber; i++) {
Future<HRegionInfo> future = completionService.take();
HRegionInfo regionInfo = future.get();
regionInfos.add(regionInfo);
}
} catch (InterruptedException e) {
LOG.error("Caught " + e + " during region creation");
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
throw new IOException(e);
} finally {
regionOpenAndInitThreadPool.shutdownNow();
}
return regionInfos;
}
/*
* used by createRegions() to get the thread pool executor based on the
* "hbase.hregion.open.and.init.threads.max" property.
*/
static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
final String threadNamePrefix, int regionNumber) {
int maxThreads = Math.min(regionNumber, conf.getInt(
"hbase.hregion.open.and.init.threads.max", 10));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactory() {
private int count = 1;
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;
}
});
return regionOpenAndInitThreadPool;
}
/**
* Trigger immediate assignment of the regions in round-robin fashion
*
* @param assignmentManager the cluster {@link AssignmentManager}
* @param regions the regions to assign
*/
public static void assignRegions(final AssignmentManager assignmentManager,
final List<HRegionInfo> regions) throws IOException {
try {
assignmentManager.getRegionStates().createRegionStates(regions);
assignmentManager.assign(regions);
} catch (InterruptedException e) {
LOG.error("Caught " + e + " during round-robin assignment");
throw new InterruptedIOException(e.getMessage());
}
}
/**
* Remove the specified regions from the file-system and .META.
* (the regions must be offline).
*
* @param fs {@link FileSystem} on which to delete the region directory
* @param catalogTracker the catalog tracker
* @param regions list of {@link HRegionInfo} to delete.
*/
public static void deleteRegions(final FileSystem fs, final CatalogTracker catalogTracker,
final List<HRegionInfo> regions) throws IOException {
if (regions != null && regions.size() > 0) {
for (HRegionInfo hri: regions) {
deleteRegion(fs, catalogTracker, hri);
}
}
}
/**
* Remove the region from the file-system and .META.
* (the region must be offline).
*
* @param fs {@link FileSystem} on which to delete the region directory
* @param catalogTracker the catalog tracker
* @param regionInfo {@link HRegionInfo} to delete.
*/
public static void deleteRegion(final FileSystem fs, final CatalogTracker catalogTracker,
final HRegionInfo regionInfo) throws IOException {
// Remove region from .META.
MetaEditor.deleteRegion(catalogTracker, regionInfo);
// "Delete" region from FS
HFileArchiver.archiveRegion(fs, regionInfo);
}
}

View File

@@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.junit.*;
import org.junit.experimental.categories.Category;
/**
* Test clone/restore snapshots from the client
*/
@Category(LargeTests.class)
public class TestRestoreSnapshotFromClient {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final byte[] FAMILY = Bytes.toBytes("cf");
private byte[] snapshotName0;
private byte[] snapshotName1;
private byte[] snapshotName2;
private int snapshot0Rows;
private int snapshot1Rows;
private byte[] tableName;
private HBaseAdmin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
TEST_UTIL.getConfiguration().setBoolean(
"hbase.master.enabletable.roundrobin", true);
TEST_UTIL.startMiniCluster(3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Initialize the tests with a table filled with some data
* and two snapshots (snapshotName0, snapshotName1) taken in different states.
* The table name, snapshot names and the row count of each snapshot are initialized.
*/
@Before
public void setup() throws Exception {
this.admin = TEST_UTIL.getHBaseAdmin();
long tid = System.currentTimeMillis();
tableName = Bytes.toBytes("testtb-" + tid);
snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
// create the table and load the initial data
createTable(tableName, FAMILY);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
loadData(table, 500, FAMILY);
snapshot0Rows = TEST_UTIL.countRows(table);
admin.disableTable(tableName);
// take a snapshot
admin.snapshot(snapshotName0, tableName);
// enable table and insert more data
admin.enableTable(tableName);
loadData(table, 500, FAMILY);
snapshot1Rows = TEST_UTIL.countRows(table);
admin.disableTable(tableName);
// take a snapshot of the updated table
admin.snapshot(snapshotName1, tableName);
// re-enable table
admin.enableTable(tableName);
}
@After
public void tearDown() throws Exception {
admin.disableTable(tableName);
admin.deleteTable(tableName);
admin.deleteSnapshot(snapshotName0);
admin.deleteSnapshot(snapshotName1);
}
@Test
public void testRestoreSnapshot() throws IOException {
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));
// Restore from snapshot-0
admin.disableTable(tableName);
admin.restoreSnapshot(snapshotName0);
admin.enableTable(tableName);
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
// Restore from snapshot-1
admin.disableTable(tableName);
admin.restoreSnapshot(snapshotName1);
admin.enableTable(tableName);
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));
}
@Test(expected=SnapshotDoesNotExistException.class)
public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
String snapshotName = "random-snapshot-" + System.currentTimeMillis();
String tableName = "random-table-" + System.currentTimeMillis();
admin.cloneSnapshot(snapshotName, tableName);
}
@Test
public void testCloneSnapshot() throws IOException, InterruptedException {
byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
}
private void testCloneSnapshot(final byte[] tableName, final byte[] snapshotName,
int snapshotRows) throws IOException, InterruptedException {
// create a new table from snapshot
admin.cloneSnapshot(snapshotName, tableName);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
assertEquals(snapshotRows, TEST_UTIL.countRows(table));
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
@Test
public void testRestoreSnapshotOfCloned() throws IOException, InterruptedException {
byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
admin.cloneSnapshot(snapshotName0, clonedTableName);
HTable table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
admin.disableTable(clonedTableName);
admin.snapshot(snapshotName2, clonedTableName);
admin.deleteTable(clonedTableName);
admin.cloneSnapshot(snapshotName2, clonedTableName);
table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
admin.disableTable(clonedTableName);
admin.deleteTable(clonedTableName);
}
// ==========================================================================
// Helpers
// ==========================================================================
private void createTable(final byte[] tableName, final byte[]... families) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for (byte[] family: families) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
htd.addFamily(hcd);
}
byte[][] splitKeys = new byte[16][];
byte[] hex = Bytes.toBytes("0123456789abcdef");
for (int i = 0; i < 16; ++i) {
splitKeys[i] = new byte[] { hex[i] };
}
admin.createTable(htd, splitKeys);
}
public void loadData(final HTable table, int rows, byte[]... families) throws IOException {
byte[] qualifier = Bytes.toBytes("q");
table.setAutoFlush(false);
while (rows-- > 0) {
byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
Put put = new Put(key);
put.setWriteToWAL(false);
for (byte[] family: families) {
put.add(family, qualifier, value);
}
table.put(put);
}
table.flushCommits();
}
}

View File

@@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.snapshot.restore;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.*;
import org.junit.experimental.categories.Category;
/**
* Test snapshot log splitter
*/
@Category(SmallTests.class)
public class TestSnapshotLogSplitter {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private byte[] TEST_QUALIFIER = Bytes.toBytes("q");
private byte[] TEST_FAMILY = Bytes.toBytes("f");
private Configuration conf;
private FileSystem fs;
private Path logFile;
@Before
public void setup() throws Exception {
conf = TEST_UTIL.getConfiguration();
fs = FileSystem.get(conf);
logFile = new Path(TEST_UTIL.getDataTestDir(), "test.log");
writeTestLog(logFile);
}
@After
public void tearDown() throws Exception {
fs.delete(logFile, false);
}
@Test
public void testSplitLogs() throws IOException {
Map<byte[], byte[]> regionsMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
splitTestLogs(getTableName(5), regionsMap);
}
@Test
public void testSplitLogsOnDifferentTable() throws IOException {
byte[] tableName = getTableName(1);
Map<byte[], byte[]> regionsMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
for (int j = 0; j < 10; ++j) {
byte[] regionName = getRegionName(tableName, j);
byte[] newRegionName = getNewRegionName(tableName, j);
regionsMap.put(regionName, newRegionName);
}
splitTestLogs(tableName, regionsMap);
}
/*
* Split and verify test logs for the specified table
*/
private void splitTestLogs(final byte[] tableName, final Map<byte[], byte[]> regionsMap)
throws IOException {
Path tableDir = new Path(TEST_UTIL.getDataTestDir(), Bytes.toString(tableName));
SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
tableName, regionsMap);
try {
logSplitter.splitLog(logFile);
} finally {
logSplitter.close();
}
verifyRecoverEdits(tableDir, tableName, regionsMap);
}
/*
* Verify that every log in the table directory contains only the specified table and regions.
*/
private void verifyRecoverEdits(final Path tableDir, final byte[] tableName,
final Map<byte[], byte[]> regionsMap) throws IOException {
for (FileStatus regionStatus: FSUtils.listStatus(fs, tableDir)) {
assertTrue(regionStatus.getPath().getName().startsWith(Bytes.toString(tableName)));
Path regionEdits = HLogUtil.getRegionDirRecoveredEditsDir(regionStatus.getPath());
byte[] regionName = Bytes.toBytes(regionStatus.getPath().getName());
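// after splitting, only the new (mapped) region names should appear, never the original ones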
assertFalse(regionsMap.containsKey(regionName));
for (FileStatus logStatus: FSUtils.listStatus(fs, regionEdits)) {
HLog.Reader reader = HLogFactory.createReader(fs, logStatus.getPath(), conf);
try {
HLog.Entry entry;
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
assertArrayEquals(tableName, key.getTablename());
assertArrayEquals(regionName, key.getEncodedRegionName());
}
} finally {
reader.close();
}
}
}
}
/*
* Write some entries in the log file.
* 7 different tables with name "testtb-%d"
* 10 regions per table with name "tableName-region-%d"
* 50 entries with row key "row-%d"
*/
private void writeTestLog(final Path logFile) throws IOException {
fs.mkdirs(logFile.getParent());
HLog.Writer writer = HLogFactory.createWriter(fs, logFile, conf);
try {
for (int i = 0; i < 7; ++i) {
byte[] tableName = getTableName(i);
for (int j = 0; j < 10; ++j) {
byte[] regionName = getRegionName(tableName, j);
for (int k = 0; k < 50; ++k) {
byte[] rowkey = Bytes.toBytes("row-" + k);
HLogKey key = new HLogKey(regionName, tableName, (long)k,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowkey, TEST_FAMILY, TEST_QUALIFIER, rowkey));
writer.append(new HLog.Entry(key, edit));
}
}
}
} finally {
writer.close();
}
}
private byte[] getTableName(int tableId) {
return Bytes.toBytes("testtb-" + tableId);
}
private byte[] getRegionName(final byte[] tableName, int regionId) {
return Bytes.toBytes(Bytes.toString(tableName) + "-region-" + regionId);
}
private byte[] getNewRegionName(final byte[] tableName, int regionId) {
return Bytes.toBytes(Bytes.toString(tableName) + "-new-region-" + regionId);
}
}