HDFS-3693. JNStorage should read its storage info even before a writer becomes active. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1365794 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d2d0736de4
commit
b17018e4b8
|
@ -6,3 +6,5 @@ HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed
|
||||||
HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
|
HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
|
||||||
|
|
||||||
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
|
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)
|
||||||
|
|
||||||
|
HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
|
||||||
|
|
|
@ -120,12 +120,13 @@ public class GetJournalEditServlet extends HttpServlet {
|
||||||
|
|
||||||
if (theirStorageInfoString != null
|
if (theirStorageInfoString != null
|
||||||
&& !myStorageInfoString.equals(theirStorageInfoString)) {
|
&& !myStorageInfoString.equals(theirStorageInfoString)) {
|
||||||
response.sendError(HttpServletResponse.SC_FORBIDDEN,
|
String msg = "This node has storage info '" + myStorageInfoString
|
||||||
"This node has storage info " + myStorageInfoString
|
+ "' but the requesting node expected '"
|
||||||
+ " but the requesting node expected "
|
+ theirStorageInfoString + "'";
|
||||||
+ theirStorageInfoString);
|
|
||||||
LOG.warn("Received an invalid request file transfer request "
|
response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
|
||||||
+ " with storage info " + theirStorageInfoString);
|
LOG.warn("Received an invalid request file transfer request from " +
|
||||||
|
request.getRemoteAddr() + ": " + msg);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link Storage} implementation for the {@link JournalNode}.
|
* A {@link Storage} implementation for the {@link JournalNode}.
|
||||||
*
|
*
|
||||||
|
@ -38,18 +40,21 @@ class JNStorage extends Storage {
|
||||||
|
|
||||||
private final FileJournalManager fjm;
|
private final FileJournalManager fjm;
|
||||||
private final StorageDirectory sd;
|
private final StorageDirectory sd;
|
||||||
private boolean lazyInitted = false;
|
private StorageState state;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param logDir the path to the directory in which data will be stored
|
* @param logDir the path to the directory in which data will be stored
|
||||||
* @param errorReporter a callback to report errors
|
* @param errorReporter a callback to report errors
|
||||||
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
protected JNStorage(File logDir, StorageErrorReporter errorReporter) {
|
protected JNStorage(File logDir, StorageErrorReporter errorReporter) throws IOException {
|
||||||
super(NodeType.JOURNAL_NODE);
|
super(NodeType.JOURNAL_NODE);
|
||||||
|
|
||||||
sd = new StorageDirectory(logDir);
|
sd = new StorageDirectory(logDir);
|
||||||
this.addStorageDir(sd);
|
this.addStorageDir(sd);
|
||||||
this.fjm = new FileJournalManager(sd, errorReporter);
|
this.fjm = new FileJournalManager(sd, errorReporter);
|
||||||
|
|
||||||
|
analyzeStorage();
|
||||||
}
|
}
|
||||||
|
|
||||||
FileJournalManager getJournalManager() {
|
FileJournalManager getJournalManager() {
|
||||||
|
@ -107,34 +112,27 @@ class JNStorage extends Storage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void analyzeStorage(NamespaceInfo nsInfo) throws IOException {
|
public void formatIfNecessary(NamespaceInfo nsInfo) throws IOException {
|
||||||
if (lazyInitted) {
|
if (state == StorageState.NOT_FORMATTED ||
|
||||||
checkConsistentNamespace(nsInfo);
|
state == StorageState.NON_EXISTENT) {
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
StorageState state = sd.analyzeStorage(StartupOption.REGULAR, this);
|
|
||||||
switch (state) {
|
|
||||||
case NON_EXISTENT:
|
|
||||||
case NOT_FORMATTED:
|
|
||||||
format(nsInfo);
|
format(nsInfo);
|
||||||
// In the NORMAL case below, analyzeStorage() has already locked the
|
analyzeStorage();
|
||||||
// directory for us. But in the case that we format it, we have to
|
assert state == StorageState.NORMAL :
|
||||||
// lock it here.
|
"Unexpected state after formatting: " + state;
|
||||||
// The directory is unlocked in close() when the node shuts down.
|
} else {
|
||||||
sd.lock();
|
Preconditions.checkState(state == StorageState.NORMAL,
|
||||||
break;
|
"Unhandled storage state in %s: %s", this, state);
|
||||||
case NORMAL:
|
assert getNamespaceID() != 0;
|
||||||
// Storage directory is already locked by analyzeStorage() - no
|
|
||||||
// need to lock it here.
|
|
||||||
readProperties(sd);
|
|
||||||
checkConsistentNamespace(nsInfo);
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
checkConsistentNamespace(nsInfo);
|
||||||
LOG.warn("TODO: unhandled state for storage dir " + sd + ": " + state);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void analyzeStorage() throws IOException {
|
||||||
|
this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
|
||||||
|
if (state == StorageState.NORMAL) {
|
||||||
|
readProperties(sd);
|
||||||
}
|
}
|
||||||
lazyInitted = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkConsistentNamespace(NamespaceInfo nsInfo)
|
private void checkConsistentNamespace(NamespaceInfo nsInfo)
|
||||||
|
|
|
@ -79,7 +79,7 @@ class Journal implements Closeable {
|
||||||
|
|
||||||
private final FileJournalManager fjm;
|
private final FileJournalManager fjm;
|
||||||
|
|
||||||
Journal(File logDir, StorageErrorReporter errorReporter) {
|
Journal(File logDir, StorageErrorReporter errorReporter) throws IOException {
|
||||||
storage = new JNStorage(logDir, errorReporter);
|
storage = new JNStorage(logDir, errorReporter);
|
||||||
|
|
||||||
File currentDir = storage.getSingularStorageDir().getCurrentDir();
|
File currentDir = storage.getSingularStorageDir().getCurrentDir();
|
||||||
|
@ -152,7 +152,7 @@ class Journal implements Closeable {
|
||||||
|
|
||||||
// If the storage is unformatted, format it with this NS.
|
// If the storage is unformatted, format it with this NS.
|
||||||
// Otherwise, check that the NN's nsinfo matches the storage.
|
// Otherwise, check that the NN's nsinfo matches the storage.
|
||||||
storage.analyzeStorage(nsInfo);
|
storage.formatIfNecessary(nsInfo);
|
||||||
|
|
||||||
if (epoch <= getLastPromisedEpoch()) {
|
if (epoch <= getLastPromisedEpoch()) {
|
||||||
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
|
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class JournalNode implements Tool, Configurable {
|
||||||
*/
|
*/
|
||||||
private int resultCode = 0;
|
private int resultCode = 0;
|
||||||
|
|
||||||
synchronized Journal getOrCreateJournal(String jid) {
|
synchronized Journal getOrCreateJournal(String jid) throws IOException {
|
||||||
QuorumJournalManager.checkJournalId(jid);
|
QuorumJournalManager.checkJournalId(jid);
|
||||||
|
|
||||||
Journal journal = journalsById.get(jid);
|
Journal journal = journalsById.get(jid);
|
||||||
|
|
|
@ -115,7 +115,8 @@ public class JournalNodeHttpServer {
|
||||||
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
|
DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Journal getJournalFromContext(ServletContext context, String jid) {
|
public static Journal getJournalFromContext(ServletContext context, String jid)
|
||||||
|
throws IOException {
|
||||||
JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);
|
JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);
|
||||||
return jn.getOrCreateJournal(jid);
|
return jn.getOrCreateJournal(jid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,10 +112,17 @@ public class TestJournal {
|
||||||
QJMTestUtil.createTxnData(1, 2));
|
QJMTestUtil.createTxnData(1, 2));
|
||||||
// Don't finalize.
|
// Don't finalize.
|
||||||
|
|
||||||
|
String storageString = journal.getStorage().toColonSeparatedString();
|
||||||
|
System.err.println("storage string: " + storageString);
|
||||||
journal.close(); // close to unlock the storage dir
|
journal.close(); // close to unlock the storage dir
|
||||||
|
|
||||||
// Now re-instantiate, make sure history is still there
|
// Now re-instantiate, make sure history is still there
|
||||||
journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
|
journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
|
||||||
|
|
||||||
|
// The storage info should be read, even if no writer has taken over.
|
||||||
|
assertEquals(storageString,
|
||||||
|
journal.getStorage().toColonSeparatedString());
|
||||||
|
|
||||||
assertEquals(1, journal.getLastPromisedEpoch());
|
assertEquals(1, journal.getLastPromisedEpoch());
|
||||||
NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
|
NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
|
||||||
assertEquals(1, newEpoch.getLastSegmentTxId());
|
assertEquals(1, newEpoch.getLastSegmentTxId());
|
||||||
|
@ -135,9 +142,8 @@ public class TestJournal {
|
||||||
// Journal should be locked
|
// Journal should be locked
|
||||||
GenericTestUtils.assertExists(lockFile);
|
GenericTestUtils.assertExists(lockFile);
|
||||||
|
|
||||||
Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
|
|
||||||
try {
|
try {
|
||||||
journal2.newEpoch(FAKE_NSINFO, 2);
|
new Journal(TEST_LOG_DIR, mockErrorReporter);
|
||||||
fail("Did not fail to create another journal in same dir");
|
fail("Did not fail to create another journal in same dir");
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
@ -147,6 +153,7 @@ public class TestJournal {
|
||||||
journal.close();
|
journal.close();
|
||||||
|
|
||||||
// Journal should no longer be locked after the close() call.
|
// Journal should no longer be locked after the close() call.
|
||||||
|
Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
|
||||||
journal2.newEpoch(FAKE_NSINFO, 2);
|
journal2.newEpoch(FAKE_NSINFO, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue