HBASE-7671 Flushing memstore again after last failure could cause data loss
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1449826 13f79535-47bb-0310-9956-ffa450edef68
commit f5fcabe301 (parent 646173a5c5)
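In short: a flush retried after an earlier flush failure can complete with a sequence id that covers edits whose snapshot was never actually persisted, so a later WAL replay would skip those edits and lose the rows. The fix guards internalFlushcache() so a region never flushes while its region server is aborting; the server dies instead, and WAL splitting recovers the edits. The new TestWALReplay case injects a flush failure, simulates the abort, writes more data, and verifies that replay recovers every written row.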
@@ -1514,6 +1514,10 @@ public class HRegion implements HeapSize { // , Writable{
   protected boolean internalFlushcache(
       final HLog wal, final long myseqid, MonitoredTask status)
   throws IOException {
+    if (this.rsServices != null && this.rsServices.isAborted()) {
+      // Don't flush when the server is aborting, it's unsafe
+      throw new IOException("Aborting flush because server is aborted...");
+    }
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
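The guard itself is small. A minimal sketch of the same pattern, with hypothetical names (AbortAwareFlusher, markAborted) rather than the HBase API:

    // Hypothetical sketch of the guard this hunk adds, not HBase code.
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    class AbortAwareFlusher {
      private final AtomicBoolean serverAborted = new AtomicBoolean(false);

      // Called when the hosting server starts aborting.
      void markAborted() {
        serverAborted.set(true);
      }

      void flushCache() throws IOException {
        // Refuse to flush on an aborting server; a retried flush could record
        // edits as persisted that never reached a store file. WAL replay will
        // recover them instead.
        if (serverAborted.get()) {
          throw new IOException("Aborting flush because server is aborted");
        }
        // ... snapshot the memstore and write the snapshot to a tmp file ...
      }
    }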
@@ -733,7 +733,7 @@ public class HStore implements Store {
    * @return Path The path name of the tmp file to which the store was flushed
    * @throws IOException
    */
-  private Path flushCache(final long logCacheFlushId,
+  protected Path flushCache(final long logCacheFlushId,
       SortedSet<KeyValue> snapshot,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
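The only HStore change is visibility: flushCache() goes from private to protected. That widening is what lets the new test below subclass HStore and override flushCache() to throw on demand, injecting a flush failure without touching production code paths.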
@@ -20,12 +20,18 @@ package org.apache.hadoop.hbase.regionserver.wal;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;

 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +39,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,8 +64,11 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -529,6 +549,129 @@ public class TestWALReplay {
     assertEquals(result.size(), result1b.size());
   }
+
+  /**
+   * Test that we recover data correctly after an aborted flush: first abort a
+   * flush after writing some data, then write more data and flush again, and
+   * finally verify the recovered data.
+   * @throws IOException
+   */
+  @Test
+  public void testReplayEditsAfterAbortingFlush() throws IOException {
+    final String tableNameStr = "testReplayEditsAfterAbortingFlush";
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
+    final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr);
+    HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
+    region3.close();
+    region3.getLog().closeAndDelete();
+    // Create a WAL, a mocked RegionServerServices, and a region whose stores
+    // can be made to fail flushes on demand.
+    HLog wal = createWAL(this.conf);
+    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
+        rsServices) {
+      @Override
+      protected HStore instantiateHStore(Path tableDir, HColumnDescriptor c)
+          throws IOException {
+        return new HStore(tableDir, this, c, fs, conf) {
+          @Override
+          protected Path flushCache(final long logCacheFlushId,
+              SortedSet<KeyValue> snapshot,
+              TimeRangeTracker snapshotTimeRangeTracker,
+              AtomicLong flushedSize, MonitoredTask status) throws IOException {
+            if (throwExceptionWhenFlushing.get()) {
+              throw new IOException("Simulated exception by tests");
+            }
+            return super.flushCache(logCacheFlushId, snapshot,
+                snapshotTimeRangeTracker, flushedSize, status);
+          }
+        };
+      }
+    };
+    long seqid = region.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal.setSequenceNumber(seqid);
+
+    int writtenRowCount = 10;
+    List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
+        htd.getFamilies());
+    for (int i = 0; i < writtenRowCount; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+
+    // Now assert edits made it in.
+    RegionScanner scanner = region.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+
+    // Flush the region with fault injection enabled.
+    throwExceptionWhenFlushing.set(true);
+    try {
+      region.flushcache();
+      fail("Injected exception was not thrown");
+    } catch (Throwable t) {
+      LOG.info("Expected simulated exception when flushing region: "
+          + t.getMessage());
+      // Simulate a server abort.
+      Mockito.doReturn(true).when(rsServices).isAborted();
+    }
+    // Write more data.
+    int moreRow = 10;
+    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
+      Put put = new Put(Bytes.toBytes(tableNameStr + Integer.toString(i)));
+      put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
+          Bytes.toBytes("val"));
+      region.put(put);
+    }
+    writtenRowCount += moreRow;
+    // Call flush again; it must fail because the server is aborted.
+    throwExceptionWhenFlushing.set(false);
+    try {
+      region.flushcache();
+    } catch (IOException t) {
+      LOG.info("Expected exception when flushing region because server is stopped: "
+          + t.getMessage());
+    }
+
+    region.close(true);
+    wal.close();
+
+    // Now split the WAL and recover.
+    runWALSplit(this.conf);
+    HLog wal2 = createWAL(this.conf);
+    Mockito.doReturn(false).when(rsServices).isAborted();
+    HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
+        rsServices);
+    long seqid2 = region2.initialize();
+    // HRegionServer usually does this. It knows the largest seqid across all
+    // regions.
+    wal2.setSequenceNumber(seqid2);
+
+    scanner = region2.getScanner(new Scan());
+    assertEquals(writtenRowCount, getScannedCount(scanner));
+  }
+
+  private int getScannedCount(RegionScanner scanner) throws IOException {
+    int scannedCount = 0;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    while (true) {
+      boolean existMore = scanner.next(results);
+      if (!results.isEmpty())
+        scannedCount++;
+      if (!existMore)
+        break;
+      results.clear();
+    }
+    return scannedCount;
+  }

   /**
    * Create an HRegion with the result of a HLog split and test we only see the
    * good edits
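The test drives the abort state through a Mockito stub that is re-stubbed mid-test. A stripped-down sketch of that toggle pattern, where ServerServices is a hypothetical stand-in for RegionServerServices:

    // Hypothetical sketch of the re-stubbing toggle used by the test above.
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    public class AbortToggleSketch {
      interface ServerServices {
        boolean isAborted();
      }

      public static void main(String[] args) {
        ServerServices services = mock(ServerServices.class);
        doReturn(false).when(services).isAborted(); // healthy server
        assert !services.isAborted();
        doReturn(true).when(services).isAborted();  // simulate the abort mid-test
        assert services.isAborted();
      }
    }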