@@ -19,45 +19,49 @@ package org.apache.hadoop.hbase.backup.example;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.CheckedArchivingHFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveTestingUtil;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.StoppableImplementation;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Spin up a small cluster and check that the hfiles of a region are properly long-term archived as
 * specified via the {@link ZKTableArchiveClient}.
 */
@Category(LargeTests.class)
@Category(MediumTests.class)
public class TestZooKeeperTableArchiveClient {

  private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
@@ -65,10 +69,8 @@ public class TestZooKeeperTableArchiveClient {

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final String STRING_TABLE_NAME = "test";
  private static final byte[] TEST_FAM = Bytes.toBytes("fam");
  private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
  private static final int numRS = 2;
  private static final int maxTries = 5;
  private static final long ttl = 1000;
  private static ZKTableArchiveClient archivingClient;
  private final List<Path> toCleanup = new ArrayList<Path>();

  /**
   * Setup the config for the cluster
@@ -76,44 +78,35 @@
   */
  @BeforeClass
  public static void setupCluster() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(numRS);
    UTIL.startMiniZKCluster();
    archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), UTIL.getHBaseAdmin()
        .getConnection());
    // make hfile archiving node so we can archive files
    ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
    String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
    ZKUtil.createWithParents(watcher, archivingZNode);
  }
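
  // Note: the archive znode created above is the parent under which ZKTableArchiveClient
  // registers per-table archiving state; the cleaner delegates watch it to decide which
  // tables' hfiles must be retained.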

  private static void setupConf(Configuration conf) {
    // disable the ui
    conf.setInt("hbase.regionserver.info.port", -1);
    // change the flush size to a small amount, regulating number of store files
    conf.setInt("hbase.hregion.memstore.flush.size", 25000);
    // so make sure we get a compaction when doing a load, but keep around some
    // files in the store
    conf.setInt("hbase.hstore.compaction.min", 10);
    conf.setInt("hbase.hstore.compactionThreshold", 10);
    // block writes if we get to 12 store files
    conf.setInt("hbase.hstore.blockingStoreFiles", 12);
    // only compact with 3 files
    conf.setInt("hbase.hstore.compaction.min", 3);
    // drop the number of attempts for the hbase admin
    conf.setInt("hbase.client.retries.number", 1);
    // set the ttl on the hfiles
    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      CheckedArchivingHFileCleaner.class.getCanonicalName(),
      LongTermArchivingHFileCleaner.class.getCanonicalName());
  }
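
  // Taken together, these settings make UTIL.loadRegion produce several small store files and
  // at least one compaction, which is what moves hfiles into the archive directory for the
  // cleaner tests below.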

  @Before
  public void setup() throws Exception {
    UTIL.createTable(TABLE_NAME, TEST_FAM);
  }

  @After
  public void tearDown() throws Exception {
    UTIL.deleteTable(TABLE_NAME);
    // and cleanup the archive directory
    try {
      UTIL.getTestFileSystem().delete(new Path(UTIL.getDefaultRootDirPath(), ".archive"), true);
      FileSystem fs = UTIL.getTestFileSystem();
      // cleanup each of the files/directories registered
      for (Path file : toCleanup) {
        // remove the table and archive directories
        FSUtils.delete(fs, file, true);
      }
    } catch (IOException e) {
      LOG.warn("Failure to delete archive directory", e);
    } finally {
      toCleanup.clear();
    }
    // make sure that backups are off for all tables
    archivingClient.disableHFileBackup();

@@ -122,7 +115,7 @@ public class TestZooKeeperTableArchiveClient {
  @AfterClass
  public static void cleanupTest() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
      UTIL.shutdownMiniZKCluster();
    } catch (Exception e) {
      LOG.warn("problem shutting down cluster", e);
    }

@@ -156,227 +149,263 @@ public class TestZooKeeperTableArchiveClient {
  @Test
  public void testArchivingOnSingleTable() throws Exception {
    // turn on hfile retention
    LOG.debug("----Starting archiving");
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on",
      archivingClient.getArchivingEnabled(TABLE_NAME));
    createArchiveDirectory();
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);

    // get the RS and region serving our table
    List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(TABLE_NAME);
    // make sure we only have 1 region serving this table
    assertEquals(1, servingRegions.size());
    HRegion region = servingRegions.get(0);

    // get the parent RS and monitor
    HRegionServer hrs = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
    FileSystem fs = hrs.getFileSystem();

    // put some data on the region
    LOG.debug("-------Loading table");
    UTIL.loadRegion(region, TEST_FAM);
    loadAndCompact(region);

    // check that we actually have some store files that were archived
    Store store = region.getStore(TEST_FAM);
    Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
      region, store);

    // check to make sure we archived some files
    assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
    assertTrue("No files in the store archive",
      FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);

    // and then put some non-table files in the archive
    Configuration conf = UTIL.getConfiguration();
    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    // write a tmp file to the archive dir
    Path tmpFile = new Path(archiveDir, "toDelete");
    FSDataOutputStream out = fs.create(tmpFile);
    out.write(1);
    out.close();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    assertTrue(fs.exists(tmpFile));
    // make sure we wait long enough for the files to expire
    Thread.sleep(ttl);
    // create the region
    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);

    // print current state for comparison
    FSUtils.logFileSystemState(fs, archiveDir, LOG);
    loadFlushAndCompact(region, TEST_FAM);

    // ensure there are no archived files after waiting for a timeout
    ensureHFileCleanersRun();
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
      throw new RuntimeException("Didn't archive any files!");
    }
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());

    // check to make sure the right things get deleted
    assertTrue("Store archive got deleted", fs.exists(storeArchiveDir));
    assertTrue("Archived HFiles got deleted",
      FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);
    runCleaner(cleaner, finished, stop);

    // know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);

    assertFalse(
      "Tmp file (non-table archive file) didn't get deleted, archive dir: "
          + fs.listStatus(archiveDir), fs.exists(tmpFile));
    LOG.debug("Turning off hfile backup.");
    // stop archiving the table
    archivingClient.disableHFileBackup();
    LOG.debug("Deleting table from archive.");
    // now remove the archived table
    Path primaryTable = new Path(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()),
      STRING_TABLE_NAME);
    fs.delete(primaryTable, true);
    LOG.debug("Deleted primary table, waiting for file cleaners to run");
    // and make sure the archive directory is retained after a cleanup
    // have to do this manually since delegates aren't run if there aren't any files in the archive
    // dir to cleanup
    Thread.sleep(ttl);
    UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
    Thread.sleep(ttl);
    LOG.debug("File cleaners done, checking results.");
    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
  }
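
  // The single-table test above exercises both halves of the cleaner contract: hfiles under an
  // archived table's directory survive a cleaner pass, while the unrelated "toDelete" tmp file
  // in the archive root is removed once its ttl expires.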

  /**
   * Make sure all the {@link HFileCleaner} delegates run.
   * <p>
   * Blocks until the cleaner chore has completed at least one full pass, sleeping the hfile ttl
   * between triggers.
   * @throws InterruptedException
   */
  private void ensureHFileCleanersRun() throws InterruptedException {
    LOG.debug("Waiting on archive cleaners to run...");
    CheckedArchivingHFileCleaner.resetCheck();
    do {
      UTIL.getHBaseCluster().getMaster().getHFileCleaner().triggerNow();
      LOG.debug("Triggered, sleeping an amount until we can pass the check.");
      Thread.sleep(ttl);
    } while (!CheckedArchivingHFileCleaner.getChecked());
  }
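
  // CheckedArchivingHFileCleaner is installed as an extra cleaner plugin in setupConf; its
  // resetCheck()/getChecked() pair acts purely as a flag that a full cleaner pass has completed
  // since the last reset.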

  /**
   * Test archiving/cleaning across multiple tables, where some are retained, and others aren't
   * @throws Exception
   * @throws Exception on failure
   */
  @Test
  public void testMultipleTables() throws Exception {
    archivingClient.enableHFileBackupAsync(TABLE_NAME);
    assertTrue("Archiving didn't get turned on",
      archivingClient.getArchivingEnabled(TABLE_NAME));
    createArchiveDirectory();
    String otherTable = "otherTable";

    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    Path tableDir = getTableDir(STRING_TABLE_NAME);
    Path otherTableDir = getTableDir(otherTable);

    // register cleanup for the created directories
    toCleanup.add(archiveDir);
    toCleanup.add(tableDir);
    toCleanup.add(otherTableDir);
    Configuration conf = UTIL.getConfiguration();
    // setup the delegate
    Stoppable stop = new StoppableImplementation();
    HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
    List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
    final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

    // create the region
    HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
    loadFlushAndCompact(region, TEST_FAM);

    // create another table that we don't archive
    String otherTable = "otherTable";
    UTIL.createTable(Bytes.toBytes(otherTable), TEST_FAM);
    hcd = new HColumnDescriptor(TEST_FAM);
    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
    loadFlushAndCompact(otherRegion, TEST_FAM);

    // get the parent RS and monitor
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    // get the current hfiles in the archive directory
    List<Path> files = getAllFiles(fs, archiveDir);
    if (files == null) {
      FSUtils.logFileSystemState(fs, archiveDir, LOG);
      throw new RuntimeException("Didn't archive any files!");
    }

    // put data in the filesystem of the first table
    LOG.debug("Loading data into:" + STRING_TABLE_NAME);
    loadAndCompact(STRING_TABLE_NAME);
    // make sure we have files from both tables
    int initialCountForPrimary = 0;
    int initialCountForOtherTable = 0;
    for (Path file : files) {
      String tableName = file.getParent().getParent().getParent().getName();
      // check to which table this file belongs
      if (tableName.equals(otherTable)) initialCountForOtherTable++;
      else if (tableName.equals(STRING_TABLE_NAME)) initialCountForPrimary++;
    }

    // and some data in the other table
    LOG.debug("Loading data into:" + otherTable);
    loadAndCompact(otherTable);
    assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
    assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);

    // make sure we wait long enough for the other table's files to expire
    ensureHFileCleanersRun();
    // run the cleaners
    CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
    // run the cleaner
    cleaner.start();
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");

    // know the cleaner ran, so now check all the files again to make sure they are still there
    List<Path> archivedFiles = getAllFiles(fs, archiveDir);
    int archivedForPrimary = 0;
    for (Path file : archivedFiles) {
      String tableName = file.getParent().getParent().getParent().getName();
      // ensure we don't have files from the non-archived table
      assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
      if (tableName.equals(STRING_TABLE_NAME)) archivedForPrimary++;
    }

    assertEquals("Not all archived files for the primary table were retained.",
      initialCountForPrimary, archivedForPrimary);

    // check to make sure the right things get deleted
    Path primaryStoreArchive = HFileArchiveTestingUtil.getStoreArchivePath(UTIL, STRING_TABLE_NAME,
      TEST_FAM);
    Path otherStoreArchive = HFileArchiveTestingUtil
        .getStoreArchivePath(UTIL, otherTable, TEST_FAM);
    // make sure the primary store doesn't have any files
    assertTrue("Store archive got deleted", fs.exists(primaryStoreArchive));
    assertTrue("Archived HFiles got deleted",
      FSUtils.listStatus(fs, primaryStoreArchive, null).length > 0);
    FileStatus[] otherArchiveFiles = FSUtils.listStatus(fs, otherStoreArchive, null);
    assertNull("Archived HFiles (" + otherStoreArchive
        + ") should have gotten deleted, but didn't, remaining files:"
        + getPaths(otherArchiveFiles), otherArchiveFiles);
    // sleep again to make sure the other table gets cleaned up
    ensureHFileCleanersRun();
    // first pass removes the store archive
    assertFalse(fs.exists(otherStoreArchive));
    // second pass removes the region
    ensureHFileCleanersRun();
    Path parent = otherStoreArchive.getParent();
    assertFalse(fs.exists(parent));
    // third pass removes the table
    ensureHFileCleanersRun();
    parent = parent.getParent();
    assertFalse(fs.exists(parent));
    // but we still have the archive directory
    assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));

    FSUtils.logFileSystemState(fs, HFileArchiveUtil.getArchivePath(UTIL.getConfiguration()), LOG);
    UTIL.deleteTable(Bytes.toBytes(otherTable));
    assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
  }

  private List<Path> getPaths(FileStatus[] files) {
    if (files == null || files.length == 0) return null;

    List<Path> paths = new ArrayList<Path>(files.length);
    for (FileStatus file : files) {
      paths.add(file.getPath());
    }
    return paths;
  }

  private void createArchiveDirectory() throws IOException {
    // create the archive and test directory
    FileSystem fs = UTIL.getTestFileSystem();
    Path archiveDir = getArchiveDir();
    fs.mkdirs(archiveDir);
  }

  private void loadAndCompact(String tableName) throws Exception {
    byte[] table = Bytes.toBytes(tableName);
    // get the RS and region serving our table
    List<HRegion> servingRegions = UTIL.getHBaseCluster().getRegions(table);
    // make sure we only have 1 region serving this table
    assertEquals(1, servingRegions.size());
    HRegion region = servingRegions.get(0);

  private Path getArchiveDir() throws IOException {
    return new Path(UTIL.getDataTestDir(), HFileArchiveUtil.DEFAULT_HFILE_ARCHIVE_DIRECTORY);
  }

    // get the parent RS and monitor
    HRegionServer hrs = UTIL.getRSForFirstRegionInTable(table);
    FileSystem fs = hrs.getFileSystem();

  private Path getTableDir(String tableName) throws IOException {
    Path testDataDir = UTIL.getDataTestDir();
    FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
    return new Path(testDataDir, tableName);
  }

    // put some data on the region
    LOG.debug("-------Loading table");
    UTIL.loadRegion(region, TEST_FAM);
    loadAndCompact(region);

    // check that we actually have some store files that were archived
    Store store = region.getStore(TEST_FAM);
    Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
      region, store);

    // check to make sure we archived some files
    assertTrue("Didn't create a store archive directory", fs.exists(storeArchiveDir));
    assertTrue("No files in the store archive",
      FSUtils.listStatus(fs, storeArchiveDir, null).length > 0);

    // wait for the compactions to finish
    region.waitForFlushesAndCompactions();

  private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
      Stoppable stop) {
    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
      LongTermArchivingHFileCleaner.class.getCanonicalName());
    return new HFileCleaner(1000, stop, conf, fs, archiveDir);
  }
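
  // Note: the 1000 here is the cleaner chore's wake period in milliseconds; the cleaner only
  // deletes files that every configured delegate, including LongTermArchivingHFileCleaner,
  // reports as deletable.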

  /**
   * Load the given region and then ensure that it compacts some files
   * Start archiving table for given hfile cleaner
   * @param tableName table to archive
   * @param cleaner cleaner to check to make sure change propagated
   * @return underlying {@link LongTermArchivingHFileCleaner} that is managing archiving
   * @throws IOException on failure
   * @throws KeeperException on failure
   */
  private void loadAndCompact(HRegion region) throws Exception {
    int tries = 0;
    Exception last = null;
    while (tries++ <= maxTries) {
      try {
        // load the region with data
        UTIL.loadRegion(region, TEST_FAM);
        // and then trigger a compaction to be sure we try to archive
        compactRegion(region, TEST_FAM);
        return;
      } catch (Exception e) {
        // keep this around in case we fail later
        last = e;

  private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
      throws IOException, KeeperException {
    // turn on hfile retention
    LOG.debug("----Starting archiving for table:" + tableName);
    archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
    assertTrue("Archiving didn't get turned on", archivingClient.getArchivingEnabled(tableName));

    // wait for the archiver to get the notification
    List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
    LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
    while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
      // spin until propagation - should be fast
    }
    return cleaners;
  }
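
  // keepHFiles flips once the delegate's ZooKeeper-backed tracker observes the per-table
  // archiving znode created by enableHFileBackupAsync, so the spin above is just waiting on
  // watcher propagation.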

  /**
   * Spy on the {@link LongTermArchivingHFileCleaner} to ensure we can catch when the cleaner has
   * seen all the files
   * @return a {@link CountDownLatch} to wait on that releases when the cleaner has been called at
   *         least the expected number of times.
   */
  private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
      List<BaseHFileCleanerDelegate> cleaners, final int expected) {
    // replace the cleaner with one that we can check
    BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
    final int[] counter = new int[] { 0 };
    final CountDownLatch finished = new CountDownLatch(1);
    Mockito.doAnswer(new Answer<Boolean>() {

      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        counter[0]++;
        LOG.debug(counter[0] + "/ " + expected + ") Mocking call to isFileDeletable");
        if (counter[0] > expected) finished.countDown();
        return (Boolean) invocation.callRealMethod();

      }
    }
      throw last;
    }).when(delegateSpy).isFileDeletable(Mockito.any(Path.class));
    cleaners.set(0, delegateSpy);

    return finished;
  }
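
  // Typical use, as in the tests above: wire the spy in before starting the cleaner, then block
  // on the latch so assertions run only after the delegate has examined every file:
  //   CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
  //   runCleaner(cleaner, finished, stop);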

  /**
   * Compact all the store files in a given region.
   * Get all the files (non-directory entries) in the file system under the passed directory
   * @param dir directory to investigate
   * @return all files under the directory
   */
  private void compactRegion(HRegion region, byte[] family) throws IOException {
    Store store = region.getStores().get(TEST_FAM);
    store.compactRecentForTesting(store.getStorefiles().size());

  private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
    FileStatus[] files = FSUtils.listStatus(fs, dir, null);
    if (files == null) return null;

    List<Path> allFiles = new ArrayList<Path>();
    for (FileStatus file : files) {
      if (file.isDir()) {
        List<Path> subFiles = getAllFiles(fs, file.getPath());
        if (subFiles != null) allFiles.addAll(subFiles);
        continue;
      }
      allFiles.add(file.getPath());
    }
    return allFiles;
  }

  private void loadFlushAndCompact(HRegion region, byte[] family) throws IOException {
    // create two hfiles in the region
    createHFileInRegion(region, family);
    createHFileInRegion(region, family);

    Store s = region.getStore(family);
    int count = s.getStorefilesCount();
    assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
      count >= 2);

    // compact the two files into one file to get files in the archive
    LOG.debug("Compacting stores");
    region.compactStores(true);
  }

  /**
   * Create a new hfile in the passed region
   * @param region region to operate on
   * @param columnFamily family for which to add data
   * @throws IOException
   */
  private void createHFileInRegion(HRegion region, byte[] columnFamily) throws IOException {
    // put one row in the region
    Put p = new Put(Bytes.toBytes("row"));
    p.add(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
    region.put(p);
    // flush the region to make a store file
    region.flushcache();
  }

  /**
   * Start the given cleaner, wait for it to check the expected number of files, then stop it.
   * @param cleaner the cleaner to run
   */
  private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
      throws InterruptedException {
    // run the cleaner
    cleaner.start();
    // wait for the cleaner to check all the files
    finished.await();
    // stop the cleaner
    stop.stop("");
  }
}