HDFS-14674. [SBN read] Got an unexpected txid when tail editlog. Contributed by wangzhaohui.
This commit is contained in:
parent
2408c2491f
commit
c824653955
|
@ -894,6 +894,7 @@ public class FSImage implements Closeable {
|
||||||
prog.beginPhase(Phase.LOADING_EDITS);
|
prog.beginPhase(Phase.LOADING_EDITS);
|
||||||
|
|
||||||
long prevLastAppliedTxId = lastAppliedTxId;
|
long prevLastAppliedTxId = lastAppliedTxId;
|
||||||
|
long remainingReadTxns = maxTxnsToRead;
|
||||||
try {
|
try {
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
|
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
|
||||||
|
|
||||||
|
@ -910,8 +911,8 @@ public class FSImage implements Closeable {
|
||||||
(lastAppliedTxId + 1) + logSuppressed);
|
(lastAppliedTxId + 1) + logSuppressed);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
|
remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
|
||||||
startOpt, recovery);
|
remainingReadTxns, startOpt, recovery);
|
||||||
} finally {
|
} finally {
|
||||||
// Update lastAppliedTxId even in case of error, since some ops may
|
// Update lastAppliedTxId even in case of error, since some ops may
|
||||||
// have been successfully applied before the error.
|
// have been successfully applied before the error.
|
||||||
|
@ -922,6 +923,9 @@ public class FSImage implements Closeable {
|
||||||
&& recovery != null) {
|
&& recovery != null) {
|
||||||
lastAppliedTxId = editIn.getLastTxId();
|
lastAppliedTxId = editIn.getLastTxId();
|
||||||
}
|
}
|
||||||
|
if (remainingReadTxns <= 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
FSEditLog.closeAllStreams(editStreams);
|
FSEditLog.closeAllStreams(editStreams);
|
||||||
|
|
|
@ -305,6 +305,87 @@ public class TestEditLog {
|
||||||
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
|
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiStreamsLoadEditWithConfMaxTxns()
|
||||||
|
throws IOException {
|
||||||
|
Configuration conf = getConf();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
FileSystem fileSystem = null;
|
||||||
|
FSImage writeFsImage = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster
|
||||||
|
.Builder(conf)
|
||||||
|
.numDataNodes(NUM_DATA_NODES)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
fileSystem = cluster.getFileSystem();
|
||||||
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
writeFsImage = namesystem.getFSImage();
|
||||||
|
for (Iterator<URI> it = cluster.getNameDirs(0)
|
||||||
|
.iterator(); it.hasNext();) {
|
||||||
|
File dir = new File(it.next().getPath());
|
||||||
|
System.out.println(dir);
|
||||||
|
}
|
||||||
|
// Roll log so new output buffer size takes effect
|
||||||
|
// we should now be writing to edits_inprogress_3
|
||||||
|
long originalLastInodeId = namesystem.dir.getLastInodeId();
|
||||||
|
// Reopen some files as for append
|
||||||
|
Transactions trans = new Transactions(
|
||||||
|
namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
||||||
|
trans.run();
|
||||||
|
// Roll another time to finalize edits_inprogress_3
|
||||||
|
writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
Transactions trans1 = new Transactions(
|
||||||
|
namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
||||||
|
trans1.run();
|
||||||
|
writeFsImage.rollEditLog(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
namesystem.dir.resetLastInodeIdWithoutChecking(originalLastInodeId);
|
||||||
|
for(Iterator<StorageDirectory> it = writeFsImage.getStorage().
|
||||||
|
dirIterator(NameNodeDirType.EDITS); it.hasNext();){
|
||||||
|
long expectedTxns = (2 * NUM_TRANSACTIONS) + 2;
|
||||||
|
File editFile = NNStorage.getFinalizedEditsFile(it.next(),
|
||||||
|
1, expectedTxns);
|
||||||
|
File editFile1 = NNStorage.getFinalizedEditsFile(it.next(),
|
||||||
|
203, 404);
|
||||||
|
assertTrue("Expect " + editFile + " exists", editFile.exists());
|
||||||
|
assertTrue("Expect " + editFile1 + " exists", editFile1.exists());
|
||||||
|
EditLogFileInputStream editLogFileInputStream1 =
|
||||||
|
new EditLogFileInputStream(editFile, 1, 202, false);
|
||||||
|
EditLogFileInputStream editLogFileInputStream2 =
|
||||||
|
new EditLogFileInputStream(editFile1, 203, 404, false);
|
||||||
|
List<EditLogInputStream> editStreams = Lists.newArrayList();
|
||||||
|
editStreams.add(editLogFileInputStream1);
|
||||||
|
editStreams.add(editLogFileInputStream2);
|
||||||
|
FSImage readFsImage = new FSImage(conf);
|
||||||
|
try {
|
||||||
|
readFsImage.loadEdits(editStreams, namesystem, 100, null, null);
|
||||||
|
} catch (Exception e){
|
||||||
|
LOG.error("There appears to be an out-of-order edit in the edit log",
|
||||||
|
e.getMessage());
|
||||||
|
fail("no exception should be thrown");
|
||||||
|
} finally {
|
||||||
|
if (readFsImage != null) {
|
||||||
|
readFsImage.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if(fileSystem != null) {
|
||||||
|
fileSystem.close();
|
||||||
|
}
|
||||||
|
if(cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
if(writeFsImage != null){
|
||||||
|
writeFsImage.close();
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Couldn't shut down cleanly", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple test for writing to and rolling the edit log.
|
* Simple test for writing to and rolling the edit log.
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue