HDFS-2909. HA: Inaccessible shared edits dir not getting removed from FSImage storage dirs upon error. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1244753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1fb0ab92f8
commit
3c145d3492
|
@ -202,3 +202,5 @@ HDFS-2947. On startup NN throws an NPE in the metrics system. (atm)
|
||||||
HDFS-2942. TestActiveStandbyElectorRealZK fails if build dir does not exist. (atm)
|
HDFS-2942. TestActiveStandbyElectorRealZK fails if build dir does not exist. (atm)
|
||||||
|
|
||||||
HDFS-2948. NN throws NPE during shutdown if it fails to startup (todd)
|
HDFS-2948. NN throws NPE during shutdown if it fails to startup (todd)
|
||||||
|
|
||||||
|
HDFS-2909. HA: Inaccessible shared edits dir not getting removed from FSImage storage dirs upon error. (Bikas Saha via jitendra)
|
||||||
|
|
|
@ -221,7 +221,7 @@ public class FSEditLog {
|
||||||
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
|
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
|
||||||
StorageDirectory sd = storage.getStorageDirectory(u);
|
StorageDirectory sd = storage.getStorageDirectory(u);
|
||||||
if (sd != null) {
|
if (sd != null) {
|
||||||
journalSet.add(new FileJournalManager(sd), required);
|
journalSet.add(new FileJournalManager(sd, storage), required);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
journalSet.add(createJournal(u), required);
|
journalSet.add(createJournal(u), required);
|
||||||
|
|
|
@ -52,6 +52,7 @@ class FileJournalManager implements JournalManager {
|
||||||
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
|
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
|
||||||
|
|
||||||
private final StorageDirectory sd;
|
private final StorageDirectory sd;
|
||||||
|
private final NNStorage storage;
|
||||||
private int outputBufferCapacity = 512*1024;
|
private int outputBufferCapacity = 512*1024;
|
||||||
|
|
||||||
private static final Pattern EDITS_REGEX = Pattern.compile(
|
private static final Pattern EDITS_REGEX = Pattern.compile(
|
||||||
|
@ -65,8 +66,9 @@ class FileJournalManager implements JournalManager {
|
||||||
StoragePurger purger
|
StoragePurger purger
|
||||||
= new NNStorageRetentionManager.DeletionStoragePurger();
|
= new NNStorageRetentionManager.DeletionStoragePurger();
|
||||||
|
|
||||||
public FileJournalManager(StorageDirectory sd) {
|
public FileJournalManager(StorageDirectory sd, NNStorage storage) {
|
||||||
this.sd = sd;
|
this.sd = sd;
|
||||||
|
this.storage = storage;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -75,11 +77,16 @@ class FileJournalManager implements JournalManager {
|
||||||
@Override
|
@Override
|
||||||
synchronized public EditLogOutputStream startLogSegment(long txid)
|
synchronized public EditLogOutputStream startLogSegment(long txid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
|
try {
|
||||||
EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
|
currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
|
||||||
outputBufferCapacity);
|
EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
|
||||||
stm.create();
|
outputBufferCapacity);
|
||||||
return stm;
|
stm.create();
|
||||||
|
return stm;
|
||||||
|
} catch (IOException e) {
|
||||||
|
storage.reportErrorsOnDirectory(sd);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -95,6 +102,7 @@ class FileJournalManager implements JournalManager {
|
||||||
"Can't finalize edits file " + inprogressFile + " since finalized file " +
|
"Can't finalize edits file " + inprogressFile + " since finalized file " +
|
||||||
"already exists");
|
"already exists");
|
||||||
if (!inprogressFile.renameTo(dstFile)) {
|
if (!inprogressFile.renameTo(dstFile)) {
|
||||||
|
storage.reportErrorsOnDirectory(sd);
|
||||||
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
|
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
|
||||||
}
|
}
|
||||||
if (inprogressFile.equals(currentInProgress)) {
|
if (inprogressFile.equals(currentInProgress)) {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
|
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
|
@ -59,7 +60,7 @@ public class TestFileJournalManager {
|
||||||
|
|
||||||
long numJournals = 0;
|
long numJournals = 0;
|
||||||
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
||||||
numJournals++;
|
numJournals++;
|
||||||
}
|
}
|
||||||
|
@ -79,7 +80,7 @@ public class TestFileJournalManager {
|
||||||
5, new AbortSpec(5, 0));
|
5, new AbortSpec(5, 0));
|
||||||
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
|
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
|
||||||
jm.getNumberOfTransactions(1, true));
|
jm.getNumberOfTransactions(1, true));
|
||||||
}
|
}
|
||||||
|
@ -102,16 +103,16 @@ public class TestFileJournalManager {
|
||||||
5, new AbortSpec(5, 1));
|
5, new AbortSpec(5, 1));
|
||||||
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
|
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
|
||||||
StorageDirectory sd = dirs.next();
|
StorageDirectory sd = dirs.next();
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
||||||
|
|
||||||
sd = dirs.next();
|
sd = dirs.next();
|
||||||
jm = new FileJournalManager(sd);
|
jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
||||||
true));
|
true));
|
||||||
|
|
||||||
sd = dirs.next();
|
sd = dirs.next();
|
||||||
jm = new FileJournalManager(sd);
|
jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,17 +136,17 @@ public class TestFileJournalManager {
|
||||||
new AbortSpec(5, 2));
|
new AbortSpec(5, 2));
|
||||||
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
|
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
|
||||||
StorageDirectory sd = dirs.next();
|
StorageDirectory sd = dirs.next();
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
||||||
true));
|
true));
|
||||||
|
|
||||||
sd = dirs.next();
|
sd = dirs.next();
|
||||||
jm = new FileJournalManager(sd);
|
jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
||||||
true));
|
true));
|
||||||
|
|
||||||
sd = dirs.next();
|
sd = dirs.next();
|
||||||
jm = new FileJournalManager(sd);
|
jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1,
|
||||||
true));
|
true));
|
||||||
}
|
}
|
||||||
|
@ -161,6 +162,25 @@ public class TestFileJournalManager {
|
||||||
}
|
}
|
||||||
raf.close();
|
raf.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected=IllegalStateException.class)
|
||||||
|
public void testFinalizeErrorReportedToNNStorage() throws IOException, InterruptedException {
|
||||||
|
File f = new File(TestEditLog.TEST_DIR + "/filejournaltestError");
|
||||||
|
// abort after 10th roll
|
||||||
|
NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
|
||||||
|
10, new AbortSpec(10, 0));
|
||||||
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
|
String sdRootPath = sd.getRoot().getAbsolutePath();
|
||||||
|
FileUtil.chmod(sdRootPath, "-w", true);
|
||||||
|
try {
|
||||||
|
jm.finalizeLogSegment(0, 1);
|
||||||
|
} finally {
|
||||||
|
assertTrue(storage.getRemovedStorageDirs().contains(sd));
|
||||||
|
FileUtil.chmod(sdRootPath, "+w", true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that we can read from a stream created by FileJournalManager.
|
* Test that we can read from a stream created by FileJournalManager.
|
||||||
|
@ -176,7 +196,7 @@ public class TestFileJournalManager {
|
||||||
10, new AbortSpec(10, 0));
|
10, new AbortSpec(10, 0));
|
||||||
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
|
long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
|
||||||
assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true));
|
assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true));
|
||||||
|
|
||||||
|
@ -211,7 +231,7 @@ public class TestFileJournalManager {
|
||||||
10);
|
10);
|
||||||
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
|
|
||||||
// 10 rolls, so 11 rolled files, 110 txids total.
|
// 10 rolls, so 11 rolled files, 110 txids total.
|
||||||
final int TOTAL_TXIDS = 10 * 11;
|
final int TOTAL_TXIDS = 10 * 11;
|
||||||
|
@ -248,7 +268,7 @@ public class TestFileJournalManager {
|
||||||
assertEquals(1, files.length);
|
assertEquals(1, files.length);
|
||||||
assertTrue(files[0].delete());
|
assertTrue(files[0].delete());
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true));
|
assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -286,7 +306,7 @@ public class TestFileJournalManager {
|
||||||
|
|
||||||
corruptAfterStartSegment(files[0]);
|
corruptAfterStartSegment(files[0]);
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
assertEquals(10*TXNS_PER_ROLL+1,
|
assertEquals(10*TXNS_PER_ROLL+1,
|
||||||
jm.getNumberOfTransactions(1, true));
|
jm.getNumberOfTransactions(1, true));
|
||||||
}
|
}
|
||||||
|
@ -300,7 +320,8 @@ public class TestFileJournalManager {
|
||||||
NNStorage.getInProgressEditsFileName(201),
|
NNStorage.getInProgressEditsFileName(201),
|
||||||
NNStorage.getFinalizedEditsFileName(1001, 1100));
|
NNStorage.getFinalizedEditsFileName(1001, 1100));
|
||||||
|
|
||||||
FileJournalManager fjm = new FileJournalManager(sd);
|
// passing null for NNStorage because this unit test will not use it
|
||||||
|
FileJournalManager fjm = new FileJournalManager(sd, null);
|
||||||
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
|
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
|
||||||
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
|
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
|
||||||
assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
|
assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
|
||||||
|
@ -336,7 +357,7 @@ public class TestFileJournalManager {
|
||||||
10);
|
10);
|
||||||
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
|
|
||||||
EditLogInputStream elis = jm.getInputStream(5, true);
|
EditLogInputStream elis = jm.getInputStream(5, true);
|
||||||
FSEditLogOp op = elis.readOp();
|
FSEditLogOp op = elis.readOp();
|
||||||
|
@ -357,7 +378,7 @@ public class TestFileJournalManager {
|
||||||
10, false);
|
10, false);
|
||||||
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
|
||||||
FileJournalManager jm = new FileJournalManager(sd);
|
FileJournalManager jm = new FileJournalManager(sd, storage);
|
||||||
|
|
||||||
// If we exclude the in-progess stream, we should only have 100 tx.
|
// If we exclude the in-progess stream, we should only have 100 tx.
|
||||||
assertEquals(100, jm.getNumberOfTransactions(1, false));
|
assertEquals(100, jm.getNumberOfTransactions(1, false));
|
||||||
|
|
|
@ -292,8 +292,9 @@ public class TestNNStorageRetentionManager {
|
||||||
for (FakeRoot root : dirRoots.values()) {
|
for (FakeRoot root : dirRoots.values()) {
|
||||||
if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
|
if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
|
||||||
|
|
||||||
|
// passing null NNStorage for unit test because it does not use it
|
||||||
FileJournalManager fjm = new FileJournalManager(
|
FileJournalManager fjm = new FileJournalManager(
|
||||||
root.mockStorageDir());
|
root.mockStorageDir(), null);
|
||||||
fjm.purger = purger;
|
fjm.purger = purger;
|
||||||
jms.add(fjm);
|
jms.add(fjm);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue