HDFS-5080. BootstrapStandby not working with QJM when the existing NN is active. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1514386 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
26c5a490e5
commit
8172215e56
|
@ -333,6 +333,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
HDFS-5093. TestGlobPaths should re-use the MiniDFSCluster to avoid failure
|
HDFS-5093. TestGlobPaths should re-use the MiniDFSCluster to avoid failure
|
||||||
on Windows. (Chuan Liu via cnauroth)
|
on Windows. (Chuan Liu via cnauroth)
|
||||||
|
|
||||||
|
HDFS-5080. BootstrapStandby not working with QJM when the existing NN is
|
||||||
|
active. (jing9)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-06
|
Release 2.1.0-beta - 2013-08-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -109,7 +109,7 @@ interface AsyncLogger {
|
||||||
* Fetch the list of edit logs available on the remote node.
|
* Fetch the list of edit logs available on the remote node.
|
||||||
*/
|
*/
|
||||||
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
||||||
long fromTxnId, boolean forReading);
|
long fromTxnId, boolean forReading, boolean inProgressOk);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepare recovery. See the HDFS-3077 design document for details.
|
* Prepare recovery. See the HDFS-3077 design document for details.
|
||||||
|
|
|
@ -262,14 +262,14 @@ class AsyncLoggerSet {
|
||||||
return QuorumCall.create(calls);
|
return QuorumCall.create(calls);
|
||||||
}
|
}
|
||||||
|
|
||||||
public QuorumCall<AsyncLogger, RemoteEditLogManifest>
|
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
|
||||||
getEditLogManifest(long fromTxnId, boolean forReading) {
|
long fromTxnId, boolean forReading, boolean inProgressOk) {
|
||||||
Map<AsyncLogger,
|
Map<AsyncLogger,
|
||||||
ListenableFuture<RemoteEditLogManifest>> calls
|
ListenableFuture<RemoteEditLogManifest>> calls
|
||||||
= Maps.newHashMap();
|
= Maps.newHashMap();
|
||||||
for (AsyncLogger logger : loggers) {
|
for (AsyncLogger logger : loggers) {
|
||||||
ListenableFuture<RemoteEditLogManifest> future =
|
ListenableFuture<RemoteEditLogManifest> future =
|
||||||
logger.getEditLogManifest(fromTxnId, forReading);
|
logger.getEditLogManifest(fromTxnId, forReading, inProgressOk);
|
||||||
calls.put(logger, future);
|
calls.put(logger, future);
|
||||||
}
|
}
|
||||||
return QuorumCall.create(calls);
|
return QuorumCall.create(calls);
|
||||||
|
|
|
@ -519,12 +519,13 @@ public class IPCLoggerChannel implements AsyncLogger {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
|
||||||
final long fromTxnId, final boolean forReading) {
|
final long fromTxnId, final boolean forReading,
|
||||||
|
final boolean inProgressOk) {
|
||||||
return executor.submit(new Callable<RemoteEditLogManifest>() {
|
return executor.submit(new Callable<RemoteEditLogManifest>() {
|
||||||
@Override
|
@Override
|
||||||
public RemoteEditLogManifest call() throws IOException {
|
public RemoteEditLogManifest call() throws IOException {
|
||||||
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
|
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
|
||||||
journalId, fromTxnId, forReading);
|
journalId, fromTxnId, forReading, inProgressOk);
|
||||||
// Update the http port, since we need this to build URLs to any of the
|
// Update the http port, since we need this to build URLs to any of the
|
||||||
// returned logs.
|
// returned logs.
|
||||||
httpPort = ret.getHttpPort();
|
httpPort = ret.getHttpPort();
|
||||||
|
|
|
@ -456,7 +456,7 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
|
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
|
||||||
|
|
||||||
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
|
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
|
||||||
loggers.getEditLogManifest(fromTxnId, forReading);
|
loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
|
||||||
Map<AsyncLogger, RemoteEditLogManifest> resps =
|
Map<AsyncLogger, RemoteEditLogManifest> resps =
|
||||||
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
|
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
|
||||||
"selectInputStreams");
|
"selectInputStreams");
|
||||||
|
@ -480,8 +480,7 @@ public class QuorumJournalManager implements JournalManager {
|
||||||
allStreams.add(elis);
|
allStreams.add(elis);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
JournalSet.chainAndMakeRedundantStreams(
|
JournalSet.chainAndMakeRedundantStreams(streams, allStreams, fromTxnId);
|
||||||
streams, allStreams, fromTxnId, inProgressOk);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -125,10 +125,13 @@ public interface QJournalProtocol {
|
||||||
* @param sinceTxId the first transaction which the client cares about
|
* @param sinceTxId the first transaction which the client cares about
|
||||||
* @param forReading whether or not the caller intends to read from the edit
|
* @param forReading whether or not the caller intends to read from the edit
|
||||||
* logs
|
* logs
|
||||||
|
* @param inProgressOk whether or not to check the in-progress edit log
|
||||||
|
* segment
|
||||||
* @return a list of edit log segments since the given transaction ID.
|
* @return a list of edit log segments since the given transaction ID.
|
||||||
*/
|
*/
|
||||||
public GetEditLogManifestResponseProto getEditLogManifest(
|
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||||
String jid, long sinceTxId, boolean forReading) throws IOException;
|
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Begin the recovery process for a given segment. See the HDFS-3077
|
* Begin the recovery process for a given segment. See the HDFS-3077
|
||||||
|
|
|
@ -203,7 +203,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
|
||||||
return impl.getEditLogManifest(
|
return impl.getEditLogManifest(
|
||||||
request.getJid().getIdentifier(),
|
request.getJid().getIdentifier(),
|
||||||
request.getSinceTxId(),
|
request.getSinceTxId(),
|
||||||
request.getForReading());
|
request.getForReading(),
|
||||||
|
request.getInProgressOk());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,13 +228,15 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||||
long sinceTxId, boolean forReading) throws IOException {
|
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||||
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
|
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
|
||||||
GetEditLogManifestRequestProto.newBuilder()
|
GetEditLogManifestRequestProto.newBuilder()
|
||||||
.setJid(convertJournalId(jid))
|
.setJid(convertJournalId(jid))
|
||||||
.setSinceTxId(sinceTxId)
|
.setSinceTxId(sinceTxId)
|
||||||
.setForReading(forReading)
|
.setForReading(forReading)
|
||||||
|
.setInProgressOk(inProgressOk)
|
||||||
.build());
|
.build());
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
|
|
@ -25,10 +25,9 @@ import java.io.InputStream;
|
||||||
import java.io.OutputStreamWriter;
|
import java.io.OutputStreamWriter;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Matcher;
|
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -36,8 +35,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
|
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
|
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
|
||||||
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
|
||||||
|
@ -50,6 +49,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||||
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
|
import org.apache.hadoop.hdfs.util.BestEffortLongFile;
|
||||||
|
@ -630,14 +630,31 @@ class Journal implements Closeable {
|
||||||
* @see QJournalProtocol#getEditLogManifest(String, long)
|
* @see QJournalProtocol#getEditLogManifest(String, long)
|
||||||
*/
|
*/
|
||||||
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
|
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
|
||||||
boolean forReading) throws IOException {
|
boolean forReading, boolean inProgressOk) throws IOException {
|
||||||
// No need to checkRequest() here - anyone may ask for the list
|
// No need to checkRequest() here - anyone may ask for the list
|
||||||
// of segments.
|
// of segments.
|
||||||
checkFormatted();
|
checkFormatted();
|
||||||
|
|
||||||
RemoteEditLogManifest manifest = new RemoteEditLogManifest(
|
// if this is for reading, ignore the in-progress editlog segment
|
||||||
fjm.getRemoteEditLogs(sinceTxId, forReading));
|
inProgressOk = forReading ? false : inProgressOk;
|
||||||
return manifest;
|
List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
|
||||||
|
inProgressOk);
|
||||||
|
|
||||||
|
if (inProgressOk) {
|
||||||
|
RemoteEditLog log = null;
|
||||||
|
for (Iterator<RemoteEditLog> iter = logs.iterator(); iter.hasNext();) {
|
||||||
|
log = iter.next();
|
||||||
|
if (log.isInProgress()) {
|
||||||
|
iter.remove();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (log != null && log.isInProgress()) {
|
||||||
|
logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new RemoteEditLogManifest(logs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -175,10 +175,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
|
||||||
long sinceTxId, boolean forReading) throws IOException {
|
long sinceTxId, boolean forReading, boolean inProgressOk)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
|
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
|
||||||
.getEditLogManifest(sinceTxId, forReading);
|
.getEditLogManifest(sinceTxId, forReading, inProgressOk);
|
||||||
|
|
||||||
return GetEditLogManifestResponseProto.newBuilder()
|
return GetEditLogManifestResponseProto.newBuilder()
|
||||||
.setManifest(PBHelper.convert(manifest))
|
.setManifest(PBHelper.convert(manifest))
|
||||||
|
|
|
@ -1274,6 +1274,7 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
public void selectInputStreams(Collection<EditLogInputStream> streams,
|
||||||
long fromTxId, boolean inProgressOk, boolean forReading) {
|
long fromTxId, boolean inProgressOk, boolean forReading) {
|
||||||
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
|
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
|
||||||
|
@ -1284,18 +1285,27 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
|
return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Select a list of input streams to load */
|
||||||
|
public Collection<EditLogInputStream> selectInputStreams(
|
||||||
|
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
|
||||||
|
boolean inProgressOk) throws IOException {
|
||||||
|
return selectInputStreams(fromTxId, toAtLeastTxId, recovery, inProgressOk,
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Select a list of input streams to load.
|
* Select a list of input streams.
|
||||||
*
|
*
|
||||||
* @param fromTxId first transaction in the selected streams
|
* @param fromTxId first transaction in the selected streams
|
||||||
* @param toAtLeast the selected streams must contain this transaction
|
* @param toAtLeast the selected streams must contain this transaction
|
||||||
* @param inProgessOk set to true if in-progress streams are OK
|
* @param inProgessOk set to true if in-progress streams are OK
|
||||||
|
* @param forReading whether or not to use the streams to load the edit log
|
||||||
*/
|
*/
|
||||||
public synchronized Collection<EditLogInputStream> selectInputStreams(
|
public synchronized Collection<EditLogInputStream> selectInputStreams(
|
||||||
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
|
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
|
||||||
boolean inProgressOk) throws IOException {
|
boolean inProgressOk, boolean forReading) throws IOException {
|
||||||
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
|
||||||
selectInputStreams(streams, fromTxId, inProgressOk, true);
|
selectInputStreams(streams, fromTxId, inProgressOk, forReading);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
|
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
|
||||||
|
|
|
@ -169,18 +169,26 @@ public class FileJournalManager implements JournalManager {
|
||||||
* @param fromTxId the txnid which to start looking
|
* @param fromTxId the txnid which to start looking
|
||||||
* @param forReading whether or not the caller intends to read from the edit
|
* @param forReading whether or not the caller intends to read from the edit
|
||||||
* logs
|
* logs
|
||||||
|
* @param inProgressOk whether or not to include the in-progress edit log
|
||||||
|
* segment
|
||||||
* @return a list of remote edit logs
|
* @return a list of remote edit logs
|
||||||
* @throws IOException if edit logs cannot be listed.
|
* @throws IOException if edit logs cannot be listed.
|
||||||
*/
|
*/
|
||||||
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
|
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
|
||||||
boolean forReading) throws IOException {
|
boolean forReading, boolean inProgressOk) throws IOException {
|
||||||
|
// make sure not reading in-progress edit log, i.e., if forReading is true,
|
||||||
|
// we should ignore the in-progress edit log.
|
||||||
|
Preconditions.checkArgument(!(forReading && inProgressOk));
|
||||||
|
|
||||||
File currentDir = sd.getCurrentDir();
|
File currentDir = sd.getCurrentDir();
|
||||||
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
|
||||||
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
|
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
|
||||||
allLogFiles.size());
|
allLogFiles.size());
|
||||||
|
|
||||||
for (EditLogFile elf : allLogFiles) {
|
for (EditLogFile elf : allLogFiles) {
|
||||||
if (elf.hasCorruptHeader() || elf.isInProgress()) continue;
|
if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (elf.getFirstTxId() >= firstTxId) {
|
if (elf.getFirstTxId() >= firstTxId) {
|
||||||
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
||||||
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
|
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -31,14 +33,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.ComparisonChain;
|
import com.google.common.collect.ComparisonChain;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
@ -257,13 +255,12 @@ public class JournalSet implements JournalManager {
|
||||||
". Skipping.", ioe);
|
". Skipping.", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
|
chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void chainAndMakeRedundantStreams(
|
public static void chainAndMakeRedundantStreams(
|
||||||
Collection<EditLogInputStream> outStreams,
|
Collection<EditLogInputStream> outStreams,
|
||||||
PriorityQueue<EditLogInputStream> allStreams,
|
PriorityQueue<EditLogInputStream> allStreams, long fromTxId) {
|
||||||
long fromTxId, boolean inProgressOk) {
|
|
||||||
// We want to group together all the streams that start on the same start
|
// We want to group together all the streams that start on the same start
|
||||||
// transaction ID. To do this, we maintain an accumulator (acc) of all
|
// transaction ID. To do this, we maintain an accumulator (acc) of all
|
||||||
// the streams we've seen at a given start transaction ID. When we see a
|
// the streams we've seen at a given start transaction ID. When we see a
|
||||||
|
@ -598,7 +595,7 @@ public class JournalSet implements JournalManager {
|
||||||
if (j.getManager() instanceof FileJournalManager) {
|
if (j.getManager() instanceof FileJournalManager) {
|
||||||
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
||||||
try {
|
try {
|
||||||
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading));
|
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading, false));
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Cannot list edit logs in " + fjm, t);
|
LOG.warn("Cannot list edit logs in " + fjm, t);
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,7 +226,7 @@ public class BootstrapStandby implements Tool, Configurable {
|
||||||
try {
|
try {
|
||||||
Collection<EditLogInputStream> streams =
|
Collection<EditLogInputStream> streams =
|
||||||
image.getEditLog().selectInputStreams(
|
image.getEditLog().selectInputStreams(
|
||||||
firstTxIdInLogs, curTxIdOnOtherNode, null, true);
|
firstTxIdInLogs, curTxIdOnOtherNode, null, true, false);
|
||||||
for (EditLogInputStream stream : streams) {
|
for (EditLogInputStream stream : streams) {
|
||||||
IOUtils.closeStream(stream);
|
IOUtils.closeStream(stream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,6 +177,7 @@ message GetEditLogManifestRequestProto {
|
||||||
required uint64 sinceTxId = 2; // Transaction ID
|
required uint64 sinceTxId = 2; // Transaction ID
|
||||||
// Whether or not the client will be reading from the returned streams.
|
// Whether or not the client will be reading from the returned streams.
|
||||||
optional bool forReading = 3 [default = true];
|
optional bool forReading = 3 [default = true];
|
||||||
|
optional bool inProgressOk = 4 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetEditLogManifestResponseProto {
|
message GetEditLogManifestResponseProto {
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.mockito.Mockito;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
@ -555,4 +556,16 @@ public abstract class FSImageTestUtil {
|
||||||
public static long getNSQuota(FSNamesystem ns) {
|
public static long getNSQuota(FSNamesystem ns) {
|
||||||
return ns.dir.rootDir.getNsQuota();
|
return ns.dir.rootDir.getNsQuota();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void assertNNFilesMatch(MiniDFSCluster cluster) throws Exception {
|
||||||
|
List<File> curDirs = Lists.newArrayList();
|
||||||
|
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
|
||||||
|
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
|
||||||
|
|
||||||
|
// Ignore seen_txid file, since the newly bootstrapped standby
|
||||||
|
// will have a higher seen_txid than the one it bootstrapped from.
|
||||||
|
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
|
||||||
|
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
|
||||||
|
ignoredFiles);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -479,6 +479,6 @@ public class TestFileJournalManager {
|
||||||
|
|
||||||
private static String getLogsAsString(
|
private static String getLogsAsString(
|
||||||
FileJournalManager fjm, long firstTxId) throws IOException {
|
FileJournalManager fjm, long firstTxId) throws IOException {
|
||||||
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true));
|
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,8 +24,6 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -45,8 +43,6 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
public class TestBootstrapStandby {
|
public class TestBootstrapStandby {
|
||||||
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
|
private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
|
||||||
|
@ -107,7 +103,7 @@ public class TestBootstrapStandby {
|
||||||
// Should have copied over the namespace from the active
|
// Should have copied over the namespace from the active
|
||||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||||
ImmutableList.of(0));
|
ImmutableList.of(0));
|
||||||
assertNNFilesMatch();
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||||
|
|
||||||
// We should now be able to start the standby successfully.
|
// We should now be able to start the standby successfully.
|
||||||
cluster.restartNameNode(1);
|
cluster.restartNameNode(1);
|
||||||
|
@ -138,7 +134,7 @@ public class TestBootstrapStandby {
|
||||||
// Should have copied over the namespace from the active
|
// Should have copied over the namespace from the active
|
||||||
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||||
ImmutableList.of((int)expectedCheckpointTxId));
|
ImmutableList.of((int)expectedCheckpointTxId));
|
||||||
assertNNFilesMatch();
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||||
|
|
||||||
// We should now be able to start the standby successfully.
|
// We should now be able to start the standby successfully.
|
||||||
cluster.restartNameNode(1);
|
cluster.restartNameNode(1);
|
||||||
|
@ -209,18 +205,6 @@ public class TestBootstrapStandby {
|
||||||
assertEquals(0, rc);
|
assertEquals(0, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertNNFilesMatch() throws Exception {
|
|
||||||
List<File> curDirs = Lists.newArrayList();
|
|
||||||
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
|
|
||||||
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
|
|
||||||
|
|
||||||
// Ignore seen_txid file, since the newly bootstrapped standby
|
|
||||||
// will have a higher seen_txid than the one it bootstrapped from.
|
|
||||||
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
|
|
||||||
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
|
|
||||||
ignoredFiles);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeStandbyNameDirs() {
|
private void removeStandbyNameDirs() {
|
||||||
for (URI u : cluster.getNameDirs(1)) {
|
for (URI u : cluster.getNameDirs(1)) {
|
||||||
assertTrue(u.getScheme().equals("file"));
|
assertTrue(u.getScheme().equals("file"));
|
||||||
|
|
|
@ -0,0 +1,170 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test BootstrapStandby when QJM is used for shared edits.
|
||||||
|
*/
|
||||||
|
public class TestBootstrapStandbyWithQJM {
|
||||||
|
|
||||||
|
private static final String NAMESERVICE = "ns1";
|
||||||
|
private static final String NN1 = "nn1";
|
||||||
|
private static final String NN2 = "nn2";
|
||||||
|
private static final int NUM_JN = 3;
|
||||||
|
private static final int NN1_IPC_PORT = 10000;
|
||||||
|
private static final int NN1_INFO_PORT = 10001;
|
||||||
|
private static final int NN2_IPC_PORT = 10002;
|
||||||
|
private static final int NN2_INFO_PORT = 10003;
|
||||||
|
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private MiniJournalCluster jCluster;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
// start 3 journal nodes
|
||||||
|
jCluster = new MiniJournalCluster.Builder(new Configuration()).format(true)
|
||||||
|
.numJournalNodes(NUM_JN).build();
|
||||||
|
URI journalURI = jCluster.getQuorumJournalURI(NAMESERVICE);
|
||||||
|
|
||||||
|
// start cluster with 2 NameNodes
|
||||||
|
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||||
|
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
|
||||||
|
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
|
||||||
|
.setHttpPort(NN1_INFO_PORT)).addNN(
|
||||||
|
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
|
||||||
|
.setHttpPort(NN2_INFO_PORT)));
|
||||||
|
|
||||||
|
Configuration conf = initHAConf(journalURI);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
|
||||||
|
.numDataNodes(1).manageNameDfsSharedDirs(false).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
Configuration confNN0 = new Configuration(conf);
|
||||||
|
cluster.shutdown();
|
||||||
|
// initialize the journal nodes
|
||||||
|
confNN0.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
|
||||||
|
NameNode.initializeSharedEdits(confNN0, true);
|
||||||
|
|
||||||
|
// restart the cluster
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).format(false)
|
||||||
|
.nnTopology(topology).numDataNodes(1).manageNameDfsSharedDirs(false)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// make nn0 active
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
// do sth to generate in-progress edit log data
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem)
|
||||||
|
HATestUtil.configureFailoverFs(cluster, conf);
|
||||||
|
dfs.mkdirs(new Path("/test2"));
|
||||||
|
dfs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
if (jCluster != null) {
|
||||||
|
jCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Configuration initHAConf(URI journalURI) {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||||
|
journalURI.toString());
|
||||||
|
|
||||||
|
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
|
||||||
|
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
NAMESERVICE, NN1), address1);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
NAMESERVICE, NN2), address2);
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
|
||||||
|
NN1 + "," + NN2);
|
||||||
|
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
|
||||||
|
ConfiguredFailoverProxyProvider.class.getName());
|
||||||
|
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** BootstrapStandby when the existing NN is standby */
|
||||||
|
@Test
|
||||||
|
public void testBootstrapStandbyWithStandbyNN() throws Exception {
|
||||||
|
// make the first NN in standby state
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
Configuration confNN1 = cluster.getConfiguration(1);
|
||||||
|
|
||||||
|
// shut down nn1
|
||||||
|
cluster.shutdownNameNode(1);
|
||||||
|
|
||||||
|
int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
|
||||||
|
assertEquals(0, rc);
|
||||||
|
|
||||||
|
// Should have copied over the namespace from the standby
|
||||||
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||||
|
ImmutableList.of(0));
|
||||||
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** BootstrapStandby when the existing NN is active */
|
||||||
|
@Test
|
||||||
|
public void testBootstrapStandbyWithActiveNN() throws Exception {
|
||||||
|
// make the first NN in active state
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
Configuration confNN1 = cluster.getConfiguration(1);
|
||||||
|
|
||||||
|
// shut down nn1
|
||||||
|
cluster.shutdownNameNode(1);
|
||||||
|
|
||||||
|
int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
|
||||||
|
assertEquals(0, rc);
|
||||||
|
|
||||||
|
// Should have copied over the namespace from the standby
|
||||||
|
FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
|
||||||
|
ImmutableList.of(0));
|
||||||
|
FSImageTestUtil.assertNNFilesMatch(cluster);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue