HBASE-2645 HLog writer can do 1-2 sync operations after lease has been recovered for split process; REVERT -- TEST FAILS

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1404875 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-11-02 04:32:54 +00:00
parent 75682c0c5b
commit 2803eed35e
1 changed files with 64 additions and 75 deletions

View File

@ -85,7 +85,8 @@ public class TestHLogSplit {
private Configuration conf; private Configuration conf;
private FileSystem fs; private FileSystem fs;
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility
TEST_UTIL = new HBaseTestingUtility();
private static final Path hbaseDir = new Path("/hbase"); private static final Path hbaseDir = new Path("/hbase");
@ -155,46 +156,6 @@ public class TestHLogSplit {
public void tearDown() throws Exception { public void tearDown() throws Exception {
} }
/**
* Simulates splitting a WAL out from under a regionserver that is still trying to write it. Ensures we do not
* lose edits.
* @throws IOException
*/
@Test
public void testLogCannotBeWrittenOnceParsed() throws IOException {
AtomicLong counter = new AtomicLong(0);
AtomicBoolean stop = new AtomicBoolean(false);
// Index of the WAL we want to keep open. generateHLogs will leave open the WAL whose index we supply here.
int walToKeepOpen = 9;
// The below method writes NUM_WRITERS files each with ENTRIES entries it for a total of NUM_WRITERS * ENTRIES added
// per column family in the region.
generateHLogs(walToKeepOpen);
fs.initialize(fs.getUri(), conf);
String julietRegion = regions.get(0);
// This WAL should be open still
HLog.Writer stillOpenWAL = writer[walToKeepOpen];
// Thread that will keep writing the WAL across the WAL splitting below. Before we start writing, make sure the
// counter has the edits that were written above in generateHLogs.
counter.addAndGet(ENTRIES * NUM_WRITERS);
Thread zombie = new ZombieLastLogWriterRegionServer(stillOpenWAL, counter, stop, TABLE_NAME, julietRegion);
try {
zombie.start();
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, hbaseDir, hlogDir, oldLogDir, fs);
logSplitter.splitLog();
// Get the recovered edits file made by the WAL log splitting process above.
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, julietRegion);
// It's possible that the writer got an error while appending and didn't count an edit
// however the entry will in fact be written to file and split with the rest
long numberOfEditsInRegion = countHLog(logfile, fs, conf);
assertTrue("The log file could have at most 1 extra log entry, but can't have less. Zombie could write " +
counter.get() + " and logfile had only " + numberOfEditsInRegion + " in logfile=" + logfile,
counter.get() == numberOfEditsInRegion || counter.get() + 1 == numberOfEditsInRegion);
} finally {
stop.set(true);
}
}
/** /**
* @throws IOException * @throws IOException
* @see https://issues.apache.org/jira/browse/HBASE-3020 * @see https://issues.apache.org/jira/browse/HBASE-3020
@ -517,11 +478,11 @@ public class TestHLogSplit {
hbaseDir, hlogDir, oldLogDir, fs); hbaseDir, hlogDir, oldLogDir, fs);
logSplitter.splitLog(); logSplitter.splitLog();
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION); Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
int actualCount = 0; int actualCount = 0;
HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf); HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
@SuppressWarnings("unused")
HLog.Entry entry; HLog.Entry entry;
while ((entry = in.next()) != null) ++actualCount; while ((entry = in.next()) != null) ++actualCount;
assertEquals(entryCount-1, actualCount); assertEquals(entryCount-1, actualCount);
@ -581,7 +542,36 @@ public class TestHLogSplit {
// hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
} }
} }
/* DISABLED for now. TODO: HBASE-2645
@Test
public void testLogCannotBeWrittenOnceParsed() throws IOException {
AtomicLong counter = new AtomicLong(0);
AtomicBoolean stop = new AtomicBoolean(false);
generateHLogs(9);
fs.initialize(fs.getUri(), conf);
Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
try {
zombie.start();
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
// It's possible that the writer got an error while appending and didn't count it
// however the entry will in fact be written to file and split with the rest
long numberOfEditsInRegion = countHLog(logfile, fs, conf);
assertTrue("The log file could have at most 1 extra log entry, but " +
"can't have less. Zombie could write "+counter.get() +" and logfile had only"+ numberOfEditsInRegion+" " + logfile, counter.get() == numberOfEditsInRegion ||
counter.get() + 1 == numberOfEditsInRegion);
} finally {
stop.set(true);
}
}
*/
@Test @Test
public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted() public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
@ -945,16 +935,10 @@ public class TestHLogSplit {
AtomicBoolean stop; AtomicBoolean stop;
Path log; Path log;
HLog.Writer lastLogWriter; HLog.Writer lastLogWriter;
private final byte [] tableName; public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter, AtomicBoolean stop) {
private final String regionName;
public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter, AtomicBoolean stop,
final byte [] tableName, final String regionName) {
this.stop = stop; this.stop = stop;
this.editsCount = counter; this.editsCount = counter;
this.lastLogWriter = writer; this.lastLogWriter = writer;
this.tableName = tableName;
this.regionName = regionName;
} }
@Override @Override
@ -963,16 +947,13 @@ public class TestHLogSplit {
return; return;
} }
flushToConsole("starting"); flushToConsole("starting");
byte [] regionBytes = Bytes.toBytes(this.regionName);
boolean created = false;
while (true) { while (true) {
try { try {
if (!created) { String region = "juliet";
fs.mkdirs(new Path(new Path(hbaseDir, Bytes.toString(this.tableName)), this.regionName));
created = true; fs.mkdirs(new Path(new Path(hbaseDir, region), region));
} appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(),
appendEntry(lastLogWriter, TABLE_NAME, regionBytes, ("r" + editsCount).getBytes(), regionBytes, ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
QUALIFIER, VALUE, 0);
lastLogWriter.sync(); lastLogWriter.sync();
editsCount.incrementAndGet(); editsCount.incrementAndGet();
try { try {
@ -980,16 +961,21 @@ public class TestHLogSplit {
} catch (InterruptedException e) { } catch (InterruptedException e) {
// //
} }
} catch (IOException ex) { } catch (IOException ex) {
if (ex instanceof RemoteException) { if (ex instanceof RemoteException) {
flushToConsole("Juliet: got RemoteException " + ex.getMessage() + flushToConsole("Juliet: got RemoteException " +
" while writing " + (editsCount.get() + 1)); ex.getMessage() + " while writing " + (editsCount.get() + 1));
break; break;
} else { } else {
assertTrue("Failed to write " + editsCount.get(), false); assertTrue("Failed to write " + editsCount.get(), false);
} }
} }
} }
} }
} }
@ -1163,7 +1149,8 @@ public class TestHLogSplit {
regions.add(regionName); regions.add(regionName);
generateHLogs(-1); generateHLogs(-1);
HLogFactory.createHLog(fs, regiondir, regionName, conf); final HLog log = HLogFactory.createHLog(fs, regiondir,
regionName, conf);
HLogSplitter logSplitter = new HLogSplitter( HLogSplitter logSplitter = new HLogSplitter(
conf, hbaseDir, hlogDir, oldLogDir, fs, null) { conf, hbaseDir, hlogDir, oldLogDir, fs, null) {
@ -1223,12 +1210,14 @@ public class TestHLogSplit {
makeRegionDirs(fs, regions); makeRegionDirs(fs, regions);
fs.mkdirs(hlogDir); fs.mkdirs(hlogDir);
for (int i = 0; i < writers; i++) { for (int i = 0; i < writers; i++) {
writer[i] = HLogFactory.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf); writer[i] = HLogFactory.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i),
conf);
for (int j = 0; j < entries; j++) { for (int j = 0; j < entries; j++) {
int prefix = 0; int prefix = 0;
for (String region : regions) { for (String region : regions) {
String row_key = region + prefix++ + i + j; String row_key = region + prefix++ + i + j;
appendEntry(writer[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq); appendEntry(writer[i], TABLE_NAME, region.getBytes(),
row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
} }
} }
if (i != leaveOpen) { if (i != leaveOpen) {