HBASE-6059 Replaying recovered edits would make deleted data exist again

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344554 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2012-05-31 03:46:27 +00:00
parent bf1470c6c7
commit b3ec7ced77
6 changed files with 179 additions and 50 deletions


@@ -151,7 +151,10 @@ class Compactor extends Configured {
boolean hasMore;
do {
hasMore = scanner.next(kvs, compactionKVMax);
if (writer == null && !kvs.isEmpty()) {
// Create the writer even if there are no kvs (an empty store file is also ok),
// because we need to record the max seq id for the store file, see
// HBASE-6059
if (writer == null) {
writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
}
if (writer != null) {
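The comment above is the crux of the Compactor change: if a major compaction removes every cell (everything deleted or expired) and writes no store file at all, the store loses its record of the highest flushed sequence id, and recovered edits older than that compaction could later be replayed on top of it, resurrecting deleted data. The standalone sketch below, with hypothetical names (FakeStoreFile, storeMaxSeqId) and example values, illustrates the difference; it is not part of the patch.

import java.util.ArrayList;
import java.util.List;

public class MaxSeqIdSketch {
  /** Stand-in for a store file: only the max WAL sequence id it covers. */
  static final class FakeStoreFile {
    final long maxSeqId;
    FakeStoreFile(long maxSeqId) { this.maxSeqId = maxSeqId; }
  }

  /** A store's max sequence id is the max across its files, or -1 with no files. */
  static long storeMaxSeqId(List<FakeStoreFile> files) {
    long max = -1;
    for (FakeStoreFile f : files) {
      max = Math.max(max, f.maxSeqId);
    }
    return max;
  }

  public static void main(String[] args) {
    List<FakeStoreFile> files = new ArrayList<FakeStoreFile>();
    long walEditSeqId = 50;   // an already-persisted (and later deleted) edit

    // Case 1: the compaction writes an empty store file that still carries
    // maxSeqId 100, so the old edit is correctly skipped during replay.
    files.add(new FakeStoreFile(100));
    System.out.println("empty file kept, replay edit 50? "
        + (walEditSeqId > storeMaxSeqId(files)));   // false

    // Case 2: no file is written, the store's max seq id falls back to -1,
    // and the same edit would be replayed, bringing deleted data back.
    files.clear();
    System.out.println("no file written, replay edit 50? "
        + (walEditSeqId > storeMaxSeqId(files)));   // true
  }
}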


@@ -526,22 +526,13 @@ public class HRegion implements HeapSize { // , Writable{
cleanupTmpDir();
// Load in all the HStores.
// Get minimum of the maxSeqId across all the stores.
//
// Context: During replay we want to ensure that we do not lose any data. So, we
// have to be conservative in how we replay logs. For each store, we calculate
// the maxSeqId up to which the store was flushed. But, since different stores
// could have a different maxSeqId, we choose the
// minimum across all the stores.
// This could potentially result in duplication of data for stores that are ahead
// of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we
// do not have to worry.
// TODO: If there is a store that was never flushed in a long time, we could replay
// a lot of data. Currently, this is not a problem because we flush all the stores at
// the same time. If we move to per-cf flushing, we might want to revisit this and send
// in a vector of maxSeqIds instead of sending in a single number, which has to be the
// min across all the max.
long minSeqId = -1;
// the maxSeqId up to which the store was flushed, and skip the edits that are
// equal to or lower than that maxSeqId for each store.
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
Bytes.BYTES_COMPARATOR);
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
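As the new comment explains, each store now remembers the maxSeqId up to which it was flushed, and the region later reduces those values to a minimum when deciding how far back into the recovered edits it must look. The standalone sketch below, using hypothetical class and variable names plus example values, mirrors that bookkeeping; it is illustrative only, not code from the patch.

import java.util.Map;
import java.util.TreeMap;

public class RegionMinSeqIdSketch {
  public static void main(String[] args) {
    Map<String, Long> maxSeqIdInStores = new TreeMap<String, Long>();
    maxSeqIdInStores.put("cf1", 120L); // cf1 flushed through sequence id 120
    maxSeqIdInStores.put("cf2", 80L);  // cf2 flushed through sequence id 80

    // Same reduction as in replayRecoveredEditsIfAny: the least-flushed store
    // decides how far back the region must look at recovered edits.
    long minSeqIdForTheRegion = -1;
    for (long maxSeqIdInStore : maxSeqIdInStores.values()) {
      if (minSeqIdForTheRegion == -1 || maxSeqIdInStore < minSeqIdForTheRegion) {
        minSeqIdForTheRegion = maxSeqIdInStore;
      }
    }
    System.out.println("minSeqIdForTheRegion = " + minSeqIdForTheRegion); // 80

    // A recovered-edits file is named after its highest sequence id; it can be
    // skipped wholesale only if even the least-flushed store has seen it all.
    long[] editFileMaxSeqIds = { 60L, 80L, 150L };
    for (long fileMaxSeqId : editFileMaxSeqIds) {
      boolean skipWholeFile = fileMaxSeqId <= minSeqIdForTheRegion;
      System.out.println("edits file " + fileMaxSeqId + " skipped=" + skipWholeFile);
    }
  }
}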
@@ -571,9 +562,8 @@ public class HRegion implements HeapSize { // , Writable{
this.stores.put(store.getColumnFamilyName().getBytes(), store);
long storeSeqId = store.getMaxSequenceId();
if (minSeqId == -1 || storeSeqId < minSeqId) {
minSeqId = storeSeqId;
}
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
storeSeqId);
if (maxSeqId == -1 || storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
@@ -593,7 +583,7 @@ public class HRegion implements HeapSize { // , Writable{
mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.regiondir, minSeqId, reporter, status));
this.regiondir, maxSeqIdInStores, reporter, status));
status.setStatus("Cleaning up detritus from prior splits");
// Get rid of any splits or merges that were lost in-progress. Clean out
@@ -2755,8 +2745,8 @@ public class HRegion implements HeapSize { // , Writable{
* make sense in this single region context only -- until we online.
*
* @param regiondir
* @param minSeqId Any edit found in split editlogs needs to be in excess of
* this minSeqId to be applied, else its skipped.
* @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
* the maxSeqId for the store to be applied; otherwise it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
@@ -2764,12 +2754,19 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
final long minSeqId, final CancelableProgressable reporter,
final MonitoredTask status)
Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws UnsupportedEncodingException, IOException {
long seqid = minSeqId;
long minSeqIdForTheRegion = -1;
for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
minSeqIdForTheRegion = maxSeqIdInStore;
}
}
long seqid = minSeqIdForTheRegion;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
if (files == null || files.isEmpty()) return seqid;
for (Path edits: files) {
if (edits == null || !this.fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
@@ -2780,16 +2777,16 @@ public class HRegion implements HeapSize { // , Writable{
long maxSeqId = Long.MAX_VALUE;
String fileName = edits.getName();
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqId) {
if (maxSeqId <= minSeqIdForTheRegion) {
String msg = "Maximum sequenceid for this log is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
continue;
}
try {
seqid = replayRecoveredEdits(edits, seqid, reporter);
seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
} catch (IOException e) {
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
if (skipErrors) {
@@ -2806,7 +2803,7 @@ public class HRegion implements HeapSize { // , Writable{
this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
}
}
if (seqid > minSeqId) {
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, status);
}
@@ -2823,18 +2820,17 @@ public class HRegion implements HeapSize { // , Writable{
/*
* @param edits File of recovered edits.
* @param minSeqId Minimum sequenceid found in a store file. Edits in log
* must be larger than this to be replayed.
* @param maxSeqIdInStores Maximum sequenceid found in each store. An edit in the log
* must be larger than a store's maximum to be replayed into that store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
final long minSeqId, final CancelableProgressable reporter)
Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
throws IOException {
String msg = "Replaying edits from " + edits + "; minSequenceid=" +
minSeqId + "; path=" + edits;
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
@@ -2842,7 +2838,7 @@ public class HRegion implements HeapSize { // , Writable{
HLog.Reader reader = null;
try {
reader = HLog.getReader(this.fs, edits, conf);
long currentEditSeqId = minSeqId;
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
@@ -2901,12 +2897,6 @@ public class HRegion implements HeapSize { // , Writable{
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
@@ -2927,6 +2917,13 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
.getName())) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
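This hunk moves the skip decision from a single region-wide sequence id to a per-store check keyed by the column family of each KeyValue. The short standalone sketch below, with a hypothetical shouldReplay helper, applies the same rule to one WALEdit touching two families; it is illustrative and not part of the patch.

import java.util.HashMap;
import java.util.Map;

public class PerStoreSkipSketch {
  /** True if an edit for the given family at logSeqNum still has to be replayed. */
  static boolean shouldReplay(Map<String, Long> maxSeqIdInStores,
      String family, long logSeqNum) {
    Long maxFlushedSeqId = maxSeqIdInStores.get(family);
    // An edit at or below the store's flushed max seq id is already reflected
    // in its files (or was legitimately deleted and compacted away), so
    // replaying it could resurrect deleted data.
    return maxFlushedSeqId == null || logSeqNum > maxFlushedSeqId;
  }

  public static void main(String[] args) {
    Map<String, Long> maxSeqIdInStores = new HashMap<String, Long>();
    maxSeqIdInStores.put("cf1", 120L); // cf1 flushed through 120
    maxSeqIdInStores.put("cf2", 80L);  // cf2 flushed through 80

    // A WALEdit at sequence id 100 touching both families is replayed only
    // into cf2; for cf1 it is counted as a skipped edit.
    System.out.println("cf1: " + shouldReplay(maxSeqIdInStores, "cf1", 100)); // false
    System.out.println("cf2: " + shouldReplay(maxSeqIdInStores, "cf2", 100)); // true
  }
}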


@@ -1728,6 +1728,10 @@ public class Store extends SchemaConfigured implements HeapSize {
LOG.warn("StoreFile " + f + " has a null Reader");
return;
}
if (r.getEntries() == 0) {
LOG.warn("StoreFile " + f + " is a empty store file");
return;
}
// TODO: Cache these keys rather than make each time?
byte [] fk = r.getFirstKey();
KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);


@@ -180,7 +180,13 @@ public class TestHRegion extends HBaseTestCase {
writer.close();
}
MonitoredTask status = TaskMonitor.get().createStatus(method);
long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
Bytes.BYTES_COMPARATOR);
for (Store store : region.getStores().values()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
minSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
Get get = new Get(row);
Result result = region.get(get, null);
@@ -226,7 +232,13 @@ public class TestHRegion extends HBaseTestCase {
}
long recoverSeqId = 1030;
MonitoredTask status = TaskMonitor.get().createStatus(method);
long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
Bytes.BYTES_COMPARATOR);
for (Store store : region.getStores().values()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
recoverSeqId - 1);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
Get get = new Get(row);
Result result = region.get(get, null);
@@ -267,7 +279,14 @@ public class TestHRegion extends HBaseTestCase {
recoveredEditsDir, String.format("%019d", minSeqId-1));
FSDataOutputStream dos= fs.create(recoveredEdits);
dos.close();
long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
Bytes.BYTES_COMPARATOR);
for (Store store : region.getStores().values()) {
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
}
long seqId = region.replayRecoveredEditsIfAny(regiondir,
maxSeqIdInStores, null, null);
assertEquals(minSeqId, seqId);
} finally {
HRegion.closeHRegion(this.region);


@@ -183,12 +183,17 @@ public class TestStore extends TestCase {
for (int i = 1; i <= storeFileNum; i++) {
// verify the expired store file.
CompactionRequest cr = this.store.requestCompaction();
assertEquals(1, cr.getFiles().size());
assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
(System.currentTimeMillis() - this.store.scanInfo.getTtl()));
// Verify that the expired store file has been deleted.
this.store.compact(cr);
assertEquals(storeFileNum - i, this.store.getStorefiles().size());
// The expired store file is selected as before. After the first compaction,
// the empty store file produced by the previous compaction is included too.
assertEquals(Math.min(i, 2), cr.getFiles().size());
for (int j = 0; j < cr.getFiles().size(); j++) {
assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
.currentTimeMillis() - this.store.scanInfo.getTtl()));
}
// Verify that the expired store file is compacted to an empty store file.
StoreFile compactedFile = this.store.compact(cr);
// It is an empty store file.
assertEquals(0, compactedFile.getReader().getEntries());
// Let the next store file expire.
Thread.sleep(sleepTime);


@@ -35,13 +35,20 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -77,7 +84,7 @@ public class TestWALReplay {
conf.setBoolean("dfs.support.append", true);
// The below config is supported by 0.20-append and CDH3b2
conf.setInt("dfs.client.block.recovery.retries", 2);
TEST_UTIL.startMiniDFSCluster(3);
TEST_UTIL.startMiniCluster(3);
Path hbaseRootDir =
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
LOG.info("hbase.rootdir=" + hbaseRootDir);
@@ -86,7 +93,7 @@ public class TestWALReplay {
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniDFSCluster();
TEST_UTIL.shutdownMiniCluster();
}
@Before
@@ -117,6 +124,100 @@ public class TestWALReplay {
}
}
/**
* Test for HBASE-6059: replaying recovered edits after a regionserver abort
* must not resurrect a row that was deleted and major-compacted away in a
* region with multiple column families.
* @throws Exception
*/
@Test
public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
final byte[] tableName = Bytes
.toBytes("testReplayEditsAfterRegionMovedWithMultiCF");
byte[] family1 = Bytes.toBytes("cf1");
byte[] family2 = Bytes.toBytes("cf2");
byte[] qualifier = Bytes.toBytes("q");
byte[] value = Bytes.toBytes("testV");
byte[][] families = { family1, family2 };
TEST_UTIL.createTable(tableName, families);
HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(Bytes.toBytes("r1"));
put.add(family1, qualifier, value);
htable.put(put);
ResultScanner resultScanner = htable.getScanner(new Scan());
int count = 0;
while (resultScanner.next() != null) {
count++;
}
resultScanner.close();
assertEquals(1, count);
MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
List<HRegion> regions = hbaseCluster.getRegions(tableName);
assertEquals(1, regions.size());
// move region to another regionserver
HRegion destRegion = regions.get(0);
int originServerNum = hbaseCluster
.getServerWith(destRegion.getRegionName());
assertTrue("Please start more than 1 regionserver", hbaseCluster
.getRegionServerThreads().size() > 1);
int destServerNum = 0;
while (destServerNum == originServerNum) {
destServerNum++;
}
HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
// move region to destination regionserver
moveRegionAndWait(destRegion, destServer);
// delete the row
Delete del = new Delete(Bytes.toBytes("r1"));
htable.delete(del);
resultScanner = htable.getScanner(new Scan());
count = 0;
while (resultScanner.next() != null) {
count++;
}
resultScanner.close();
assertEquals(0, count);
// flush the region and run a major compaction
destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
// request a major compaction on every store, then compact
for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
.getStores().values()) {
store.triggerMajorCompaction();
}
destServer.getOnlineRegion(destRegion.getRegionName()).compactStores();
// move region to origin regionserver
moveRegionAndWait(destRegion, originServer);
// abort the origin regionserver
originServer.abort("testing");
// the row was deleted before the abort, so replaying the recovered edits
// must not bring it back
Result result = htable.get(new Get(Bytes.toBytes("r1")));
assertTrue("Row is deleted, but we got " + result,
result == null || result.isEmpty());
}
private void moveRegionAndWait(HRegion destRegion, HRegionServer destServer)
throws InterruptedException, MasterNotRunningException,
ZooKeeperConnectionException, IOException {
HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
TEST_UTIL.getHBaseAdmin().move(
destRegion.getRegionInfo().getEncodedNameAsBytes(),
Bytes.toBytes(destServer.getServerName().getServerName()));
while (true) {
ServerName serverName = master.getAssignmentManager()
.getRegionServerOfRegion(destRegion.getRegionInfo());
if (serverName != null && serverName.equals(destServer.getServerName())) break;
Thread.sleep(10);
}
}
/**
* Tests for hbase-2727.
* @throws Exception