HBASE-23197 'IllegalArgumentException: Wrong FS' on edits replay when… (#740)
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
1fa9a8c62a
commit
5635752179
|
@ -38,9 +38,11 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||||
|
@ -295,8 +297,45 @@ public class HFileArchiver {
|
||||||
*/
|
*/
|
||||||
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
|
public static void archiveStoreFiles(Configuration conf, FileSystem fs, RegionInfo regionInfo,
|
||||||
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
|
Path tableDir, byte[] family, Collection<HStoreFile> compactedFiles)
|
||||||
throws IOException, FailedArchiveException {
|
throws IOException {
|
||||||
|
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
|
||||||
|
archive(fs, regionInfo, family, compactedFiles, storeArchiveDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Archive recovered edits using existing logic for archiving store files. This is currently only
|
||||||
|
* relevant when <b>hbase.region.archive.recovered.edits</b> is true, as recovered edits shouldn't
|
||||||
|
* be kept after replay. In theory, we could use very same method available for archiving
|
||||||
|
* store files, but supporting WAL dir and store files on different FileSystems added the need for
|
||||||
|
* extra validation of the passed FileSystem instance and the path where the archiving edits
|
||||||
|
* should be placed.
|
||||||
|
* @param conf {@link Configuration} to determine the archive directory.
|
||||||
|
* @param fs the filesystem used for storing WAL files.
|
||||||
|
* @param regionInfo {@link RegionInfo} a pseudo region representation for the archiving logic.
|
||||||
|
* @param family a pseudo familiy representation for the archiving logic.
|
||||||
|
* @param replayedEdits the recovered edits to be archived.
|
||||||
|
* @throws IOException if files can't be achived due to some internal error.
|
||||||
|
*/
|
||||||
|
public static void archiveRecoveredEdits(Configuration conf, FileSystem fs, RegionInfo regionInfo,
|
||||||
|
byte[] family, Collection<HStoreFile> replayedEdits)
|
||||||
|
throws IOException {
|
||||||
|
String workingDir = conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR));
|
||||||
|
//extra sanity checks for the right FS
|
||||||
|
Path path = new Path(workingDir);
|
||||||
|
if(path.isAbsoluteAndSchemeAuthorityNull()){
|
||||||
|
//no schema specified on wal dir value, so it's on same FS as StoreFiles
|
||||||
|
path = new Path(conf.get(HConstants.HBASE_DIR));
|
||||||
|
}
|
||||||
|
if(path.toUri().getScheme()!=null && !path.toUri().getScheme().equals(fs.getScheme())){
|
||||||
|
throw new IOException("Wrong file system! Should be " + path.toUri().getScheme() +
|
||||||
|
", but got " + fs.getScheme());
|
||||||
|
}
|
||||||
|
path = HFileArchiveUtil.getStoreArchivePathForRootDir(path, regionInfo, family);
|
||||||
|
archive(fs, regionInfo, family, replayedEdits, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void archive(FileSystem fs, RegionInfo regionInfo, byte[] family,
|
||||||
|
Collection<HStoreFile> compactedFiles, Path storeArchiveDir) throws IOException {
|
||||||
// sometimes in testing, we don't have rss, so we need to check for that
|
// sometimes in testing, we don't have rss, so we need to check for that
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
LOG.warn("Passed filesystem is null, so just deleting files without archiving for {}," +
|
LOG.warn("Passed filesystem is null, so just deleting files without archiving for {}," +
|
||||||
|
@ -314,9 +353,6 @@ public class HFileArchiver {
|
||||||
// build the archive path
|
// build the archive path
|
||||||
if (regionInfo == null || family == null) throw new IOException(
|
if (regionInfo == null || family == null) throw new IOException(
|
||||||
"Need to have a region and a family to archive from.");
|
"Need to have a region and a family to archive from.");
|
||||||
|
|
||||||
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
|
|
||||||
|
|
||||||
// make sure we don't archive if we can't and that the archive dir exists
|
// make sure we don't archive if we can't and that the archive dir exists
|
||||||
if (!fs.mkdirs(storeArchiveDir)) {
|
if (!fs.mkdirs(storeArchiveDir)) {
|
||||||
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
|
throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
|
||||||
|
|
|
@ -1945,8 +1945,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the WAL {@link HRegionFileSystem} used by this region */
|
/** @return the WAL {@link HRegionFileSystem} used by this region */
|
||||||
HRegionFileSystem getRegionWALFileSystem() throws IOException {
|
HRegionWALFileSystem getRegionWALFileSystem() throws IOException {
|
||||||
return new HRegionFileSystem(conf, getWalFileSystem(),
|
return new HRegionWALFileSystem(conf, getWalFileSystem(),
|
||||||
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
|
FSUtils.getWALTableDir(conf, htableDescriptor.getTableName()), fs.getRegionInfo());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4639,7 +4639,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
for (Path file : files) {
|
for (Path file : files) {
|
||||||
fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
|
fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
|
||||||
}
|
}
|
||||||
getRegionWALFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
|
getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
|
||||||
} else {
|
} else {
|
||||||
for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
|
for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
|
||||||
if (!walFS.delete(file, false)) {
|
if (!walFS.delete(file, false)) {
|
||||||
|
|
|
@ -80,10 +80,10 @@ public class HRegionFileSystem {
|
||||||
|
|
||||||
private final RegionInfo regionInfo;
|
private final RegionInfo regionInfo;
|
||||||
//regionInfo for interacting with FS (getting encodedName, etc)
|
//regionInfo for interacting with FS (getting encodedName, etc)
|
||||||
private final RegionInfo regionInfoForFs;
|
final RegionInfo regionInfoForFs;
|
||||||
private final Configuration conf;
|
final Configuration conf;
|
||||||
private final Path tableDir;
|
private final Path tableDir;
|
||||||
private final FileSystem fs;
|
final FileSystem fs;
|
||||||
private final Path regionDir;
|
private final Path regionDir;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Wrapper for the region FileSystem operations adding WAL specific operations
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class HRegionWALFileSystem extends HRegionFileSystem {
|
||||||
|
|
||||||
|
HRegionWALFileSystem(Configuration conf, FileSystem fs, Path tableDir, RegionInfo regionInfo) {
|
||||||
|
super(conf, fs, tableDir, regionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes and archives the specified store files from the specified family.
|
||||||
|
* @param familyName Family that contains the store filesMeta
|
||||||
|
* @param storeFiles set of store files to remove
|
||||||
|
* @throws IOException if the archiving fails
|
||||||
|
*/
|
||||||
|
public void archiveRecoveredEdits(String familyName, Collection<HStoreFile> storeFiles)
|
||||||
|
throws IOException {
|
||||||
|
HFileArchiver.archiveRecoveredEdits(this.conf, this.fs, this.regionInfoForFs,
|
||||||
|
Bytes.toBytes(familyName), storeFiles);
|
||||||
|
}
|
||||||
|
}
|
|
@ -85,6 +85,23 @@ public final class HFileArchiveUtil {
|
||||||
return HStore.getStoreHomedir(tableArchiveDir, region, family);
|
return HStore.getStoreHomedir(tableArchiveDir, region, family);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the archive directory under specified root dir. One scenario where this is useful is
|
||||||
|
* when WAL and root dir are configured under different file systems,
|
||||||
|
* i.e. root dir on S3 and WALs on HDFS.
|
||||||
|
* This is mostly useful for archiving recovered edits, when
|
||||||
|
* <b>hbase.region.archive.recovered.edits</b> is enabled.
|
||||||
|
* @param rootDir {@link Path} the root dir under which archive path should be created.
|
||||||
|
* @param region parent region information under which the store currently lives
|
||||||
|
* @param family name of the family in the store
|
||||||
|
* @return {@link Path} to the WAL FS directory to archive the given store
|
||||||
|
* or <tt>null</tt> if it should not be archived
|
||||||
|
*/
|
||||||
|
public static Path getStoreArchivePathForRootDir(Path rootDir, RegionInfo region, byte[] family) {
|
||||||
|
Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
|
||||||
|
return HStore.getStoreHomedir(tableArchiveDir, region, family);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the archive directory for a given region under the specified table
|
* Get the archive directory for a given region under the specified table
|
||||||
* @param tableName the table name. Cannot be null.
|
* @param tableName the table name. Cannot be null.
|
||||||
|
|
|
@ -20,10 +20,16 @@ package org.apache.hadoop.hbase.backup;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -39,15 +45,18 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
|
||||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||||
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
|
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
|
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
|
||||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||||
|
@ -62,6 +71,7 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -123,6 +133,107 @@ public class TestHFileArchiving {
|
||||||
POOL.shutdownNow();
|
POOL.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveStoreFilesDifferentFileSystemsWallWithSchemaPlainRoot() throws Exception {
|
||||||
|
String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
|
||||||
|
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
|
||||||
|
testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
|
||||||
|
HFileArchiver::archiveStoreFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveStoreFilesDifferentFileSystemsWallNullPlainRoot() throws Exception {
|
||||||
|
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
|
||||||
|
testArchiveStoreFilesDifferentFileSystems(null, baseDir,
|
||||||
|
HFileArchiver::archiveStoreFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveStoreFilesDifferentFileSystemsWallAndRootSame() throws Exception {
|
||||||
|
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
|
||||||
|
testArchiveStoreFilesDifferentFileSystems("/hbase/wals/", baseDir,
|
||||||
|
HFileArchiver::archiveStoreFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testArchiveStoreFilesDifferentFileSystems(String walDir, String expectedBase,
|
||||||
|
ArchivingFunction<Configuration, FileSystem, RegionInfo, Path, byte[],
|
||||||
|
Collection<HStoreFile>> archivingFunction) throws IOException {
|
||||||
|
FileSystem mockedFileSystem = mock(FileSystem.class);
|
||||||
|
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||||
|
if(walDir != null) {
|
||||||
|
conf.set(CommonFSUtils.HBASE_WAL_DIR, walDir);
|
||||||
|
}
|
||||||
|
Path filePath = new Path("/mockDir/wals/mockFile");
|
||||||
|
when(mockedFileSystem.getScheme()).thenReturn("mockFS");
|
||||||
|
when(mockedFileSystem.mkdirs(any())).thenReturn(true);
|
||||||
|
when(mockedFileSystem.exists(any())).thenReturn(true);
|
||||||
|
RegionInfo mockedRegion = mock(RegionInfo.class);
|
||||||
|
TableName tableName = TableName.valueOf("mockTable");
|
||||||
|
when(mockedRegion.getTable()).thenReturn(tableName);
|
||||||
|
when(mockedRegion.getEncodedName()).thenReturn("mocked-region-encoded-name");
|
||||||
|
Path tableDir = new Path("mockFS://mockDir/tabledir");
|
||||||
|
byte[] family = Bytes.toBytes("testfamily");
|
||||||
|
HStoreFile mockedFile = mock(HStoreFile.class);
|
||||||
|
List<HStoreFile> list = new ArrayList<>();
|
||||||
|
list.add(mockedFile);
|
||||||
|
when(mockedFile.getPath()).thenReturn(filePath);
|
||||||
|
when(mockedFileSystem.rename(any(),any())).thenReturn(true);
|
||||||
|
archivingFunction.apply(conf, mockedFileSystem, mockedRegion, tableDir, family, list);
|
||||||
|
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
|
||||||
|
verify(mockedFileSystem, times(2)).rename(pathCaptor.capture(), any());
|
||||||
|
String expectedDir = expectedBase +
|
||||||
|
"archive/data/default/mockTable/mocked-region-encoded-name/testfamily/mockFile";
|
||||||
|
assertTrue(pathCaptor.getAllValues().get(0).toString().equals(expectedDir));
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
private interface ArchivingFunction<Configuration, FS, Region, Dir, Family, Files> {
|
||||||
|
void apply(Configuration config, FS fs, Region region, Dir dir, Family family, Files files)
|
||||||
|
throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveRecoveredEditsWalDirNull() throws Exception {
|
||||||
|
testArchiveRecoveredEditsWalDirNullOrSame(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveRecoveredEditsWalDirSameFsStoreFiles() throws Exception {
|
||||||
|
testArchiveRecoveredEditsWalDirNullOrSame("/wal-dir");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testArchiveRecoveredEditsWalDirNullOrSame(String walDir) throws Exception {
|
||||||
|
String originalRootDir = UTIL.getConfiguration().get(HConstants.HBASE_DIR);
|
||||||
|
try {
|
||||||
|
String baseDir = "mockFS://mockFSAuthority:9876/hbase/";
|
||||||
|
UTIL.getConfiguration().set(HConstants.HBASE_DIR, baseDir);
|
||||||
|
testArchiveStoreFilesDifferentFileSystems(walDir, baseDir,
|
||||||
|
(conf, fs, region, dir, family, list) -> HFileArchiver
|
||||||
|
.archiveRecoveredEdits(conf, fs, region, family, list));
|
||||||
|
} finally {
|
||||||
|
UTIL.getConfiguration().set(HConstants.HBASE_DIR, originalRootDir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testArchiveRecoveredEditsWrongFS() throws Exception {
|
||||||
|
String baseDir = FSUtils.getRootDir(UTIL.getConfiguration()).toString() + "/";
|
||||||
|
//Internally, testArchiveStoreFilesDifferentFileSystems will pass a "mockedFS"
|
||||||
|
// to HFileArchiver.archiveRecoveredEdits, but since wal-dir is supposedly on same FS
|
||||||
|
// as root dir it would lead to conflicting FSes and an IOException is expected.
|
||||||
|
testArchiveStoreFilesDifferentFileSystems("/wal-dir", baseDir,
|
||||||
|
(conf, fs, region, dir, family, list) -> HFileArchiver
|
||||||
|
.archiveRecoveredEdits(conf, fs, region, family, list));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveRecoveredEditsWalDirDifferentFS() throws Exception {
|
||||||
|
String walDir = "mockFS://mockFSAuthority:9876/mockDir/wals/";
|
||||||
|
testArchiveStoreFilesDifferentFileSystems(walDir, walDir,
|
||||||
|
(conf, fs, region, dir, family, list) ->
|
||||||
|
HFileArchiver.archiveRecoveredEdits(conf, fs, region, family, list));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveRegionDirOnArchive() throws Exception {
|
public void testRemoveRegionDirOnArchive() throws Exception {
|
||||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
|
|
@ -147,9 +147,11 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
@ -679,6 +681,60 @@ public class TestHRegion {
|
||||||
scanner1.close();
|
scanner1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testArchiveRecoveredEditsReplay() throws Exception {
|
||||||
|
byte[] family = Bytes.toBytes("family");
|
||||||
|
this.region = initHRegion(tableName, method, CONF, family);
|
||||||
|
final WALFactory wals = new WALFactory(CONF, method);
|
||||||
|
try {
|
||||||
|
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||||
|
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||||
|
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
|
||||||
|
|
||||||
|
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
|
||||||
|
|
||||||
|
long maxSeqId = 1050;
|
||||||
|
long minSeqId = 1000;
|
||||||
|
|
||||||
|
for (long i = minSeqId; i <= maxSeqId; i += 10) {
|
||||||
|
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
|
||||||
|
fs.create(recoveredEdits);
|
||||||
|
WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
|
||||||
|
|
||||||
|
long time = System.nanoTime();
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
|
||||||
|
.toBytes(i)));
|
||||||
|
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
|
||||||
|
HConstants.DEFAULT_CLUSTER_ID), edit));
|
||||||
|
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
MonitoredTask status = TaskMonitor.get().createStatus(method);
|
||||||
|
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
for (HStore store : region.getStores()) {
|
||||||
|
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
|
||||||
|
}
|
||||||
|
CONF.set("hbase.region.archive.recovered.edits", "true");
|
||||||
|
CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
|
||||||
|
long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
|
||||||
|
assertEquals(maxSeqId, seqId);
|
||||||
|
region.getMVCC().advanceTo(seqId);
|
||||||
|
String fakeFamilyName = recoveredEditsDir.getName();
|
||||||
|
Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
|
||||||
|
Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
|
||||||
|
region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
|
||||||
|
FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
|
||||||
|
assertEquals(6, list.length);
|
||||||
|
} finally {
|
||||||
|
CONF.set("hbase.region.archive.recovered.edits", "false");
|
||||||
|
CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
this.region = null;
|
||||||
|
wals.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSkipRecoveredEditsReplay() throws Exception {
|
public void testSkipRecoveredEditsReplay() throws Exception {
|
||||||
byte[] family = Bytes.toBytes("family");
|
byte[] family = Bytes.toBytes("family");
|
||||||
|
|
Loading…
Reference in New Issue