HBASE-11185 Parallelize Snapshot operations
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1596168 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f980d27472
commit
0ea99974fe
|
@ -21,6 +21,7 @@ import java.io.IOException;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.master.MasterServices;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
|
@ -88,8 +92,17 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
|
|||
+ ClientSnapshotDescriptionUtils.toString(snapshot);
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
for (HRegionInfo regionInfo: regions) {
|
||||
snapshotDisabledRegion(regionInfo);
|
||||
|
||||
ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot");
|
||||
try {
|
||||
ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
|
||||
@Override
|
||||
public void editRegion(final HRegionInfo regionInfo) throws IOException {
|
||||
snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// make sure we capture the exception to propagate back to the client later
|
||||
|
|
|
@ -129,7 +129,7 @@ public final class MasterSnapshotVerifier {
|
|||
|
||||
/**
|
||||
* Check that the table descriptor for the snapshot is a valid table descriptor
|
||||
* @param snapshotDir snapshot directory to check
|
||||
* @param manifest snapshot manifest to inspect
|
||||
*/
|
||||
private void verifyTableInfo(final SnapshotManifest manifest) throws IOException {
|
||||
HTableDescriptor htd = manifest.getTableDescriptor();
|
||||
|
@ -145,7 +145,7 @@ public final class MasterSnapshotVerifier {
|
|||
|
||||
/**
|
||||
* Check that all the regions in the snapshot are valid, and accounted for.
|
||||
* @param snapshotDir snapshot directory to check
|
||||
* @param manifest snapshot manifest to inspect
|
||||
* @throws IOException if we can't reach hbase:meta or read the files from the FS
|
||||
*/
|
||||
private void verifyRegions(final SnapshotManifest manifest) throws IOException {
|
||||
|
@ -167,6 +167,7 @@ public final class MasterSnapshotVerifier {
|
|||
LOG.error(errorMsg);
|
||||
}
|
||||
|
||||
// Verify HRegionInfo
|
||||
for (HRegionInfo region : regions) {
|
||||
SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName());
|
||||
if (regionManifest == null) {
|
||||
|
@ -177,20 +178,23 @@ public final class MasterSnapshotVerifier {
|
|||
continue;
|
||||
}
|
||||
|
||||
verifyRegion(fs, manifest.getSnapshotDir(), region, regionManifest);
|
||||
verifyRegionInfo(region, regionManifest);
|
||||
}
|
||||
|
||||
if (!errorMsg.isEmpty()) {
|
||||
throw new CorruptedSnapshotException(errorMsg);
|
||||
}
|
||||
|
||||
// Verify Snapshot HFiles
|
||||
SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), fs, manifest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the region (regioninfo, hfiles) are valid
|
||||
* @param fs the FileSystem instance
|
||||
* @param snapshotDir snapshot directory to check
|
||||
* Verify that the regionInfo is valid
|
||||
* @param region the region to check
|
||||
* @param manifest snapshot manifest to inspect
|
||||
*/
|
||||
private void verifyRegion(final FileSystem fs, final Path snapshotDir, final HRegionInfo region,
|
||||
private void verifyRegionInfo(final HRegionInfo region,
|
||||
final SnapshotRegionManifest manifest) throws IOException {
|
||||
HRegionInfo manifestRegionInfo = HRegionInfo.convert(manifest.getRegionInfo());
|
||||
if (!region.equals(manifestRegionInfo)) {
|
||||
|
@ -198,16 +202,5 @@ public final class MasterSnapshotVerifier {
|
|||
"doesn't match expected region:" + region;
|
||||
throw new CorruptedSnapshotException(msg, snapshot);
|
||||
}
|
||||
|
||||
// make sure we have all the expected store files
|
||||
SnapshotReferenceUtil.visitRegionStoreFiles(manifest,
|
||||
new SnapshotReferenceUtil.StoreFileVisitor() {
|
||||
@Override
|
||||
public void storeFile(final HRegionInfo regionInfo, final String family,
|
||||
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
SnapshotReferenceUtil.verifyStoreFile(services.getConfiguration(), fs, snapshotDir,
|
||||
snapshot, region, family, storeFile);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -223,6 +223,20 @@ public class StoreFileInfo {
|
|||
*/
|
||||
public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
|
||||
throws IOException {
|
||||
FileStatus status = getReferencedFileStatus(fs);
|
||||
if (this.reference != null) {
|
||||
return computeRefFileHDFSBlockDistribution(fs, reference, status);
|
||||
} else {
|
||||
return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link FileStatus} of the file referenced by this StoreFileInfo
|
||||
* @param fs The current file system to use.
|
||||
* @return The {@link FileStatus} of the file referenced by this StoreFileInfo
|
||||
*/
|
||||
public FileStatus getReferencedFileStatus(final FileSystem fs) throws IOException {
|
||||
FileStatus status;
|
||||
if (this.reference != null) {
|
||||
if (this.link != null) {
|
||||
|
@ -233,7 +247,6 @@ public class StoreFileInfo {
|
|||
Path referencePath = getReferredToFile(this.getPath());
|
||||
status = fs.getFileStatus(referencePath);
|
||||
}
|
||||
return computeRefFileHDFSBlockDistribution(fs, reference, status);
|
||||
} else {
|
||||
if (this.link != null) {
|
||||
// HFileLink
|
||||
|
@ -241,8 +254,8 @@ public class StoreFileInfo {
|
|||
} else {
|
||||
status = this.fileStatus;
|
||||
}
|
||||
return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
/** @return The {@link Path} of the file */
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
|
@ -30,6 +31,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -153,6 +155,15 @@ public class RestoreSnapshotHelper {
|
|||
* @return the set of regions touched by the restore operation
|
||||
*/
|
||||
public RestoreMetaChanges restoreHdfsRegions() throws IOException {
|
||||
ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot");
|
||||
try {
|
||||
return restoreHdfsRegions(exec);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
|
||||
LOG.debug("starting restore");
|
||||
|
||||
Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
|
||||
|
@ -187,13 +198,13 @@ public class RestoreSnapshotHelper {
|
|||
// Restore regions using the snapshot data
|
||||
monitor.rethrowException();
|
||||
status.setStatus("Restoring table regions...");
|
||||
restoreHdfsRegions(regionManifests, metaChanges.getRegionsToRestore());
|
||||
restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore());
|
||||
status.setStatus("Finished restoring all table regions.");
|
||||
|
||||
// Remove regions from the current table
|
||||
monitor.rethrowException();
|
||||
status.setStatus("Starting to delete excess regions from table");
|
||||
removeHdfsRegions(metaChanges.getRegionsToRemove());
|
||||
removeHdfsRegions(exec, metaChanges.getRegionsToRemove());
|
||||
status.setStatus("Finished deleting excess regions from table.");
|
||||
}
|
||||
|
||||
|
@ -210,7 +221,7 @@ public class RestoreSnapshotHelper {
|
|||
// Create new regions cloning from the snapshot
|
||||
monitor.rethrowException();
|
||||
status.setStatus("Cloning regions...");
|
||||
HRegionInfo[] clonedRegions = cloneHdfsRegions(regionManifests, regionsToAdd);
|
||||
HRegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd);
|
||||
metaChanges.setNewRegions(clonedRegions);
|
||||
status.setStatus("Finished cloning regions.");
|
||||
}
|
||||
|
@ -345,23 +356,30 @@ public class RestoreSnapshotHelper {
|
|||
/**
|
||||
* Remove specified regions from the file-system, using the archiver.
|
||||
*/
|
||||
private void removeHdfsRegions(final List<HRegionInfo> regions) throws IOException {
|
||||
if (regions != null && regions.size() > 0) {
|
||||
for (HRegionInfo hri: regions) {
|
||||
private void removeHdfsRegions(final ThreadPoolExecutor exec, final List<HRegionInfo> regions)
|
||||
throws IOException {
|
||||
if (regions == null || regions.size() == 0) return;
|
||||
ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
|
||||
@Override
|
||||
public void editRegion(final HRegionInfo hri) throws IOException {
|
||||
HFileArchiver.archiveRegion(conf, fs, hri);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore specified regions by restoring content to the snapshot state.
|
||||
*/
|
||||
private void restoreHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
|
||||
private void restoreHdfsRegions(final ThreadPoolExecutor exec,
|
||||
final Map<String, SnapshotRegionManifest> regionManifests,
|
||||
final List<HRegionInfo> regions) throws IOException {
|
||||
if (regions == null || regions.size() == 0) return;
|
||||
for (HRegionInfo hri: regions) {
|
||||
restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
|
||||
}
|
||||
ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
|
||||
@Override
|
||||
public void editRegion(final HRegionInfo hri) throws IOException {
|
||||
restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Map<String, List<SnapshotRegionManifest.StoreFile>> getRegionHFileReferences(
|
||||
|
@ -465,7 +483,8 @@ public class RestoreSnapshotHelper {
|
|||
* Clone specified regions. For each region create a new region
|
||||
* and create a HFileLink for each hfile.
|
||||
*/
|
||||
private HRegionInfo[] cloneHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
|
||||
private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec,
|
||||
final Map<String, SnapshotRegionManifest> regionManifests,
|
||||
final List<HRegionInfo> regions) throws IOException {
|
||||
if (regions == null || regions.size() == 0) return null;
|
||||
|
||||
|
@ -490,7 +509,7 @@ public class RestoreSnapshotHelper {
|
|||
}
|
||||
|
||||
// create the regions on disk
|
||||
ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
|
||||
ModifyRegionUtils.createRegions(exec, conf, rootDir, tableDir,
|
||||
tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
|
||||
@Override
|
||||
public void fillRegion(final HRegion region) throws IOException {
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -99,14 +101,14 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
private int hfileArchiveCount = 0;
|
||||
private int hfilesMissing = 0;
|
||||
private int hfilesCount = 0;
|
||||
private int logsMissing = 0;
|
||||
private int logsCount = 0;
|
||||
private long hfileArchiveSize = 0;
|
||||
private long hfileSize = 0;
|
||||
private long logSize = 0;
|
||||
private AtomicInteger hfileArchiveCount = new AtomicInteger();
|
||||
private AtomicInteger hfilesMissing = new AtomicInteger();
|
||||
private AtomicInteger hfilesCount = new AtomicInteger();
|
||||
private AtomicInteger logsMissing = new AtomicInteger();
|
||||
private AtomicInteger logsCount = new AtomicInteger();
|
||||
private AtomicLong hfileArchiveSize = new AtomicLong();
|
||||
private AtomicLong hfileSize = new AtomicLong();
|
||||
private AtomicLong logSize = new AtomicLong();
|
||||
|
||||
private final SnapshotDescription snapshot;
|
||||
private final TableName snapshotTable;
|
||||
|
@ -128,57 +130,57 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
|
||||
/** @return true if the snapshot is corrupted */
|
||||
public boolean isSnapshotCorrupted() {
|
||||
return hfilesMissing > 0 || logsMissing > 0;
|
||||
return hfilesMissing.get() > 0 || logsMissing.get() > 0;
|
||||
}
|
||||
|
||||
/** @return the number of available store files */
|
||||
public int getStoreFilesCount() {
|
||||
return hfilesCount + hfileArchiveCount;
|
||||
return hfilesCount.get() + hfileArchiveCount.get();
|
||||
}
|
||||
|
||||
/** @return the number of available store files in the archive */
|
||||
public int getArchivedStoreFilesCount() {
|
||||
return hfileArchiveCount;
|
||||
return hfileArchiveCount.get();
|
||||
}
|
||||
|
||||
/** @return the number of available log files */
|
||||
public int getLogsCount() {
|
||||
return logsCount;
|
||||
return logsCount.get();
|
||||
}
|
||||
|
||||
/** @return the number of missing store files */
|
||||
public int getMissingStoreFilesCount() {
|
||||
return hfilesMissing;
|
||||
return hfilesMissing.get();
|
||||
}
|
||||
|
||||
/** @return the number of missing log files */
|
||||
public int getMissingLogsCount() {
|
||||
return logsMissing;
|
||||
return logsMissing.get();
|
||||
}
|
||||
|
||||
/** @return the total size of the store files referenced by the snapshot */
|
||||
public long getStoreFilesSize() {
|
||||
return hfileSize + hfileArchiveSize;
|
||||
return hfileSize.get() + hfileArchiveSize.get();
|
||||
}
|
||||
|
||||
/** @return the total size of the store files shared */
|
||||
public long getSharedStoreFilesSize() {
|
||||
return hfileSize;
|
||||
return hfileSize.get();
|
||||
}
|
||||
|
||||
/** @return the total size of the store files in the archive */
|
||||
public long getArchivedStoreFileSize() {
|
||||
return hfileArchiveSize;
|
||||
return hfileArchiveSize.get();
|
||||
}
|
||||
|
||||
/** @return the percentage of the shared store files */
|
||||
public float getSharedStoreFilePercentage() {
|
||||
return ((float)hfileSize / (hfileSize + hfileArchiveSize)) * 100;
|
||||
return ((float)hfileSize.get() / (hfileSize.get() + hfileArchiveSize.get())) * 100;
|
||||
}
|
||||
|
||||
/** @return the total log size */
|
||||
public long getLogsSize() {
|
||||
return logSize;
|
||||
return logSize.get();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -197,15 +199,15 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
try {
|
||||
if ((inArchive = fs.exists(link.getArchivePath()))) {
|
||||
size = fs.getFileStatus(link.getArchivePath()).getLen();
|
||||
hfileArchiveSize += size;
|
||||
hfileArchiveCount++;
|
||||
hfileArchiveSize.addAndGet(size);
|
||||
hfileArchiveCount.incrementAndGet();
|
||||
} else {
|
||||
size = link.getFileStatus(fs).getLen();
|
||||
hfileSize += size;
|
||||
hfilesCount++;
|
||||
hfileSize.addAndGet(size);
|
||||
hfilesCount.incrementAndGet();
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
hfilesMissing++;
|
||||
hfilesMissing.incrementAndGet();
|
||||
}
|
||||
return new FileInfo(inArchive, size);
|
||||
}
|
||||
|
@ -221,10 +223,10 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
long size = -1;
|
||||
try {
|
||||
size = logLink.getFileStatus(fs).getLen();
|
||||
logSize += size;
|
||||
logsCount++;
|
||||
logSize.addAndGet(size);
|
||||
logsCount.incrementAndGet();
|
||||
} catch (FileNotFoundException e) {
|
||||
logsMissing++;
|
||||
logsMissing.incrementAndGet();
|
||||
}
|
||||
return new FileInfo(false, size);
|
||||
}
|
||||
|
@ -368,8 +370,8 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
final SnapshotDescription snapshotDesc = snapshotManifest.getSnapshotDescription();
|
||||
final String table = snapshotDesc.getTable();
|
||||
final SnapshotStats stats = new SnapshotStats(this.getConf(), this.fs, snapshotDesc);
|
||||
SnapshotReferenceUtil.visitReferencedFiles(getConf(), fs,
|
||||
snapshotManifest.getSnapshotDir(), snapshotDesc, new SnapshotReferenceUtil.SnapshotVisitor() {
|
||||
SnapshotReferenceUtil.concurrentVisitReferencedFiles(getConf(), fs, snapshotManifest,
|
||||
new SnapshotReferenceUtil.SnapshotVisitor() {
|
||||
@Override
|
||||
public void storeFile(final HRegionInfo regionInfo, final String family,
|
||||
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
|
@ -448,8 +450,9 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
|
||||
final SnapshotStats stats = new SnapshotStats(conf, fs, snapshot);
|
||||
SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshot,
|
||||
SnapshotReferenceUtil.concurrentVisitReferencedFiles(conf, fs, manifest,
|
||||
new SnapshotReferenceUtil.SnapshotVisitor() {
|
||||
@Override
|
||||
public void storeFile(final HRegionInfo regionInfo, final String family,
|
||||
|
|
|
@ -442,8 +442,8 @@ public class SnapshotManifest {
|
|||
return createExecutor(conf, name);
|
||||
}
|
||||
|
||||
static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
|
||||
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 4);
|
||||
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
|
||||
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
|
||||
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
|
||||
Threads.getNamedThreadFactory(name));
|
||||
}
|
||||
|
|
|
@ -103,14 +103,15 @@ public class SnapshotManifestV2 {
|
|||
}
|
||||
|
||||
public void storeFile(final SnapshotRegionManifest.Builder region,
|
||||
final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile) {
|
||||
final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
|
||||
throws IOException {
|
||||
SnapshotRegionManifest.StoreFile.Builder sfManifest =
|
||||
SnapshotRegionManifest.StoreFile.newBuilder();
|
||||
sfManifest.setName(storeFile.getPath().getName());
|
||||
if (storeFile.isReference()) {
|
||||
sfManifest.setReference(storeFile.getReference().convert());
|
||||
}
|
||||
sfManifest.setFileSize(storeFile.getFileStatus().getLen());
|
||||
sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
|
||||
family.addStoreFiles(sfManifest.build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.snapshot;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.HashSet;
|
||||
|
@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -107,7 +109,7 @@ public final class SnapshotReferenceUtil {
|
|||
visitLogFiles(fs, snapshotDir, visitor);
|
||||
}
|
||||
|
||||
/**
|
||||
/**©
|
||||
* Iterate over the snapshot store files
|
||||
*
|
||||
* @param conf The current {@link Configuration} instance.
|
||||
|
@ -117,7 +119,7 @@ public final class SnapshotReferenceUtil {
|
|||
* @param visitor callback object to get the store files
|
||||
* @throws IOException if an error occurred while scanning the directory
|
||||
*/
|
||||
public static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
|
||||
static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
|
||||
final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
|
||||
throws IOException {
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
|
||||
|
@ -139,7 +141,7 @@ public final class SnapshotReferenceUtil {
|
|||
* @param visitor callback object to get the store files
|
||||
* @throws IOException if an error occurred while scanning the directory
|
||||
*/
|
||||
public static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
|
||||
static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
|
||||
final StoreFileVisitor visitor) throws IOException {
|
||||
HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
|
||||
for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
|
||||
|
@ -192,6 +194,19 @@ public final class SnapshotReferenceUtil {
|
|||
final SnapshotManifest manifest) throws IOException {
|
||||
final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
|
||||
final Path snapshotDir = manifest.getSnapshotDir();
|
||||
concurrentVisitReferencedFiles(conf, fs, manifest, new StoreFileVisitor() {
|
||||
@Override
|
||||
public void storeFile(final HRegionInfo regionInfo, final String family,
|
||||
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
|
||||
final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException {
|
||||
final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
|
||||
final Path snapshotDir = manifest.getSnapshotDir();
|
||||
|
||||
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
|
||||
if (regionManifests == null || regionManifests.size() == 0) {
|
||||
|
@ -207,13 +222,7 @@ public final class SnapshotReferenceUtil {
|
|||
completionService.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
visitRegionStoreFiles(regionManifest, new StoreFileVisitor() {
|
||||
@Override
|
||||
public void storeFile(final HRegionInfo regionInfo, final String family,
|
||||
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
|
||||
}
|
||||
});
|
||||
visitRegionStoreFiles(regionManifest, visitor);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -251,19 +260,28 @@ public final class SnapshotReferenceUtil {
|
|||
* @throws CorruptedSnapshotException if the snapshot is corrupted
|
||||
* @throws IOException if an error occurred while scanning the directory
|
||||
*/
|
||||
public static void verifyStoreFile(final Configuration conf, final FileSystem fs,
|
||||
private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
|
||||
final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
|
||||
final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
|
||||
TableName table = TableName.valueOf(snapshot.getTable());
|
||||
String fileName = storeFile.getName();
|
||||
|
||||
Path refPath = null;
|
||||
if (StoreFileInfo.isReference(fileName)) {
|
||||
// If is a reference file check if the parent file is present in the snapshot
|
||||
Path snapshotHFilePath = new Path(new Path(
|
||||
new Path(snapshotDir, regionInfo.getEncodedName()), family), fileName);
|
||||
refPath = StoreFileInfo.getReferredToFile(snapshotHFilePath);
|
||||
if (!fs.exists(refPath)) {
|
||||
throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName, snapshot);
|
||||
refPath = new Path(new Path(regionInfo.getEncodedName(), family), fileName);
|
||||
refPath = StoreFileInfo.getReferredToFile(refPath);
|
||||
String refRegion = refPath.getParent().getParent().getName();
|
||||
refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
|
||||
if (!new HFileLink(conf, refPath).exists(fs)) {
|
||||
throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName +
|
||||
" path=" + refPath, snapshot);
|
||||
}
|
||||
|
||||
if (storeFile.hasReference()) {
|
||||
// We don't really need to look for the file on-disk
|
||||
// we already have the Reference information embedded here.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -274,15 +292,25 @@ public final class SnapshotReferenceUtil {
|
|||
linkPath = new Path(family, fileName);
|
||||
} else {
|
||||
linkPath = new Path(family, HFileLink.createHFileLinkName(
|
||||
TableName.valueOf(snapshot.getTable()), regionInfo.getEncodedName(), fileName));
|
||||
table, regionInfo.getEncodedName(), fileName));
|
||||
}
|
||||
|
||||
// check if the linked file exists (in the archive, or in the table dir)
|
||||
HFileLink link = new HFileLink(conf, linkPath);
|
||||
if (!link.exists(fs)) {
|
||||
throw new CorruptedSnapshotException("Can't find hfile: " + fileName
|
||||
+ " in the real (" + link.getOriginPath() + ") or archive (" + link.getArchivePath()
|
||||
+ ") directory for the primary table.", snapshot);
|
||||
try {
|
||||
FileStatus fstat = link.getFileStatus(fs);
|
||||
if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
|
||||
String msg = "hfile: " + fileName + " size does not match with the expected one. " +
|
||||
" found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
|
||||
LOG.error(msg);
|
||||
throw new CorruptedSnapshotException(msg, snapshot);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
String msg = "Can't find hfile: " + fileName + " in the real (" +
|
||||
link.getOriginPath() + ") or archive (" + link.getArchivePath()
|
||||
+ ") directory for the primary table.";
|
||||
LOG.error(msg);
|
||||
throw new CorruptedSnapshotException(msg, snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
|
|||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
|
@ -56,6 +57,10 @@ public abstract class ModifyRegionUtils {
|
|||
void fillRegion(final HRegion region) throws IOException;
|
||||
}
|
||||
|
||||
public interface RegionEditTask {
|
||||
void editRegion(final HRegionInfo region) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new set of regions on the specified file-system.
|
||||
* NOTE: that you should add the regions to hbase:meta after this operation.
|
||||
|
@ -107,10 +112,36 @@ public abstract class ModifyRegionUtils {
|
|||
final RegionFillTask task) throws IOException {
|
||||
if (newRegions == null) return null;
|
||||
int regionNumber = newRegions.length;
|
||||
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
|
||||
ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
|
||||
"RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
|
||||
CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
|
||||
regionOpenAndInitThreadPool);
|
||||
try {
|
||||
return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
|
||||
} finally {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new set of regions on the specified file-system.
|
||||
* NOTE: that you should add the regions to hbase:meta after this operation.
|
||||
*
|
||||
* @param exec Thread Pool Executor
|
||||
* @param conf {@link Configuration}
|
||||
* @param rootDir Root directory for HBase instance
|
||||
* @param tableDir table directory
|
||||
* @param hTableDescriptor description of the table
|
||||
* @param newRegions {@link HRegionInfo} that describes the regions to create
|
||||
* @param task {@link RegionFillTask} custom code to populate region after creation
|
||||
* @throws IOException
|
||||
*/
|
||||
public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
|
||||
final Configuration conf, final Path rootDir, final Path tableDir,
|
||||
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
|
||||
final RegionFillTask task) throws IOException {
|
||||
if (newRegions == null) return null;
|
||||
int regionNumber = newRegions.length;
|
||||
CompletionService<HRegionInfo> completionService =
|
||||
new ExecutorCompletionService<HRegionInfo>(exec);
|
||||
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
|
||||
for (final HRegionInfo newRegion : newRegions) {
|
||||
completionService.submit(new Callable<HRegionInfo>() {
|
||||
|
@ -132,8 +163,6 @@ public abstract class ModifyRegionUtils {
|
|||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
regionOpenAndInitThreadPool.shutdownNow();
|
||||
}
|
||||
return regionInfos;
|
||||
}
|
||||
|
@ -167,6 +196,41 @@ public abstract class ModifyRegionUtils {
|
|||
return region.getRegionInfo();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the task on the specified set of regions.
|
||||
*
|
||||
* @param exec Thread Pool Executor
|
||||
* @param regions {@link HRegionInfo} that describes the regions to edit
|
||||
* @param task {@link RegionFillTask} custom code to edit the region
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void editRegions(final ThreadPoolExecutor exec,
|
||||
final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
|
||||
final ExecutorCompletionService<Void> completionService =
|
||||
new ExecutorCompletionService<Void>(exec);
|
||||
for (final HRegionInfo hri: regions) {
|
||||
completionService.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
task.editRegion(hri);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
for (HRegionInfo hri: regions) {
|
||||
completionService.take().get();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
IOException ex = new IOException();
|
||||
ex.initCause(e.getCause());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* used by createRegions() to get the thread pool executor based on the
|
||||
* "hbase.hregion.open.and.init.threads.max" property.
|
||||
|
|
Loading…
Reference in New Issue