HBASE-5689 Skipping RecoveredEdits may cause data loss (Chunhui)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1310788 13f79535-47bb-0310-9956-ffa450edef68
parent 915f2cd40d
commit 7a0c1daf63
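
The change below makes HLogSplitter name each completed recovered-edits file after the sequence id of the LAST edit it contains (instead of the first), so HRegion can safely skip a whole file whenever that name is not larger than the region's minimum sequence id. The following is a minimal, self-contained sketch of that skip decision, not the committed code: it uses java.nio.file.Path instead of Hadoop's Path, and the zero-padded file names are invented for illustration.

import java.nio.file.Path;
import java.util.NavigableSet;
import java.util.TreeSet;

// Sketch of the post-patch skip check: a recovered-edits file may be skipped
// only if its name (the sequence id of the last edit it holds) is <= the
// smallest sequence id already persisted for the region.
public class RecoveredEditsSkipSketch {

  static boolean canSkip(Path edits, long minSeqIdForRegion) {
    // Recovered-edits file names match "-?[0-9]+"; take the absolute value.
    long maxSeqIdInFile = Math.abs(Long.parseLong(edits.getFileName().toString()));
    return maxSeqIdInFile <= minSeqIdForRegion;
  }

  public static void main(String[] args) {
    NavigableSet<Path> files = new TreeSet<>();
    files.add(Path.of("recovered.edits/0000000000000000120")); // last edit has seq id 120
    files.add(Path.of("recovered.edits/0000000000000000200")); // last edit has seq id 200
    long minSeqIdForRegion = 150;                               // region already flushed to 150
    for (Path edits : files) {
      System.out.println(edits.getFileName() + " -> "
          + (canSkip(edits, minSeqIdForRegion) ? "skip" : "replay"));
    }
  }
}

With the old naming (file named by its first edit), the same check could skip a file that still held unflushed edits, which is the data loss this commit fixes.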
HRegion.java

@@ -2718,7 +2718,6 @@ public class HRegion implements HeapSize { // , Writable{
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
-    boolean checkSafeToSkip = true;
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
@@ -2726,22 +2725,15 @@ public class HRegion implements HeapSize { // , Writable{
       }
       if (isZeroLengthThenDelete(this.fs, edits)) continue;

-      if (checkSafeToSkip) {
-        Path higher = files.higher(edits);
       long maxSeqId = Long.MAX_VALUE;
-        if (higher != null) {
-          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
-          String fileName = higher.getName();
+      String fileName = edits.getName();
       maxSeqId = Math.abs(Long.parseLong(fileName));
-        }
       if (maxSeqId <= minSeqId) {
-        String msg = "Maximum possible sequenceid for this log is " + maxSeqId
+        String msg = "Maximum sequenceid for this log is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqId
            + ", skipped the whole file, path=" + edits;
         LOG.debug(msg);
         continue;
-      } else {
-        checkSafeToSkip = false;
       }
-      }

       try {

HLogSplitter.java

@@ -426,6 +426,7 @@ public class HLogSplitter {
           }
         }
         wap.w.append(entry);
+        outputSink.updateRegionMaximumEditLogSeqNum(entry);
         editsCount++;
         // If sufficient edits have passed OR we've opened a few files, check if
         // we should report progress.
@@ -455,7 +456,8 @@ public class HLogSplitter {
       throw e;
     } finally {
       int n = 0;
-      for (Object o : logWriters.values()) {
+      for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+        Object o = logWritersEntry.getValue();
         long t1 = EnvironmentEdgeManager.currentTimeMillis();
         if ((t1 - last_report_at) > period) {
           last_report_at = t;
@@ -471,7 +473,8 @@ public class HLogSplitter {
         WriterAndPath wap = (WriterAndPath)o;
         wap.w.close();
         LOG.debug("Closed " + wap.p);
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+            .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
         if (!dst.equals(wap.p) && fs.exists(dst)) {
           LOG.warn("Found existing old edits file. It could be the "
               + "result of a previous failed split attempt. Deleting " + dst
@@ -488,6 +491,7 @@ public class HLogSplitter {
           if (!fs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
+          LOG.debug("Rename " + wap.p + " to " + dst);
         }
       }
       String msg = "Processed " + editsCount + " edits across " + n + " regions" +
@@ -681,16 +685,16 @@ public class HLogSplitter {
   }

   /**
-   * Convert path to a file under RECOVERED_EDITS_DIR directory without
-   * RECOVERED_LOG_TMPFILE_SUFFIX
+   * Get the completed recovered edits file path, renaming it to be by last edit
+   * in the file from its first edit. Then we could use the name to skip
+   * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
    * @param srcPath
-   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   * @param maximumEditLogSeqNum
+   * @return dstPath take file's last edit log seq num as the name
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
-    String fileName = srcPath.getName();
-    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
-      fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
-    }
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      Long maximumEditLogSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }

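For context, after this change the split output for a region passes through two names: a temporary file created when the writer is opened, and a final file whose name is the largest sequence id the output sink saw for that region. A rough sketch of that naming scheme follows; formatSeqId is a stand-in for the real file-name formatting (assumed here to be a zero-padded decimal so names sort in sequence-id order), so treat it as an illustration rather than the committed helper.

// Sketch of the completed-file naming: the temporary split output is moved to
// a file named by the LAST edit's sequence id, so the name is an upper bound
// on every edit the file contains.
public class RecoveredEditsNaming {

  // Stand-in for the real recovered-edits file-name formatter (assumption:
  // zero-padded decimal).
  static String formatSeqId(long seqId) {
    return String.format("%019d", seqId);
  }

  static String completedFileName(long maximumEditLogSeqNum) {
    return formatSeqId(maximumEditLogSeqNum);
  }

  public static void main(String[] args) {
    String tmp = formatSeqId(120) + ".temp"; // created when the region writer is opened
    String done = completedFileName(200);    // renamed once the last sequence id is known
    System.out.println(tmp + " -> " + done);
  }
}
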
@@ -1027,6 +1031,7 @@ public class HLogSplitter {
       }
     }

     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1050,6 +1055,7 @@ public class HLogSplitter {
           }
         }
         wap.w.append(logEntry);
+        outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
         editsCount++;
       }
       // Pass along summary statistics
@@ -1138,6 +1144,8 @@ public class HLogSplitter {
   class OutputSink {
     private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
         new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
     private final List<WriterThread> writerThreads = Lists.newArrayList();

     /* Set of regions which we've decided should not output edits */
@@ -1204,8 +1212,11 @@ public class HLogSplitter {
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
       closeLogWriters(thrown);
-      for (WriterAndPath wap : logWriters.values()) {
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+          .entrySet()) {
+        WriterAndPath wap = logWritersEntry.getValue();
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
             LOG.warn("Found existing old edits file. It could be the "
@@ -1223,6 +1234,7 @@ public class HLogSplitter {
             if (!fs.rename(wap.p, dst)) {
               throw new IOException("Failed renaming " + wap.p + " to " + dst);
             }
+            LOG.debug("Rename " + wap.p + " to " + dst);
           }
         } catch (IOException ioe) {
           LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
@@ -1289,6 +1301,18 @@ public class HLogSplitter {
       return ret;
     }

+    /**
+     * Update region's maximum edit log SeqNum.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
+          entry.getKey().getLogSeqNum());
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
     /**
      * @return a map from encoded region ID to the number of edits written out
      *         for that region.

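The OutputSink changes above boil down to one piece of bookkeeping: for every appended edit, remember the largest log sequence id seen per encoded region, and hand that value to getCompletedRecoveredEditsFilePath when the writer is closed. As in the diff above, the map entry is simply overwritten on each append, since edits are written out in log order. A small stand-alone sketch of that bookkeeping, using String keys instead of byte[] with Bytes.BYTES_COMPARATOR so it compiles on its own:

import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the per-region maximum-sequence-id tracking added to OutputSink.
public class RegionSeqTracker {

  private final Map<String, Long> regionMaximumEditLogSeqNum =
      Collections.synchronizedMap(new TreeMap<String, Long>());

  // Called for every appended edit; the last value written for a region is
  // kept, which is the maximum when edits arrive in sequence order.
  void updateRegionMaximumEditLogSeqNum(String encodedRegionName, long logSeqNum) {
    regionMaximumEditLogSeqNum.put(encodedRegionName, logSeqNum);
  }

  Long getRegionMaximumEditLogSeqNum(String encodedRegionName) {
    return regionMaximumEditLogSeqNum.get(encodedRegionName);
  }
}
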
TestHRegion.java

@@ -54,9 +54,11 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -136,6 +139,95 @@ public class TestHRegion extends HBaseTestCase {
     SchemaMetrics.validateMetricChanges(startingMetrics);
   }

+  public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 3;
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+
+    try {
+      final byte[] TABLENAME = Bytes
+          .toBytes("testDataCorrectnessReplayingRecoveredEdits");
+      final byte[] FAMILY = Bytes.toBytes("family");
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      HMaster master = cluster.getMaster();
+
+      // Create table
+      HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+      hbaseAdmin.createTable(desc);
+
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+      // Put data: r1->v1
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+      putDataAndVerify(table, "r1", FAMILY, "v1", 1);
+
+      // Move region to target server
+      HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
+      int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
+      HRegionServer originServer = cluster.getRegionServer(originServerNum);
+      int targetServerNum = NUM_RS - 1 - originServerNum;
+      HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(targetServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
+
+      // Put data: r2->v2
+      putDataAndVerify(table, "r2", FAMILY, "v2", 2);
+
+      // Move region to origin server
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(originServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
+
+      // Put data: r3->v3
+      putDataAndVerify(table, "r3", FAMILY, "v3", 3);
+
+      // Kill target server
+      targetServer.kill();
+      cluster.getRegionServerThreads().get(targetServerNum).join();
+      // Wait until finish processing of shutdown
+      while (master.getServerManager().areDeadServersInProgress()) {
+        Thread.sleep(5);
+      }
+      // Kill origin server
+      originServer.kill();
+      cluster.getRegionServerThreads().get(originServerNum).join();
+
+      // Put data: r4->v4
+      putDataAndVerify(table, "r4", FAMILY, "v4", 4);
+
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private void putDataAndVerify(HTable table, String row, byte[] family,
+      String value, int verifyNum) throws IOException {
+    System.out.println("=========Putting data :" + row);
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
+    table.put(put);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    List<Result> results = new ArrayList<Result>();
+    while (true) {
+      Result r = resultScanner.next();
+      if (r == null)
+        break;
+      results.add(r);
+    }
+    resultScanner.close();
+    if (results.size() != verifyNum) {
+      System.out.println(results);
+    }
+    assertEquals(verifyNum, results.size());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // New tests that doesn't spin up a mini cluster but rather just test the
   // individual code pieces in the HRegion. Putting files locally in