HDFS-5080. Merge change r1514386 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1514391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-08-15 17:33:31 +00:00
parent ed114b4b13
commit 5155be23e3
19 changed files with 265 additions and 52 deletions

View File

@ -107,6 +107,9 @@ Release 2.1.1-beta - UNRELEASED
HDFS-5093. TestGlobPaths should re-use the MiniDFSCluster to avoid failure HDFS-5093. TestGlobPaths should re-use the MiniDFSCluster to avoid failure
on Windows. (Chuan Liu via cnauroth) on Windows. (Chuan Liu via cnauroth)
HDFS-5080. BootstrapStandby not working with QJM when the existing NN is
active. (jing9)
Release 2.1.0-beta - 2013-08-06 Release 2.1.0-beta - 2013-08-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -109,7 +109,7 @@ public ListenableFuture<Void> finalizeLogSegment(
* Fetch the list of edit logs available on the remote node. * Fetch the list of edit logs available on the remote node.
*/ */
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean forReading); long fromTxnId, boolean forReading, boolean inProgressOk);
/** /**
* Prepare recovery. See the HDFS-3077 design document for details. * Prepare recovery. See the HDFS-3077 design document for details.

View File

@ -262,14 +262,14 @@ public QuorumCall<AsyncLogger, Void> sendEdits(
return QuorumCall.create(calls); return QuorumCall.create(calls);
} }
public QuorumCall<AsyncLogger, RemoteEditLogManifest> public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
getEditLogManifest(long fromTxnId, boolean forReading) { long fromTxnId, boolean forReading, boolean inProgressOk) {
Map<AsyncLogger, Map<AsyncLogger,
ListenableFuture<RemoteEditLogManifest>> calls ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap(); = Maps.newHashMap();
for (AsyncLogger logger : loggers) { for (AsyncLogger logger : loggers) {
ListenableFuture<RemoteEditLogManifest> future = ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId, forReading); logger.getEditLogManifest(fromTxnId, forReading, inProgressOk);
calls.put(logger, future); calls.put(logger, future);
} }
return QuorumCall.create(calls); return QuorumCall.create(calls);

View File

@ -519,12 +519,13 @@ public Void call() throws Exception {
@Override @Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean forReading) { final long fromTxnId, final boolean forReading,
final boolean inProgressOk) {
return executor.submit(new Callable<RemoteEditLogManifest>() { return executor.submit(new Callable<RemoteEditLogManifest>() {
@Override @Override
public RemoteEditLogManifest call() throws IOException { public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId, forReading); journalId, fromTxnId, forReading, inProgressOk);
// Update the http port, since we need this to build URLs to any of the // Update the http port, since we need this to build URLs to any of the
// returned logs. // returned logs.
httpPort = ret.getHttpPort(); httpPort = ret.getHttpPort();

View File

@ -456,7 +456,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException { long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q = QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId, forReading); loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
Map<AsyncLogger, RemoteEditLogManifest> resps = Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams"); "selectInputStreams");
@ -480,8 +480,7 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
allStreams.add(elis); allStreams.add(elis);
} }
} }
JournalSet.chainAndMakeRedundantStreams( JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
streams, allStreams, fromTxnId, inProgressOk);
} }
@Override @Override

View File

@ -125,10 +125,13 @@ public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
* @param sinceTxId the first transaction which the client cares about * @param sinceTxId the first transaction which the client cares about
* @param forReading whether or not the caller intends to read from the edit * @param forReading whether or not the caller intends to read from the edit
* logs * logs
* @param inProgressOk whether or not to check the in-progress edit log
* segment
* @return a list of edit log segments since the given transaction ID. * @return a list of edit log segments since the given transaction ID.
*/ */
public GetEditLogManifestResponseProto getEditLogManifest( public GetEditLogManifestResponseProto getEditLogManifest(String jid,
String jid, long sinceTxId, boolean forReading) throws IOException; long sinceTxId, boolean forReading, boolean inProgressOk)
throws IOException;
/** /**
* Begin the recovery process for a given segment. See the HDFS-3077 * Begin the recovery process for a given segment. See the HDFS-3077

View File

@ -203,7 +203,8 @@ public GetEditLogManifestResponseProto getEditLogManifest(
return impl.getEditLogManifest( return impl.getEditLogManifest(
request.getJid().getIdentifier(), request.getJid().getIdentifier(),
request.getSinceTxId(), request.getSinceTxId(),
request.getForReading()); request.getForReading(),
request.getInProgressOk());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -228,13 +228,15 @@ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading) throws IOException { long sinceTxId, boolean forReading, boolean inProgressOk)
throws IOException {
try { try {
return rpcProxy.getEditLogManifest(NULL_CONTROLLER, return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
GetEditLogManifestRequestProto.newBuilder() GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId) .setSinceTxId(sinceTxId)
.setForReading(forReading) .setForReading(forReading)
.setInProgressOk(inProgressOk)
.build()); .build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);

View File

@ -25,10 +25,9 @@
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -36,8 +35,8 @@
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException; import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException; import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@ -50,6 +49,7 @@
import org.apache.hadoop.hdfs.server.namenode.JournalManager; import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.hdfs.util.BestEffortLongFile; import org.apache.hadoop.hdfs.util.BestEffortLongFile;
@ -630,14 +630,31 @@ private void purgePaxosDecision(long segmentTxId) throws IOException {
* @see QJournalProtocol#getEditLogManifest(String, long) * @see QJournalProtocol#getEditLogManifest(String, long)
*/ */
public RemoteEditLogManifest getEditLogManifest(long sinceTxId, public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean forReading) throws IOException { boolean forReading, boolean inProgressOk) throws IOException {
// No need to checkRequest() here - anyone may ask for the list // No need to checkRequest() here - anyone may ask for the list
// of segments. // of segments.
checkFormatted(); checkFormatted();
RemoteEditLogManifest manifest = new RemoteEditLogManifest( // if this is for reading, ignore the in-progress editlog segment
fjm.getRemoteEditLogs(sinceTxId, forReading)); inProgressOk = forReading ? false : inProgressOk;
return manifest; List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
inProgressOk);
if (inProgressOk) {
RemoteEditLog log = null;
for (Iterator<RemoteEditLog> iter = logs.iterator(); iter.hasNext();) {
log = iter.next();
if (log.isInProgress()) {
iter.remove();
break;
}
}
if (log != null && log.isInProgress()) {
logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId()));
}
}
return new RemoteEditLogManifest(logs);
} }
/** /**

View File

@ -175,10 +175,11 @@ public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading) throws IOException { long sinceTxId, boolean forReading, boolean inProgressOk)
throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
.getEditLogManifest(sinceTxId, forReading); .getEditLogManifest(sinceTxId, forReading, inProgressOk);
return GetEditLogManifestResponseProto.newBuilder() return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest)) .setManifest(PBHelper.convert(manifest))

View File

@ -1231,6 +1231,7 @@ synchronized void recoverUnclosedStreams() {
} }
} }
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException { long fromTxId, boolean inProgressOk, boolean forReading) throws IOException {
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading); journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
@ -1241,18 +1242,27 @@ public Collection<EditLogInputStream> selectInputStreams(
return selectInputStreams(fromTxId, toAtLeastTxId, null, true); return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
} }
/** Select a list of input streams to load */
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId, recovery, inProgressOk,
true);
}
/** /**
* Select a list of input streams to load. * Select a list of input streams.
* *
* @param fromTxId first transaction in the selected streams * @param fromTxId first transaction in the selected streams
* @param toAtLeast the selected streams must contain this transaction * @param toAtLeast the selected streams must contain this transaction
* @param inProgessOk set to true if in-progress streams are OK * @param inProgessOk set to true if in-progress streams are OK
* @param forReading whether or not to use the streams to load the edit log
*/ */
public synchronized Collection<EditLogInputStream> selectInputStreams( public synchronized Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException { boolean inProgressOk, boolean forReading) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
selectInputStreams(streams, fromTxId, inProgressOk, true); selectInputStreams(streams, fromTxId, inProgressOk, forReading);
try { try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

View File

@ -169,18 +169,26 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
* @param fromTxId the txnid which to start looking * @param fromTxId the txnid which to start looking
* @param forReading whether or not the caller intends to read from the edit * @param forReading whether or not the caller intends to read from the edit
* logs * logs
* @param inProgressOk whether or not to include the in-progress edit log
* segment
* @return a list of remote edit logs * @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed. * @throws IOException if edit logs cannot be listed.
*/ */
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId, public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
boolean forReading) throws IOException { boolean forReading, boolean inProgressOk) throws IOException {
// make sure not reading in-progress edit log, i.e., if forReading is true,
// we should ignore the in-progress edit log.
Preconditions.checkArgument(!(forReading && inProgressOk));
File currentDir = sd.getCurrentDir(); File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir); List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity( List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
allLogFiles.size()); allLogFiles.size());
for (EditLogFile elf : allLogFiles) { for (EditLogFile elf : allLogFiles) {
if (elf.hasCorruptHeader() || elf.isInProgress()) continue; if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) {
continue;
}
if (elf.getFirstTxId() >= firstTxId) { if (elf.getFirstTxId() >= firstTxId) {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) { } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -37,7 +39,6 @@
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -255,13 +256,12 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
". Skipping.", ioe); ". Skipping.", ioe);
} }
} }
chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk); chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
} }
public static void chainAndMakeRedundantStreams( public static void chainAndMakeRedundantStreams(
Collection<EditLogInputStream> outStreams, Collection<EditLogInputStream> outStreams,
PriorityQueue<EditLogInputStream> allStreams, PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
long fromTxId, boolean inProgressOk) {
// We want to group together all the streams that start on the same start // We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all // transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a // the streams we've seen at a given start transaction ID. When we see a
@ -596,7 +596,7 @@ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId,
if (j.getManager() instanceof FileJournalManager) { if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fjm = (FileJournalManager)j.getManager(); FileJournalManager fjm = (FileJournalManager)j.getManager();
try { try {
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading)); allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading, false));
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Cannot list edit logs in " + fjm, t); LOG.warn("Cannot list edit logs in " + fjm, t);
} }

View File

@ -226,7 +226,7 @@ private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
try { try {
Collection<EditLogInputStream> streams = Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams( image.getEditLog().selectInputStreams(
firstTxIdInLogs, curTxIdOnOtherNode, null, true); firstTxIdInLogs, curTxIdOnOtherNode, null, true, false);
for (EditLogInputStream stream : streams) { for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream); IOUtils.closeStream(stream);
} }

View File

@ -177,6 +177,7 @@ message GetEditLogManifestRequestProto {
required uint64 sinceTxId = 2; // Transaction ID required uint64 sinceTxId = 2; // Transaction ID
// Whether or not the client will be reading from the returned streams. // Whether or not the client will be reading from the returned streams.
optional bool forReading = 3 [default = true]; optional bool forReading = 3 [default = true];
optional bool inProgressOk = 4 [default = false];
} }
message GetEditLogManifestResponseProto { message GetEditLogManifestResponseProto {

View File

@ -62,6 +62,7 @@
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -548,4 +549,16 @@ public static void logStorageContents(Log LOG, NNStorage storage) {
public static FSImage getFSImage(NameNode node) { public static FSImage getFSImage(NameNode node) {
return node.getFSImage(); return node.getFSImage();
} }
public static void assertNNFilesMatch(MiniDFSCluster cluster) throws Exception {
List<File> curDirs = Lists.newArrayList();
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
// Ignore seen_txid file, since the newly bootstrapped standby
// will have a higher seen_txid than the one it bootstrapped from.
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
ignoredFiles);
}
} }

View File

@ -480,6 +480,6 @@ public void testExcludeInProgressStreams() throws CorruptionException,
private static String getLogsAsString( private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException { FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true)); return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true, false));
} }
} }

View File

@ -24,8 +24,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -45,8 +43,6 @@
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
public class TestBootstrapStandby { public class TestBootstrapStandby {
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class); private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
@ -107,7 +103,7 @@ public void testSuccessfulBaseCase() throws Exception {
// Should have copied over the namespace from the active // Should have copied over the namespace from the active
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of(0)); ImmutableList.of(0));
assertNNFilesMatch(); FSImageTestUtil.assertNNFilesMatch(cluster);
// We should now be able to start the standby successfully. // We should now be able to start the standby successfully.
cluster.restartNameNode(1); cluster.restartNameNode(1);
@ -138,7 +134,7 @@ public void testDownloadingLaterCheckpoint() throws Exception {
// Should have copied over the namespace from the active // Should have copied over the namespace from the active
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1, FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of((int)expectedCheckpointTxId)); ImmutableList.of((int)expectedCheckpointTxId));
assertNNFilesMatch(); FSImageTestUtil.assertNNFilesMatch(cluster);
// We should now be able to start the standby successfully. // We should now be able to start the standby successfully.
cluster.restartNameNode(1); cluster.restartNameNode(1);
@ -209,18 +205,6 @@ public void testOtherNodeNotActive() throws Exception {
assertEquals(0, rc); assertEquals(0, rc);
} }
private void assertNNFilesMatch() throws Exception {
List<File> curDirs = Lists.newArrayList();
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
// Ignore seen_txid file, since the newly bootstrapped standby
// will have a higher seen_txid than the one it bootstrapped from.
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
ignoredFiles);
}
private void removeStandbyNameDirs() { private void removeStandbyNameDirs() {
for (URI u : cluster.getNameDirs(1)) { for (URI u : cluster.getNameDirs(1)) {
assertTrue(u.getScheme().equals("file")); assertTrue(u.getScheme().equals("file"));

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
/**
* Test BootstrapStandby when QJM is used for shared edits.
*/
public class TestBootstrapStandbyWithQJM {
private static final String NAMESERVICE = "ns1";
private static final String NN1 = "nn1";
private static final String NN2 = "nn2";
private static final int NUM_JN = 3;
private static final int NN1_IPC_PORT = 10000;
private static final int NN1_INFO_PORT = 10001;
private static final int NN2_IPC_PORT = 10002;
private static final int NN2_INFO_PORT = 10003;
private MiniDFSCluster cluster;
private MiniJournalCluster jCluster;
@Before
public void setup() throws Exception {
// start 3 journal nodes
jCluster = new MiniJournalCluster.Builder(new Configuration()).format(true)
.numJournalNodes(NUM_JN).build();
URI journalURI = jCluster.getQuorumJournalURI(NAMESERVICE);
// start cluster with 2 NameNodes
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
.setHttpPort(NN1_INFO_PORT)).addNN(
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
.setHttpPort(NN2_INFO_PORT)));
Configuration conf = initHAConf(journalURI);
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
.numDataNodes(1).manageNameDfsSharedDirs(false).build();
cluster.waitActive();
Configuration confNN0 = new Configuration(conf);
cluster.shutdown();
// initialize the journal nodes
confNN0.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
NameNode.initializeSharedEdits(confNN0, true);
// restart the cluster
cluster = new MiniDFSCluster.Builder(conf).format(false)
.nnTopology(topology).numDataNodes(1).manageNameDfsSharedDirs(false)
.build();
cluster.waitActive();
// make nn0 active
cluster.transitionToActive(0);
// do sth to generate in-progress edit log data
DistributedFileSystem dfs = (DistributedFileSystem)
HATestUtil.configureFailoverFs(cluster, conf);
dfs.mkdirs(new Path("/test2"));
dfs.close();
}
@After
public void cleanup() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
if (jCluster != null) {
jCluster.shutdown();
}
}
private Configuration initHAConf(URI journalURI) {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
journalURI.toString());
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN2), address2);
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
NN1 + "," + NN2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
ConfiguredFailoverProxyProvider.class.getName());
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
return conf;
}
/** BootstrapStandby when the existing NN is standby */
@Test
public void testBootstrapStandbyWithStandbyNN() throws Exception {
// make the first NN in standby state
cluster.transitionToStandby(0);
Configuration confNN1 = cluster.getConfiguration(1);
// shut down nn1
cluster.shutdownNameNode(1);
int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
assertEquals(0, rc);
// Should have copied over the namespace from the standby
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of(0));
FSImageTestUtil.assertNNFilesMatch(cluster);
}
/** BootstrapStandby when the existing NN is active */
@Test
public void testBootstrapStandbyWithActiveNN() throws Exception {
// make the first NN in active state
cluster.transitionToActive(0);
Configuration confNN1 = cluster.getConfiguration(1);
// shut down nn1
cluster.shutdownNameNode(1);
int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
assertEquals(0, rc);
// Should have copied over the namespace from the standby
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
ImmutableList.of(0));
FSImageTestUtil.assertNNFilesMatch(cluster);
}
}