HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOldWALsDirectory (#5119)
Add a 'closed' flag in WALProps in AbstractFSWAL to indicate that whether a WAL
file has been closed, if not, we will not try to archive it. Will mark it as
closed after we fully close it in the background close task, and try to archive
again.
Also modified some tests since now the archiving of a rolled WAL file is also
asynchronous, we need to wait instead of asserting directly.
Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 230fdc0b50
)
This commit is contained in:
parent
32f95c4b13
commit
23ce03c7c5
|
@ -288,22 +288,29 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
final Comparator<Path> LOG_NAME_COMPARATOR =
|
||||
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
|
||||
|
||||
private static final class WalProps {
|
||||
private static final class WALProps {
|
||||
|
||||
/**
|
||||
* Map the encoded region name to the highest sequence id.
|
||||
* <p/>
|
||||
* Contains all the regions it has an entry for.
|
||||
*/
|
||||
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
||||
private final Map<byte[], Long> encodedName2HighestSequenceId;
|
||||
|
||||
/**
|
||||
* The log file size. Notice that the size may not be accurate if we do asynchronous close in
|
||||
* sub classes.
|
||||
*/
|
||||
public final long logSize;
|
||||
private final long logSize;
|
||||
|
||||
public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
||||
/**
|
||||
* If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
|
||||
* rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
|
||||
* for safety.
|
||||
*/
|
||||
private volatile boolean closed = false;
|
||||
|
||||
WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
||||
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
|
||||
this.logSize = logSize;
|
||||
}
|
||||
|
@ -313,7 +320,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
|
||||
* (contained in the log file name).
|
||||
*/
|
||||
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
|
||||
protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
|
||||
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
|
||||
|
||||
/**
|
||||
|
@ -332,6 +339,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
|
||||
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
|
||||
|
||||
protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
|
||||
|
||||
// Run in caller if we get reject execution exception, to avoid aborting region server when we get
|
||||
// reject execution exception. Usually this should not happen but let's make it more robust.
|
||||
private final ExecutorService logArchiveExecutor =
|
||||
|
@ -679,7 +689,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
Map<byte[], List<byte[]>> regions = null;
|
||||
int logCount = getNumRolledLogFiles();
|
||||
if (logCount > this.maxLogs && logCount > 0) {
|
||||
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
|
||||
Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
|
||||
regions =
|
||||
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
|
||||
}
|
||||
|
@ -703,13 +713,34 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
}
|
||||
|
||||
/**
|
||||
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
|
||||
* Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
|
||||
*/
|
||||
private void cleanOldLogs() throws IOException {
|
||||
protected final void markClosedAndClean(Path path) {
|
||||
WALProps props = walFile2Props.get(path);
|
||||
// typically this should not be null, but if there is no big issue if it is already null, so
|
||||
// let's make the code more robust
|
||||
if (props != null) {
|
||||
props.closed = true;
|
||||
cleanOldLogs();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
|
||||
* <p/>
|
||||
* Use synchronized because we may call this method in different threads, normally when replacing
|
||||
* writer, and since now close writer may be asynchronous, we will also call this method in the
|
||||
* closeExecutor, right after we actually close a WAL writer.
|
||||
*/
|
||||
private synchronized void cleanOldLogs() {
|
||||
List<Pair<Path, Long>> logsToArchive = null;
|
||||
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
|
||||
// are older than what is currently in memory, the WAL can be GC'd.
|
||||
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
|
||||
for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
|
||||
if (!e.getValue().closed) {
|
||||
LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
|
||||
continue;
|
||||
}
|
||||
Path log = e.getKey();
|
||||
Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
|
||||
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
|
||||
|
@ -791,7 +822,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
|
||||
if (oldPath != null) {
|
||||
this.walFile2Props.put(oldPath,
|
||||
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
|
||||
new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
|
||||
this.totalLogSize.addAndGet(oldFileLen);
|
||||
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
|
||||
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
|
||||
|
@ -987,6 +1018,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
|||
// and abort the region server
|
||||
logArchiveExecutor.shutdown();
|
||||
}
|
||||
// we also need to wait logArchive to finish if we want to a graceful shutdown as we may still
|
||||
// have some pending archiving tasks not finished yet, and in close we may archive all the
|
||||
// remaining WAL files, there could be race if we do not wait for the background archive task
|
||||
// finish
|
||||
try {
|
||||
if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
|
||||
throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
|
||||
+ " the shutdown of WAL doesn't complete! Please check the status of underlying "
|
||||
+ "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
|
||||
+ "\"");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,7 +35,6 @@ import java.util.Queue;
|
|||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -179,9 +178,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
|
||||
private final long batchSize;
|
||||
|
||||
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
|
||||
|
||||
private volatile AsyncFSOutput fsOut;
|
||||
|
||||
private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
|
||||
|
@ -718,23 +714,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
}
|
||||
}
|
||||
|
||||
protected final long closeWriter(AsyncWriter writer, Path path) {
|
||||
if (writer != null) {
|
||||
inflightWALClosures.put(path.getName(), writer);
|
||||
long fileLength = writer.getLength();
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("close old writer failed", e);
|
||||
} finally {
|
||||
inflightWALClosures.remove(path.getName());
|
||||
}
|
||||
});
|
||||
return fileLength;
|
||||
} else {
|
||||
return 0L;
|
||||
}
|
||||
private void closeWriter(AsyncWriter writer, Path path) {
|
||||
inflightWALClosures.put(path.getName(), writer);
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
writer.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("close old writer failed", e);
|
||||
} finally {
|
||||
// call this even if the above close fails, as there is no other chance we can set closed to
|
||||
// true, it will not cause big problems.
|
||||
markClosedAndClean(path);
|
||||
inflightWALClosures.remove(path.getName());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -742,8 +735,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
|||
throws IOException {
|
||||
Preconditions.checkNotNull(nextWriter);
|
||||
waitForSafePoint();
|
||||
long oldFileLen = closeWriter(this.writer, oldPath);
|
||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||
// we will call rollWriter in init method, where we want to create the first writer and
|
||||
// obviously the previous writer is null, so here we need this null check. And why we must call
|
||||
// logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after
|
||||
// closing the writer asynchronously, we need to make sure the WALProps is put into
|
||||
// walFile2Props before we call markClosedAndClean
|
||||
if (writer != null) {
|
||||
long oldFileLen = writer.getLength();
|
||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||
closeWriter(writer, oldPath);
|
||||
} else {
|
||||
logRollAndSetupWalProps(oldPath, newPath, 0);
|
||||
}
|
||||
|
||||
this.writer = nextWriter;
|
||||
if (nextWriter instanceof AsyncProtobufLogWriter) {
|
||||
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
|
||||
|
|
|
@ -36,8 +36,6 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -169,8 +167,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
private final AtomicInteger closeErrorCount = new AtomicInteger();
|
||||
|
||||
private final int waitOnShutdownInSeconds;
|
||||
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
|
||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
|
||||
|
||||
/**
|
||||
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
|
||||
|
@ -376,28 +372,44 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
LOG.warn(
|
||||
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
|
||||
}
|
||||
long oldFileLen = 0L;
|
||||
// It is at the safe point. Swap out writer from under the blocked writer thread.
|
||||
// we will call rollWriter in init method, where we want to create the first writer and
|
||||
// obviously the previous writer is null, so here we need this null check. And why we must
|
||||
// call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean
|
||||
// after closing the writer asynchronously, we need to make sure the WALProps is put into
|
||||
// walFile2Props before we call markClosedAndClean
|
||||
if (this.writer != null) {
|
||||
oldFileLen = this.writer.getLength();
|
||||
long oldFileLen = this.writer.getLength();
|
||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||
// In case of having unflushed entries or we already reached the
|
||||
// closeErrorsTolerated count, call the closeWriter inline rather than in async
|
||||
// way so that in case of an IOE we will throw it back and abort RS.
|
||||
inflightWALClosures.put(oldPath.getName(), writer);
|
||||
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
|
||||
closeWriter(this.writer, oldPath, true);
|
||||
try {
|
||||
closeWriter(this.writer, oldPath, true);
|
||||
} finally {
|
||||
inflightWALClosures.remove(oldPath.getName());
|
||||
}
|
||||
} else {
|
||||
Writer localWriter = this.writer;
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
closeWriter(localWriter, oldPath, false);
|
||||
} catch (IOException e) {
|
||||
// We will never reach here.
|
||||
LOG.warn("close old writer failed", e);
|
||||
} finally {
|
||||
// call this even if the above close fails, as there is no other chance we can set
|
||||
// closed to true, it will not cause big problems.
|
||||
markClosedAndClean(oldPath);
|
||||
inflightWALClosures.remove(oldPath.getName());
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
logRollAndSetupWalProps(oldPath, newPath, 0);
|
||||
}
|
||||
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
|
||||
|
||||
this.writer = nextWriter;
|
||||
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
|
||||
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
|
||||
|
@ -452,8 +464,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
|||
}
|
||||
LOG.warn("Riding over failed WAL close of " + path
|
||||
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
|
||||
} finally {
|
||||
inflightWALClosures.remove(path.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -397,10 +397,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
serverName = ServerName.parseServerName(logDirName);
|
||||
} catch (IllegalArgumentException | IllegalStateException ex) {
|
||||
serverName = null;
|
||||
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
|
||||
LOG.warn("Cannot parse a server name from path={}", logFile, ex);
|
||||
}
|
||||
if (serverName != null && serverName.getStartcode() < 0) {
|
||||
LOG.warn("Invalid log file path=" + logFile);
|
||||
if (serverName != null && serverName.getStartCode() < 0) {
|
||||
LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
|
||||
serverName.getStartCode());
|
||||
serverName = null;
|
||||
}
|
||||
return serverName;
|
||||
|
@ -465,6 +466,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
|||
}
|
||||
|
||||
ServerName serverName = getServerNameFromWALDirectoryName(path);
|
||||
if (serverName == null) {
|
||||
LOG.warn("Can not extract server name from path {}, "
|
||||
+ "give up searching the separated old log dir", path);
|
||||
return null;
|
||||
}
|
||||
// Try finding the log in separate old log dir
|
||||
oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
|
||||
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
|
||||
|
|
|
@ -419,9 +419,11 @@ public class TestAdmin2 extends TestAdminBase {
|
|||
r.flush(true);
|
||||
}
|
||||
ADMIN.rollWALWriter(regionServer.getServerName());
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
|
||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||
assertTrue(("actual count: " + count), count <= 2);
|
||||
TEST_UTIL.waitFor(5000, () -> {
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
|
||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||
return count <= 2;
|
||||
});
|
||||
}
|
||||
|
||||
private void setUpforLogRolling() {
|
||||
|
|
|
@ -152,9 +152,11 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
|
|||
r.flush(true);
|
||||
}
|
||||
admin.rollWALWriter(regionServer.getServerName()).join();
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
|
||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||
assertTrue(("actual count: " + count), count <= 2);
|
||||
TEST_UTIL.waitFor(5000, () -> {
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
|
||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||
return count <= 2;
|
||||
});
|
||||
}
|
||||
|
||||
private void setUpforLogRolling() {
|
||||
|
|
|
@ -488,9 +488,8 @@ public class TestPerColumnFamilyFlush {
|
|||
// Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
|
||||
int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
|
||||
assertNull(getWAL(desiredRegion).rollWriter());
|
||||
while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
TEST_UTIL.waitFor(60000,
|
||||
() -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
|
||||
}
|
||||
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
|
||||
assertTrue(
|
||||
|
@ -529,7 +528,7 @@ public class TestPerColumnFamilyFlush {
|
|||
desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
|
||||
// let WAL cleanOldLogs
|
||||
assertNull(getWAL(desiredRegion).rollWriter(true));
|
||||
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
|
||||
TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
|
||||
} finally {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
|
|
@ -253,21 +253,14 @@ public abstract class AbstractTestFSWAL {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
|
||||
* regions which should be flushed in order to archive the oldest wal file.
|
||||
* <p>
|
||||
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
|
||||
* the max number of logs threshold. It checks whether we get the "right regions and stores" for
|
||||
* flush on rolling the wal.
|
||||
*/
|
||||
@Test
|
||||
public void testFindMemStoresEligibleForFlush() throws Exception {
|
||||
LOG.debug("testFindMemStoresEligibleForFlush");
|
||||
Configuration conf1 = HBaseConfiguration.create(CONF);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
||||
// now we will close asynchronously and will not archive a wal file unless it is fully closed, so
|
||||
// sometimes we need to wait a bit before asserting, especially when you want to test the removal
|
||||
// of numRolledLogFiles
|
||||
private void waitNumRolledLogFiles(AbstractFSWAL<?> wal, int expected) {
|
||||
TEST_UTIL.waitFor(5000, () -> wal.getNumRolledLogFiles() == expected);
|
||||
}
|
||||
|
||||
private void testFindMemStoresEligibleForFlush(AbstractFSWAL<?> wal) throws IOException {
|
||||
String cf1 = "cf1";
|
||||
String cf2 = "cf2";
|
||||
String cf3 = "cf3";
|
||||
|
@ -278,7 +271,7 @@ public abstract class AbstractTestFSWAL {
|
|||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
|
||||
|
||||
List<ColumnFamilyDescriptor> cfs = new ArrayList();
|
||||
List<ColumnFamilyDescriptor> cfs = new ArrayList<>();
|
||||
cfs.add(ColumnFamilyDescriptorBuilder.of(cf1));
|
||||
cfs.add(ColumnFamilyDescriptorBuilder.of(cf2));
|
||||
TableDescriptor t3 =
|
||||
|
@ -299,87 +292,101 @@ public abstract class AbstractTestFSWAL {
|
|||
for (byte[] fam : t3.getColumnFamilyNames()) {
|
||||
scopes3.put(fam, 0);
|
||||
}
|
||||
try {
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// add some more edits and roll the wal. This would reach the log number threshold
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// with above rollWriter call, the max logs limit is reached.
|
||||
assertTrue(wal.getNumRolledLogFiles() == 2);
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// add some more edits and roll the wal. This would reach the log number threshold
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// with above rollWriter call, the max logs limit is reached.
|
||||
waitNumRolledLogFiles(wal, 2);
|
||||
|
||||
// get the regions to flush; since there is only one region in the oldest wal, it should
|
||||
// return only one region.
|
||||
Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
// insert edits in second region
|
||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||
// get the regions to flush, it should still read region1.
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
|
||||
// remain.
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
// only one wal should remain now (that is for the second region).
|
||||
assertEquals(1, wal.getNumRolledLogFiles());
|
||||
// flush the second region
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
// no wal should remain now.
|
||||
assertEquals(0, wal.getNumRolledLogFiles());
|
||||
// add edits both to region 1 and region 2, and roll.
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||
wal.rollWriter();
|
||||
// add edits and roll the writer, to reach the max logs limit.
|
||||
assertEquals(1, wal.getNumRolledLogFiles());
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// it should return two regions to flush, as the oldest wal file has entries
|
||||
// for both regions.
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(2, regionsToFlush.size());
|
||||
// flush both regions
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
assertEquals(0, wal.getNumRolledLogFiles());
|
||||
// Add an edit to region1, and roll the wal.
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
|
||||
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
assertEquals(1, wal.getNumRolledLogFiles());
|
||||
// get the regions to flush; since there is only one region in the oldest wal, it should
|
||||
// return only one region.
|
||||
Map<byte[], List<byte[]>> regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
// insert edits in second region
|
||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||
// get the regions to flush, it should still read region1.
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri1.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
|
||||
// remain.
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
// only one wal should remain now (that is for the second region).
|
||||
waitNumRolledLogFiles(wal, 1);
|
||||
// flush the second region
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
// no wal should remain now.
|
||||
waitNumRolledLogFiles(wal, 0);
|
||||
// add edits both to region 1 and region 2, and roll.
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
addEdits(wal, hri2, t2, 2, mvcc, scopes2, cf1);
|
||||
wal.rollWriter();
|
||||
// add edits and roll the writer, to reach the max logs limit.
|
||||
waitNumRolledLogFiles(wal, 1);
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
wal.rollWriter();
|
||||
// it should return two regions to flush, as the oldest wal file has entries
|
||||
// for both regions.
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(2, regionsToFlush.size());
|
||||
// flush both regions
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
waitNumRolledLogFiles(wal, 0);
|
||||
// Add an edit to region1, and roll the wal.
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1, cf1);
|
||||
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
|
||||
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
waitNumRolledLogFiles(wal, 1);
|
||||
|
||||
// clear test data
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
// add edits for three familes
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
|
||||
wal.rollWriter();
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||
wal.rollWriter();
|
||||
assertEquals(2, wal.getNumRolledLogFiles());
|
||||
// flush one family before archive oldest wal
|
||||
Set<byte[]> flushedFamilyNames = new HashSet<>();
|
||||
flushedFamilyNames.add(Bytes.toBytes(cf1));
|
||||
flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
// then only two family need to be flushed when archive oldest wal
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
|
||||
} finally {
|
||||
if (wal != null) {
|
||||
wal.close();
|
||||
}
|
||||
// clear test data
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
// add edits for three familes
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf2);
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf3);
|
||||
wal.rollWriter();
|
||||
addEdits(wal, hri3, t3, 2, mvcc, scopes3, cf1);
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 2);
|
||||
// flush one family before archive oldest wal
|
||||
Set<byte[]> flushedFamilyNames = new HashSet<>();
|
||||
flushedFamilyNames.add(Bytes.toBytes(cf1));
|
||||
flushRegion(wal, hri3.getEncodedNameAsBytes(), flushedFamilyNames);
|
||||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
// then only two family need to be flushed when archive oldest wal
|
||||
assertEquals(1, regionsToFlush.size());
|
||||
assertEquals(hri3.getEncodedNameAsBytes(), (byte[]) regionsToFlush.keySet().toArray()[0]);
|
||||
assertEquals(2, regionsToFlush.get(hri3.getEncodedNameAsBytes()).size());
|
||||
}
|
||||
|
||||
/**
|
||||
* On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of
|
||||
* regions which should be flushed in order to archive the oldest wal file.
|
||||
* <p>
|
||||
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
|
||||
* the max number of logs threshold. It checks whether we get the "right regions and stores" for
|
||||
* flush on rolling the wal.
|
||||
*/
|
||||
@Test
|
||||
public void testFindMemStoresEligibleForFlush() throws Exception {
|
||||
LOG.debug("testFindMemStoresEligibleForFlush");
|
||||
Configuration conf1 = HBaseConfiguration.create(CONF);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||
try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null)) {
|
||||
testFindMemStoresEligibleForFlush(wal);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -137,9 +139,7 @@ public abstract class AbstractTestLogRolling {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
protected void startAndWriteData() throws IOException, InterruptedException {
|
||||
// When the hbase:meta table can be opened, the region servers are running
|
||||
TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
|
||||
private void startAndWriteData() throws IOException, InterruptedException {
|
||||
this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
|
||||
|
||||
Table table = createTestTable(this.tableName);
|
||||
|
@ -175,19 +175,6 @@ public abstract class AbstractTestLogRolling {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertLogFileSize(WAL log) throws InterruptedException {
|
||||
if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
|
||||
assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
|
||||
} else {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
if (AbstractFSWALProvider.getLogFileSize(log) != 0) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
}
|
||||
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that logs are deleted
|
||||
*/
|
||||
|
@ -200,20 +187,25 @@ public abstract class AbstractTestLogRolling {
|
|||
final WAL log = server.getWAL(region);
|
||||
LOG.info(
|
||||
"after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
|
||||
assertLogFileSize(log);
|
||||
|
||||
// roll the log, so we should have at least one rolled file and the log file size should be
|
||||
// greater than 0, in case in the above method we rolled in the last round and then flushed so
|
||||
// all the old wal files are deleted and cause the below assertion to fail
|
||||
log.rollWriter();
|
||||
|
||||
assertThat(AbstractFSWALProvider.getLogFileSize(log), greaterThan(0L));
|
||||
|
||||
// flush all regions
|
||||
for (HRegion r : server.getOnlineRegionsLocalContext()) {
|
||||
r.flush(true);
|
||||
}
|
||||
|
||||
// Now roll the log
|
||||
// Now roll the log the again
|
||||
log.rollWriter();
|
||||
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
|
||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||
assertTrue(("actual count: " + count), count <= 2);
|
||||
assertLogFileSize(log);
|
||||
// should have deleted all the rolled wal files
|
||||
TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(log) == 0);
|
||||
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
|
||||
}
|
||||
|
||||
protected String getName() {
|
||||
|
|
|
@ -112,10 +112,6 @@ public class TestFSHLogProvider {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
static String getName() {
|
||||
return "TestDefaultWALProvider";
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetServerNameFromWALDirectoryName() throws IOException {
|
||||
ServerName sn = ServerName.valueOf("hn", 450, 1398);
|
||||
|
@ -163,7 +159,7 @@ public class TestFSHLogProvider {
|
|||
/**
|
||||
* used by TestDefaultWALProviderWithHLogKey
|
||||
*/
|
||||
WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
|
||||
private WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
|
||||
NavigableMap<byte[], Integer> scopes) {
|
||||
return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes);
|
||||
}
|
||||
|
@ -171,14 +167,19 @@ public class TestFSHLogProvider {
|
|||
/**
|
||||
* helper method to simulate region flush for a WAL.
|
||||
*/
|
||||
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
|
||||
private void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
|
||||
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
|
||||
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogCleaning() throws Exception {
|
||||
LOG.info(currentTest.getMethodName());
|
||||
// now we will close asynchronously and will not archive a wal file unless it is fully closed, so
|
||||
// sometimes we need to wait a bit before asserting, especially when you want to test the removal
|
||||
// of numRolledLogFiles
|
||||
private void waitNumRolledLogFiles(WAL wal, int expected) {
|
||||
TEST_UTIL.waitFor(5000, () -> AbstractFSWALProvider.getNumRolledLogFiles(wal) == expected);
|
||||
}
|
||||
|
||||
private void testLogCleaning(WALFactory wals) throws IOException {
|
||||
TableDescriptor htd =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
|
@ -193,58 +194,122 @@ public class TestFSHLogProvider {
|
|||
for (byte[] fam : htd2.getColumnFamilyNames()) {
|
||||
scopes2.put(fam, 0);
|
||||
}
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
|
||||
// we want to mix edits from regions, so pick our own identifier.
|
||||
WAL log = wals.getWAL(null);
|
||||
|
||||
// Add a single edit and make sure that rolling won't remove the file
|
||||
// Before HBASE-3198 it used to delete it
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
log.rollWriter();
|
||||
waitNumRolledLogFiles(log, 1);
|
||||
|
||||
// See if there's anything wrong with more than 1 edit
|
||||
addEdits(log, hri, htd, 2, scopes1);
|
||||
log.rollWriter();
|
||||
assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Now mix edits from 2 regions, still no flushing
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.rollWriter();
|
||||
waitNumRolledLogFiles(log, 3);
|
||||
|
||||
// Flush the first region, we expect to see the first two files getting
|
||||
// archived. We need to append something or writer won't be rolled.
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
|
||||
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
log.rollWriter();
|
||||
waitNumRolledLogFiles(log, 2);
|
||||
|
||||
// Flush the second region, which removes all the remaining output files
|
||||
// since the oldest was completely flushed and the two others only contain
|
||||
// flush information
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
|
||||
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
log.rollWriter();
|
||||
waitNumRolledLogFiles(log, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogCleaning() throws Exception {
|
||||
LOG.info(currentTest.getMethodName());
|
||||
Configuration localConf = new Configuration(conf);
|
||||
localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
|
||||
WALFactory wals = new WALFactory(localConf, currentTest.getMethodName());
|
||||
try {
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(htd2.getTableName()).build();
|
||||
// we want to mix edits from regions, so pick our own identifier.
|
||||
WAL log = wals.getWAL(null);
|
||||
|
||||
// Add a single edit and make sure that rolling won't remove the file
|
||||
// Before HBASE-3198 it used to delete it
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
log.rollWriter();
|
||||
assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// See if there's anything wrong with more than 1 edit
|
||||
addEdits(log, hri, htd, 2, scopes1);
|
||||
log.rollWriter();
|
||||
assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Now mix edits from 2 regions, still no flushing
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
addEdits(log, hri, htd, 1, scopes1);
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.rollWriter();
|
||||
assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
|
||||
// Flush the first region, we expect to see the first two files getting
|
||||
// archived. We need to append something or writer won't be rolled.
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
|
||||
log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
log.rollWriter();
|
||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
|
||||
assertEquals(2, count);
|
||||
|
||||
// Flush the second region, which removes all the remaining output files
|
||||
// since the oldest was completely flushed and the two others only contain
|
||||
// flush information
|
||||
addEdits(log, hri2, htd2, 1, scopes2);
|
||||
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
|
||||
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
|
||||
log.rollWriter();
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
|
||||
testLogCleaning(wals);
|
||||
} finally {
|
||||
if (wals != null) {
|
||||
wals.close();
|
||||
}
|
||||
wals.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void testWALArchiving(WALFactory wals) throws IOException {
|
||||
TableDescriptor table1 =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "1"))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
TableDescriptor table2 =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "2"))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : table1.getColumnFamilyNames()) {
|
||||
scopes1.put(fam, 0);
|
||||
}
|
||||
NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : table2.getColumnFamilyNames()) {
|
||||
scopes2.put(fam, 0);
|
||||
}
|
||||
WAL wal = wals.getWAL(null);
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
|
||||
// variables to mock region sequenceIds.
|
||||
// start with the testing logic: insert a waledit, and roll writer
|
||||
addEdits(wal, hri1, table1, 1, scopes1);
|
||||
wal.rollWriter();
|
||||
// assert that the wal is rolled
|
||||
waitNumRolledLogFiles(wal, 1);
|
||||
// add edits in the second wal file, and roll writer.
|
||||
addEdits(wal, hri1, table1, 1, scopes1);
|
||||
wal.rollWriter();
|
||||
// assert that the wal is rolled
|
||||
waitNumRolledLogFiles(wal, 2);
|
||||
// add a waledit to table1, and flush the region.
|
||||
addEdits(wal, hri1, table1, 3, scopes1);
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
|
||||
// roll log; all old logs should be archived.
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 0);
|
||||
// add an edit to table2, and roll writer
|
||||
addEdits(wal, hri2, table2, 1, scopes2);
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 1);
|
||||
// add edits for table1, and roll writer
|
||||
addEdits(wal, hri1, table1, 2, scopes1);
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 2);
|
||||
// add edits for table2, and flush hri1.
|
||||
addEdits(wal, hri2, table2, 2, scopes2);
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
|
||||
// the log : region-sequenceId map is
|
||||
// log1: region2 (unflushed)
|
||||
// log2: region1 (flushed)
|
||||
// log3: region2 (unflushed)
|
||||
// roll the writer; log2 should be archived.
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 2);
|
||||
// flush region2, and all logs should be archived.
|
||||
addEdits(wal, hri2, table2, 2, scopes2);
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
waitNumRolledLogFiles(wal, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs and
|
||||
* also don't archive "live logs" (that is, a log with un-flushed entries).
|
||||
|
@ -258,72 +323,14 @@ public class TestFSHLogProvider {
|
|||
@Test
|
||||
public void testWALArchiving() throws IOException {
|
||||
LOG.debug(currentTest.getMethodName());
|
||||
TableDescriptor table1 =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "1"))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
TableDescriptor table2 =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName() + "2"))
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : table1.getColumnFamilyNames()) {
|
||||
scopes1.put(fam, 0);
|
||||
}
|
||||
NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : table2.getColumnFamilyNames()) {
|
||||
scopes2.put(fam, 0);
|
||||
}
|
||||
|
||||
Configuration localConf = new Configuration(conf);
|
||||
localConf.set(WALFactory.WAL_PROVIDER, FSHLogProvider.class.getName());
|
||||
WALFactory wals = new WALFactory(localConf, currentTest.getMethodName());
|
||||
try {
|
||||
WAL wal = wals.getWAL(null);
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(table1.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(table2.getTableName()).build();
|
||||
// variables to mock region sequenceIds.
|
||||
// start with the testing logic: insert a waledit, and roll writer
|
||||
addEdits(wal, hri1, table1, 1, scopes1);
|
||||
wal.rollWriter();
|
||||
// assert that the wal is rolled
|
||||
assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// add edits in the second wal file, and roll writer.
|
||||
addEdits(wal, hri1, table1, 1, scopes1);
|
||||
wal.rollWriter();
|
||||
// assert that the wal is rolled
|
||||
assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// add a waledit to table1, and flush the region.
|
||||
addEdits(wal, hri1, table1, 3, scopes1);
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
|
||||
// roll log; all old logs should be archived.
|
||||
wal.rollWriter();
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// add an edit to table2, and roll writer
|
||||
addEdits(wal, hri2, table2, 1, scopes2);
|
||||
wal.rollWriter();
|
||||
assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// add edits for table1, and roll writer
|
||||
addEdits(wal, hri1, table1, 2, scopes1);
|
||||
wal.rollWriter();
|
||||
assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// add edits for table2, and flush hri1.
|
||||
addEdits(wal, hri2, table2, 2, scopes2);
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
|
||||
// the log : region-sequenceId map is
|
||||
// log1: region2 (unflushed)
|
||||
// log2: region1 (flushed)
|
||||
// log3: region2 (unflushed)
|
||||
// roll the writer; log2 should be archived.
|
||||
wal.rollWriter();
|
||||
assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
// flush region2, and all logs should be archived.
|
||||
addEdits(wal, hri2, table2, 2, scopes2);
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
|
||||
testWALArchiving(wals);
|
||||
} finally {
|
||||
if (wals != null) {
|
||||
wals.close();
|
||||
}
|
||||
wals.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue