HDFS-5840. Merge r1581260 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1581261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
590df94253
commit
8b39fc9076
|
@ -448,6 +448,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-5846. Assigning DEFAULT_RACK in resolveNetworkLocation method can break
|
HDFS-5846. Assigning DEFAULT_RACK in resolveNetworkLocation method can break
|
||||||
data resiliency. (Nikola Vujic via cnauroth)
|
data resiliency. (Nikola Vujic via cnauroth)
|
||||||
|
|
||||||
|
HDFS-5840. Follow-up to HDFS-5138 to improve error handling during partial
|
||||||
|
upgrade failures. (atm, jing9 and suresh via jing9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
|
|
|
@ -1714,23 +1714,19 @@ public class DFSUtil {
|
||||||
*
|
*
|
||||||
* @param objects the collection of objects to check for equality.
|
* @param objects the collection of objects to check for equality.
|
||||||
*/
|
*/
|
||||||
public static void assertAllResultsEqual(Collection<?> objects) {
|
public static void assertAllResultsEqual(Collection<?> objects)
|
||||||
Object[] resultsArray = objects.toArray();
|
throws AssertionError {
|
||||||
|
if (objects.size() == 0 || objects.size() == 1)
|
||||||
if (resultsArray.length == 0)
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
for (int i = 0; i < resultsArray.length; i++) {
|
Object[] resultsArray = objects.toArray();
|
||||||
if (i == 0)
|
for (int i = 1; i < resultsArray.length; i++) {
|
||||||
continue;
|
Object currElement = resultsArray[i];
|
||||||
else {
|
Object lastElement = resultsArray[i - 1];
|
||||||
Object currElement = resultsArray[i];
|
if ((currElement == null && currElement != lastElement) ||
|
||||||
Object lastElement = resultsArray[i - 1];
|
(currElement != null && !currElement.equals(lastElement))) {
|
||||||
if ((currElement == null && currElement != lastElement) ||
|
throw new AssertionError("Not all elements match in results: " +
|
||||||
(currElement != null && !currElement.equals(lastElement))) {
|
Arrays.toString(resultsArray));
|
||||||
throw new AssertionError("Not all elements match in results: " +
|
|
||||||
Arrays.toString(resultsArray));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -571,7 +571,11 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
|
|
||||||
// Either they all return the same thing or this call fails, so we can
|
// Either they all return the same thing or this call fails, so we can
|
||||||
// just return the first result.
|
// just return the first result.
|
||||||
DFSUtil.assertAllResultsEqual(call.getResults().values());
|
try {
|
||||||
|
DFSUtil.assertAllResultsEqual(call.getResults().values());
|
||||||
|
} catch (AssertionError ae) {
|
||||||
|
throw new IOException("Results differed for canRollBack", ae);
|
||||||
|
}
|
||||||
for (Boolean result : call.getResults().values()) {
|
for (Boolean result : call.getResults().values()) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -617,7 +621,11 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
|
|
||||||
// Either they all return the same thing or this call fails, so we can
|
// Either they all return the same thing or this call fails, so we can
|
||||||
// just return the first result.
|
// just return the first result.
|
||||||
DFSUtil.assertAllResultsEqual(call.getResults().values());
|
try {
|
||||||
|
DFSUtil.assertAllResultsEqual(call.getResults().values());
|
||||||
|
} catch (AssertionError ae) {
|
||||||
|
throw new IOException("Results differed for getJournalCTime", ae);
|
||||||
|
}
|
||||||
for (Long result : call.getResults().values()) {
|
for (Long result : call.getResults().values()) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,15 +65,15 @@ class JNStorage extends Storage {
|
||||||
* @param errorReporter a callback to report errors
|
* @param errorReporter a callback to report errors
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
protected JNStorage(Configuration conf, File logDir,
|
protected JNStorage(Configuration conf, File logDir, StartupOption startOpt,
|
||||||
StorageErrorReporter errorReporter) throws IOException {
|
StorageErrorReporter errorReporter) throws IOException {
|
||||||
super(NodeType.JOURNAL_NODE);
|
super(NodeType.JOURNAL_NODE);
|
||||||
|
|
||||||
sd = new StorageDirectory(logDir);
|
sd = new StorageDirectory(logDir);
|
||||||
this.addStorageDir(sd);
|
this.addStorageDir(sd);
|
||||||
this.fjm = new FileJournalManager(conf, sd, errorReporter);
|
this.fjm = new FileJournalManager(conf, sd, errorReporter);
|
||||||
|
|
||||||
analyzeStorage();
|
analyzeAndRecoverStorage(startOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
FileJournalManager getJournalManager() {
|
FileJournalManager getJournalManager() {
|
||||||
|
@ -216,6 +216,18 @@ class JNStorage extends Storage {
|
||||||
layoutVersion = lv;
|
layoutVersion = lv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void analyzeAndRecoverStorage(StartupOption startOpt) throws IOException {
|
||||||
|
this.state = sd.analyzeStorage(startOpt, this);
|
||||||
|
final boolean needRecover = state != StorageState.NORMAL
|
||||||
|
&& state != StorageState.NON_EXISTENT
|
||||||
|
&& state != StorageState.NOT_FORMATTED;
|
||||||
|
if (state == StorageState.NORMAL && startOpt != StartupOption.ROLLBACK) {
|
||||||
|
readProperties(sd);
|
||||||
|
} else if (needRecover) {
|
||||||
|
sd.doRecover(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void checkConsistentNamespace(NamespaceInfo nsInfo)
|
void checkConsistentNamespace(NamespaceInfo nsInfo)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (nsInfo.getNamespaceID() != getNamespaceID()) {
|
if (nsInfo.getNamespaceID() != getNamespaceID()) {
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.Persisted
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
|
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
|
||||||
|
@ -138,8 +139,9 @@ public class Journal implements Closeable {
|
||||||
private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;
|
private static final int WARN_SYNC_MILLIS_THRESHOLD = 1000;
|
||||||
|
|
||||||
Journal(Configuration conf, File logDir, String journalId,
|
Journal(Configuration conf, File logDir, String journalId,
|
||||||
StorageErrorReporter errorReporter) throws IOException {
|
StartupOption startOpt, StorageErrorReporter errorReporter)
|
||||||
storage = new JNStorage(conf, logDir, errorReporter);
|
throws IOException {
|
||||||
|
storage = new JNStorage(conf, logDir, startOpt, errorReporter);
|
||||||
this.journalId = journalId;
|
this.journalId = journalId;
|
||||||
|
|
||||||
refreshCachedData();
|
refreshCachedData();
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
|
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -77,19 +78,24 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||||
*/
|
*/
|
||||||
private int resultCode = 0;
|
private int resultCode = 0;
|
||||||
|
|
||||||
synchronized Journal getOrCreateJournal(String jid) throws IOException {
|
synchronized Journal getOrCreateJournal(String jid, StartupOption startOpt)
|
||||||
|
throws IOException {
|
||||||
QuorumJournalManager.checkJournalId(jid);
|
QuorumJournalManager.checkJournalId(jid);
|
||||||
|
|
||||||
Journal journal = journalsById.get(jid);
|
Journal journal = journalsById.get(jid);
|
||||||
if (journal == null) {
|
if (journal == null) {
|
||||||
File logDir = getLogDir(jid);
|
File logDir = getLogDir(jid);
|
||||||
LOG.info("Initializing journal in directory " + logDir);
|
LOG.info("Initializing journal in directory " + logDir);
|
||||||
journal = new Journal(conf, logDir, jid, new ErrorReporter());
|
journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
|
||||||
journalsById.put(jid, journal);
|
journalsById.put(jid, journal);
|
||||||
}
|
}
|
||||||
|
|
||||||
return journal;
|
return journal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Journal getOrCreateJournal(String jid) throws IOException {
|
||||||
|
return getOrCreateJournal(jid, StartupOption.REGULAR);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(Configuration conf) {
|
||||||
|
@ -306,12 +312,12 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
||||||
|
|
||||||
public Boolean canRollBack(String journalId, StorageInfo storage,
|
public Boolean canRollBack(String journalId, StorageInfo storage,
|
||||||
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
|
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
|
||||||
return getOrCreateJournal(journalId).canRollBack(storage, prevStorage,
|
return getOrCreateJournal(journalId, StartupOption.ROLLBACK).canRollBack(
|
||||||
targetLayoutVersion);
|
storage, prevStorage, targetLayoutVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doRollback(String journalId) throws IOException {
|
public void doRollback(String journalId) throws IOException {
|
||||||
getOrCreateJournal(journalId).doRollback();
|
getOrCreateJournal(journalId, StartupOption.ROLLBACK).doRollback();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getJournalCTime(String journalId) throws IOException {
|
public Long getJournalCTime(String journalId) throws IOException {
|
||||||
|
|
|
@ -44,9 +44,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
|
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
|
||||||
|
@ -1367,7 +1367,7 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized boolean canRollBackSharedLog(Storage prevStorage,
|
public synchronized boolean canRollBackSharedLog(StorageInfo prevStorage,
|
||||||
int targetLayoutVersion) throws IOException {
|
int targetLayoutVersion) throws IOException {
|
||||||
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
|
for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
|
||||||
if (jas.isShared()) {
|
if (jas.isShared()) {
|
||||||
|
|
|
@ -393,6 +393,10 @@ public class FSImage implements Closeable {
|
||||||
|
|
||||||
saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
|
saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
|
||||||
|
|
||||||
|
// upgrade shared edit storage first
|
||||||
|
if (target.isHaEnabled()) {
|
||||||
|
editLog.doUpgradeOfSharedLog();
|
||||||
|
}
|
||||||
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
|
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
|
||||||
StorageDirectory sd = it.next();
|
StorageDirectory sd = it.next();
|
||||||
try {
|
try {
|
||||||
|
@ -402,9 +406,6 @@ public class FSImage implements Closeable {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (target.isHaEnabled()) {
|
|
||||||
editLog.doUpgradeOfSharedLog();
|
|
||||||
}
|
|
||||||
storage.reportErrorsOnDirectories(errorSDs);
|
storage.reportErrorsOnDirectories(errorSDs);
|
||||||
|
|
||||||
isUpgradeFinalized = false;
|
isUpgradeFinalized = false;
|
||||||
|
@ -430,14 +431,19 @@ public class FSImage implements Closeable {
|
||||||
HdfsConstants.NAMENODE_LAYOUT_VERSION)) {
|
HdfsConstants.NAMENODE_LAYOUT_VERSION)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
LOG.info("Can perform rollback for " + sd);
|
||||||
canRollback = true;
|
canRollback = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fsns.isHaEnabled()) {
|
if (fsns.isHaEnabled()) {
|
||||||
// If HA is enabled, check if the shared log can be rolled back as well.
|
// If HA is enabled, check if the shared log can be rolled back as well.
|
||||||
editLog.initJournalsForWrite();
|
editLog.initJournalsForWrite();
|
||||||
canRollback |= editLog.canRollBackSharedLog(prevState.getStorage(),
|
boolean canRollBackSharedEditLog = editLog.canRollBackSharedLog(
|
||||||
HdfsConstants.NAMENODE_LAYOUT_VERSION);
|
prevState.getStorage(), HdfsConstants.NAMENODE_LAYOUT_VERSION);
|
||||||
|
if (canRollBackSharedEditLog) {
|
||||||
|
LOG.info("Can perform rollback for shared edit log.");
|
||||||
|
canRollback = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!canRollback)
|
if (!canRollback)
|
||||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
abstract class NNUpgradeUtil {
|
abstract class NNUpgradeUtil {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
|
private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
|
||||||
|
@ -82,7 +84,8 @@ abstract class NNUpgradeUtil {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("Finalizing upgrade of storage directory " + sd.getRoot());
|
LOG.info("Finalizing upgrade of storage directory " + sd.getRoot());
|
||||||
assert sd.getCurrentDir().exists() : "Current directory must exist.";
|
Preconditions.checkState(sd.getCurrentDir().exists(),
|
||||||
|
"Current directory must exist.");
|
||||||
final File tmpDir = sd.getFinalizedTmp();
|
final File tmpDir = sd.getFinalizedTmp();
|
||||||
// rename previous to tmp and remove
|
// rename previous to tmp and remove
|
||||||
NNStorage.rename(prevDir, tmpDir);
|
NNStorage.rename(prevDir, tmpDir);
|
||||||
|
@ -105,9 +108,14 @@ abstract class NNUpgradeUtil {
|
||||||
File curDir = sd.getCurrentDir();
|
File curDir = sd.getCurrentDir();
|
||||||
File prevDir = sd.getPreviousDir();
|
File prevDir = sd.getPreviousDir();
|
||||||
File tmpDir = sd.getPreviousTmp();
|
File tmpDir = sd.getPreviousTmp();
|
||||||
assert curDir.exists() : "Current directory must exist.";
|
|
||||||
assert !prevDir.exists() : "previous directory must not exist.";
|
Preconditions.checkState(curDir.exists(),
|
||||||
assert !tmpDir.exists() : "previous.tmp directory must not exist.";
|
"Current directory must exist for preupgrade.");
|
||||||
|
Preconditions.checkState(!prevDir.exists(),
|
||||||
|
"Previous directory must not exist for preupgrade.");
|
||||||
|
Preconditions.checkState(!tmpDir.exists(),
|
||||||
|
"Previous.tmp directory must not exist for preupgrade."
|
||||||
|
+ "Consider restarting for recovery.");
|
||||||
|
|
||||||
// rename current to tmp
|
// rename current to tmp
|
||||||
NNStorage.rename(curDir, tmpDir);
|
NNStorage.rename(curDir, tmpDir);
|
||||||
|
@ -136,6 +144,11 @@ abstract class NNUpgradeUtil {
|
||||||
|
|
||||||
File prevDir = sd.getPreviousDir();
|
File prevDir = sd.getPreviousDir();
|
||||||
File tmpDir = sd.getPreviousTmp();
|
File tmpDir = sd.getPreviousTmp();
|
||||||
|
Preconditions.checkState(!prevDir.exists(),
|
||||||
|
"previous directory must not exist for upgrade.");
|
||||||
|
Preconditions.checkState(tmpDir.exists(),
|
||||||
|
"previous.tmp directory must exist for upgrade.");
|
||||||
|
|
||||||
// rename tmp to previous
|
// rename tmp to previous
|
||||||
NNStorage.rename(tmpDir, prevDir);
|
NNStorage.rename(tmpDir, prevDir);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -154,14 +167,19 @@ abstract class NNUpgradeUtil {
|
||||||
static void doRollBack(StorageDirectory sd)
|
static void doRollBack(StorageDirectory sd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
File prevDir = sd.getPreviousDir();
|
File prevDir = sd.getPreviousDir();
|
||||||
if (!prevDir.exists())
|
if (!prevDir.exists()) {
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
File tmpDir = sd.getRemovedTmp();
|
File tmpDir = sd.getRemovedTmp();
|
||||||
assert !tmpDir.exists() : "removed.tmp directory must not exist.";
|
Preconditions.checkState(!tmpDir.exists(),
|
||||||
|
"removed.tmp directory must not exist for rollback."
|
||||||
|
+ "Consider restarting for recovery.");
|
||||||
// rename current to tmp
|
// rename current to tmp
|
||||||
File curDir = sd.getCurrentDir();
|
File curDir = sd.getCurrentDir();
|
||||||
assert curDir.exists() : "Current directory must exist.";
|
Preconditions.checkState(curDir.exists(),
|
||||||
|
"Current directory must exist for rollback.");
|
||||||
|
|
||||||
NNStorage.rename(curDir, tmpDir);
|
NNStorage.rename(curDir, tmpDir);
|
||||||
// rename previous to current
|
// rename previous to current
|
||||||
NNStorage.rename(prevDir, curDir);
|
NNStorage.rename(prevDir, curDir);
|
||||||
|
|
|
@ -780,14 +780,19 @@ digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda
|
||||||
|
|
||||||
[[1]] Shut down all of the NNs as normal, and install the newer software.
|
[[1]] Shut down all of the NNs as normal, and install the newer software.
|
||||||
|
|
||||||
[[2]] Start one of the NNs with the <<<'-upgrade'>>> flag.
|
[[2]] Start up all of the JNs. Note that it is <<critical>> that all the
|
||||||
|
JNs be running when performing the upgrade, rollback, or finalization
|
||||||
|
operations. If any of the JNs are down at the time of running any of these
|
||||||
|
operations, the operation will fail.
|
||||||
|
|
||||||
|
[[3]] Start one of the NNs with the <<<'-upgrade'>>> flag.
|
||||||
|
|
||||||
[[3]] On start, this NN will not enter the standby state as usual in an HA
|
[[4]] On start, this NN will not enter the standby state as usual in an HA
|
||||||
setup. Rather, this NN will immediately enter the active state, perform an
|
setup. Rather, this NN will immediately enter the active state, perform an
|
||||||
upgrade of its local storage dirs, and also perform an upgrade of the shared
|
upgrade of its local storage dirs, and also perform an upgrade of the shared
|
||||||
edit log.
|
edit log.
|
||||||
|
|
||||||
[[4]] At this point the other NN in the HA pair will be out of sync with
|
[[5]] At this point the other NN in the HA pair will be out of sync with
|
||||||
the upgraded NN. In order to bring it back in sync and once again have a highly
|
the upgraded NN. In order to bring it back in sync and once again have a highly
|
||||||
available setup, you should re-bootstrap this NameNode by running the NN with
|
available setup, you should re-bootstrap this NameNode by running the NN with
|
||||||
the <<<'-bootstrapStandby'>>> flag. It is an error to start this second NN with
|
the <<<'-bootstrapStandby'>>> flag. It is an error to start this second NN with
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
|
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||||
|
@ -70,7 +71,7 @@ public class TestJournal {
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
FileUtil.fullyDelete(TEST_LOG_DIR);
|
FileUtil.fullyDelete(TEST_LOG_DIR);
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
journal = new Journal(conf, TEST_LOG_DIR, JID,
|
journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
|
||||||
mockErrorReporter);
|
mockErrorReporter);
|
||||||
journal.format(FAKE_NSINFO);
|
journal.format(FAKE_NSINFO);
|
||||||
}
|
}
|
||||||
|
@ -179,7 +180,8 @@ public class TestJournal {
|
||||||
journal.close(); // close to unlock the storage dir
|
journal.close(); // close to unlock the storage dir
|
||||||
|
|
||||||
// Now re-instantiate, make sure history is still there
|
// Now re-instantiate, make sure history is still there
|
||||||
journal = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
|
journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
|
||||||
|
mockErrorReporter);
|
||||||
|
|
||||||
// The storage info should be read, even if no writer has taken over.
|
// The storage info should be read, even if no writer has taken over.
|
||||||
assertEquals(storageString,
|
assertEquals(storageString,
|
||||||
|
@ -239,7 +241,8 @@ public class TestJournal {
|
||||||
|
|
||||||
journal.newEpoch(FAKE_NSINFO, 1);
|
journal.newEpoch(FAKE_NSINFO, 1);
|
||||||
try {
|
try {
|
||||||
new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
|
new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
|
||||||
|
mockErrorReporter);
|
||||||
fail("Did not fail to create another journal in same dir");
|
fail("Did not fail to create another journal in same dir");
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
GenericTestUtils.assertExceptionContains(
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
@ -250,7 +253,8 @@ public class TestJournal {
|
||||||
|
|
||||||
// Journal should no longer be locked after the close() call.
|
// Journal should no longer be locked after the close() call.
|
||||||
// Hence, should be able to create a new Journal in the same dir.
|
// Hence, should be able to create a new Journal in the same dir.
|
||||||
Journal journal2 = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
|
Journal journal2 = new Journal(conf, TEST_LOG_DIR, JID,
|
||||||
|
StartupOption.REGULAR, mockErrorReporter);
|
||||||
journal2.newEpoch(FAKE_NSINFO, 2);
|
journal2.newEpoch(FAKE_NSINFO, 2);
|
||||||
journal2.close();
|
journal2.close();
|
||||||
}
|
}
|
||||||
|
@ -279,7 +283,8 @@ public class TestJournal {
|
||||||
// Check that, even if we re-construct the journal by scanning the
|
// Check that, even if we re-construct the journal by scanning the
|
||||||
// disk, we don't allow finalizing incorrectly.
|
// disk, we don't allow finalizing incorrectly.
|
||||||
journal.close();
|
journal.close();
|
||||||
journal = new Journal(conf, TEST_LOG_DIR, JID, mockErrorReporter);
|
journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
|
||||||
|
mockErrorReporter);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
journal.finalizeLogSegment(makeRI(4), 1, 6);
|
journal.finalizeLogSegment(makeRI(4), 1, 6);
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -41,8 +42,12 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HAUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
||||||
|
@ -558,6 +563,45 @@ public class TestHAStateTransitions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test also serves to test
|
||||||
|
* {@link HAUtil#getProxiesForAllNameNodesInNameservice(Configuration, String)} and
|
||||||
|
* {@link DFSUtil#getRpcAddressesForNameserviceId(Configuration, String, String)}
|
||||||
|
* by virtue of the fact that it wouldn't work properly if the proxies
|
||||||
|
* returned were not for the correct NNs.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIsAtLeastOneActive() throws Exception {
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
|
.numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
HATestUtil.setFailoverConfigurations(cluster, conf);
|
||||||
|
|
||||||
|
List<ClientProtocol> namenodes =
|
||||||
|
HAUtil.getProxiesForAllNameNodesInNameservice(conf,
|
||||||
|
HATestUtil.getLogicalHostname(cluster));
|
||||||
|
|
||||||
|
assertEquals(2, namenodes.size());
|
||||||
|
|
||||||
|
assertFalse(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
assertTrue(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
assertFalse(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
assertTrue(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
cluster.transitionToStandby(1);
|
||||||
|
assertFalse(HAUtil.isAtLeastOneActive(namenodes));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isDTRunning(NameNode nn) {
|
private boolean isDTRunning(NameNode nn) {
|
||||||
return NameNodeAdapter.getDtSecretManager(nn.getNamesystem()).isRunning();
|
return NameNodeAdapter.getDtSecretManager(nn.getNamesystem()).isRunning();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue