HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a segment. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1550021 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-12-11 01:49:29 +00:00
parent 39a506d5a0
commit 75e97fc4bf
31 changed files with 262 additions and 183 deletions

View File

@ -199,6 +199,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5533. Symlink delete/create should be treated as DELETE/CREATE in snapshot diff HDFS-5533. Symlink delete/create should be treated as DELETE/CREATE in snapshot diff
report. (Binglin Chang via jing9) report. (Binglin Chang via jing9)
HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a
segment. (Todd Lipcon via atm)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -500,14 +500,9 @@ public class BookKeeperJournalManager implements JournalManager {
} }
} }
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException {
selectInputStreams(streams, fromTxId, inProgressOk, true);
}
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) long fromTxId, boolean inProgressOk)
throws IOException { throws IOException {
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId, List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
inProgressOk); inProgressOk);

View File

@ -109,7 +109,7 @@ interface AsyncLogger {
* Fetch the list of edit logs available on the remote node. * Fetch the list of edit logs available on the remote node.
*/ */
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean forReading, boolean inProgressOk); long fromTxnId, boolean inProgressOk);
/** /**
* Prepare recovery. See the HDFS-3077 design document for details. * Prepare recovery. See the HDFS-3077 design document for details.

View File

@ -261,13 +261,13 @@ class AsyncLoggerSet {
} }
public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest( public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
long fromTxnId, boolean forReading, boolean inProgressOk) { long fromTxnId, boolean inProgressOk) {
Map<AsyncLogger, Map<AsyncLogger,
ListenableFuture<RemoteEditLogManifest>> calls ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap(); = Maps.newHashMap();
for (AsyncLogger logger : loggers) { for (AsyncLogger logger : loggers) {
ListenableFuture<RemoteEditLogManifest> future = ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId, forReading, inProgressOk); logger.getEditLogManifest(fromTxnId, inProgressOk);
calls.put(logger, future); calls.put(logger, future);
} }
return QuorumCall.create(calls); return QuorumCall.create(calls);

View File

@ -179,6 +179,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public void close() { public void close() {
QuorumJournalManager.LOG.info("Closing", new Exception());
// No more tasks may be submitted after this point. // No more tasks may be submitted after this point.
executor.shutdown(); executor.shutdown();
if (proxy != null) { if (proxy != null) {
@ -519,13 +520,12 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId, final boolean forReading, final long fromTxnId, final boolean inProgressOk) {
final boolean inProgressOk) {
return executor.submit(new Callable<RemoteEditLogManifest>() { return executor.submit(new Callable<RemoteEditLogManifest>() {
@Override @Override
public RemoteEditLogManifest call() throws IOException { public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId, forReading, inProgressOk); journalId, fromTxnId, inProgressOk);
// Update the http port, since we need this to build URLs to any of the // Update the http port, since we need this to build URLs to any of the
// returned logs. // returned logs.
httpPort = ret.getHttpPort(); httpPort = ret.getHttpPort();

View File

@ -445,18 +445,13 @@ public class QuorumJournalManager implements JournalManager {
public void close() throws IOException { public void close() throws IOException {
loggers.close(); loggers.close();
} }
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) throws IOException {
selectInputStreams(streams, fromTxnId, inProgressOk, true);
}
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException { long fromTxnId, boolean inProgressOk) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q = QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk); loggers.getEditLogManifest(fromTxnId, inProgressOk);
Map<AsyncLogger, RemoteEditLogManifest> resps = Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs, loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams"); "selectInputStreams");

View File

@ -123,14 +123,12 @@ public interface QJournalProtocol {
/** /**
* @param jid the journal from which to enumerate edits * @param jid the journal from which to enumerate edits
* @param sinceTxId the first transaction which the client cares about * @param sinceTxId the first transaction which the client cares about
* @param forReading whether or not the caller intends to read from the edit
* logs
* @param inProgressOk whether or not to check the in-progress edit log * @param inProgressOk whether or not to check the in-progress edit log
* segment * segment
* @return a list of edit log segments since the given transaction ID. * @return a list of edit log segments since the given transaction ID.
*/ */
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading, boolean inProgressOk) long sinceTxId, boolean inProgressOk)
throws IOException; throws IOException;
/** /**

View File

@ -203,7 +203,6 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
return impl.getEditLogManifest( return impl.getEditLogManifest(
request.getJid().getIdentifier(), request.getJid().getIdentifier(),
request.getSinceTxId(), request.getSinceTxId(),
request.getForReading(),
request.getInProgressOk()); request.getInProgressOk());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);

View File

@ -228,14 +228,13 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading, boolean inProgressOk) long sinceTxId, boolean inProgressOk)
throws IOException { throws IOException {
try { try {
return rpcProxy.getEditLogManifest(NULL_CONTROLLER, return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
GetEditLogManifestRequestProto.newBuilder() GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId) .setSinceTxId(sinceTxId)
.setForReading(forReading)
.setInProgressOk(inProgressOk) .setInProgressOk(inProgressOk)
.build()); .build());
} catch (ServiceException e) { } catch (ServiceException e) {

View File

@ -630,15 +630,12 @@ class Journal implements Closeable {
* @see QJournalProtocol#getEditLogManifest(String, long) * @see QJournalProtocol#getEditLogManifest(String, long)
*/ */
public RemoteEditLogManifest getEditLogManifest(long sinceTxId, public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean forReading, boolean inProgressOk) throws IOException { boolean inProgressOk) throws IOException {
// No need to checkRequest() here - anyone may ask for the list // No need to checkRequest() here - anyone may ask for the list
// of segments. // of segments.
checkFormatted(); checkFormatted();
// if this is for reading, ignore the in-progress editlog segment List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
inProgressOk = forReading ? false : inProgressOk;
List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
inProgressOk);
if (inProgressOk) { if (inProgressOk) {
RemoteEditLog log = null; RemoteEditLog log = null;

View File

@ -175,11 +175,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean forReading, boolean inProgressOk) long sinceTxId, boolean inProgressOk)
throws IOException { throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
.getEditLogManifest(sinceTxId, forReading, inProgressOk); .getEditLogManifest(sinceTxId, inProgressOk);
return GetEditLogManifestResponseProto.newBuilder() return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest)) .setManifest(PBHelper.convert(manifest))

View File

@ -77,7 +77,7 @@ class BackupJournalManager implements JournalManager {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk, boolean forReading) { long fromTxnId, boolean inProgressOk) {
// This JournalManager is never used for input. Therefore it cannot // This JournalManager is never used for input. Therefore it cannot
// return any transactions // return any transactions
} }

View File

@ -281,7 +281,7 @@ public class FSEditLog implements LogsPurgeable {
// Safety check: we should never start a segment if there are // Safety check: we should never start a segment if there are
// newer txids readable. // newer txids readable.
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
journalSet.selectInputStreams(streams, segmentTxId, true, true); journalSet.selectInputStreams(streams, segmentTxId, true);
if (!streams.isEmpty()) { if (!streams.isEmpty()) {
String error = String.format("Cannot start writing at txid %s " + String error = String.format("Cannot start writing at txid %s " +
"when there is a stream available for read: %s", "when there is a stream available for read: %s",
@ -982,7 +982,7 @@ public class FSEditLog implements LogsPurgeable {
*/ */
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
throws IOException { throws IOException {
return journalSet.getEditLogManifest(fromTxId, true); return journalSet.getEditLogManifest(fromTxId);
} }
/** /**
@ -1233,22 +1233,14 @@ public class FSEditLog implements LogsPurgeable {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException { long fromTxId, boolean inProgressOk) throws IOException {
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading); journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
} }
public Collection<EditLogInputStream> selectInputStreams( public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId) throws IOException { long fromTxId, long toAtLeastTxId) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId, null, true); return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
} }
/** Select a list of input streams to load */
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId, recovery, inProgressOk,
true);
}
/** /**
* Select a list of input streams. * Select a list of input streams.
@ -1256,13 +1248,12 @@ public class FSEditLog implements LogsPurgeable {
* @param fromTxId first transaction in the selected streams * @param fromTxId first transaction in the selected streams
* @param toAtLeast the selected streams must contain this transaction * @param toAtLeast the selected streams must contain this transaction
* @param inProgessOk set to true if in-progress streams are OK * @param inProgessOk set to true if in-progress streams are OK
* @param forReading whether or not to use the streams to load the edit log
*/ */
public synchronized Collection<EditLogInputStream> selectInputStreams( public synchronized Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk, boolean forReading) throws IOException { boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
selectInputStreams(streams, fromTxId, inProgressOk, forReading); selectInputStreams(streams, fromTxId, inProgressOk);
try { try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

View File

@ -167,19 +167,13 @@ public class FileJournalManager implements JournalManager {
/** /**
* Find all editlog segments starting at or above the given txid. * Find all editlog segments starting at or above the given txid.
* @param fromTxId the txnid which to start looking * @param fromTxId the txnid which to start looking
* @param forReading whether or not the caller intends to read from the edit
* logs
* @param inProgressOk whether or not to include the in-progress edit log * @param inProgressOk whether or not to include the in-progress edit log
* segment * segment
* @return a list of remote edit logs * @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed. * @throws IOException if edit logs cannot be listed.
*/ */
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId, public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
boolean forReading, boolean inProgressOk) throws IOException { boolean inProgressOk) throws IOException {
// make sure not reading in-progress edit log, i.e., if forReading is true,
// we should ignore the in-progress edit log.
Preconditions.checkArgument(!(forReading && inProgressOk));
File currentDir = sd.getCurrentDir(); File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir); List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity( List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@ -192,14 +186,9 @@ public class FileJournalManager implements JournalManager {
if (elf.getFirstTxId() >= firstTxId) { if (elf.getFirstTxId() >= firstTxId) {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) { } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
// If the firstTxId is in the middle of an edit log segment // If the firstTxId is in the middle of an edit log segment. Return this
if (forReading) { // anyway and let the caller figure out whether it wants to use it.
// Note that this behavior is different from getLogFiles below. ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
throw new IllegalStateException("Asked for firstTxId " + firstTxId
+ " which is in the middle of file " + elf.file);
} else {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
}
} }
} }
@ -260,7 +249,7 @@ public class FileJournalManager implements JournalManager {
@Override @Override
synchronized public void selectInputStreams( synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId, Collection<EditLogInputStream> streams, long fromTxId,
boolean inProgressOk, boolean forReading) throws IOException { boolean inProgressOk) throws IOException {
List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir()); List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
LOG.debug(this + ": selecting input streams starting at " + fromTxId + LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +

View File

@ -234,12 +234,10 @@ public class JournalSet implements JournalManager {
* may not be sorted-- this is up to the caller. * may not be sorted-- this is up to the caller.
* @param fromTxId The transaction ID to start looking for streams at * @param fromTxId The transaction ID to start looking for streams at
* @param inProgressOk Should we consider unfinalized streams? * @param inProgressOk Should we consider unfinalized streams?
* @param forReading Whether or not the caller intends to read from
* the returned streams.
*/ */
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException { long fromTxId, boolean inProgressOk) throws IOException {
final PriorityQueue<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64, new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR); EDIT_LOG_INPUT_STREAM_COMPARATOR);
@ -249,8 +247,7 @@ public class JournalSet implements JournalManager {
continue; continue;
} }
try { try {
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk, jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
forReading);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Unable to determine input streams from " + jas.getManager() + LOG.warn("Unable to determine input streams from " + jas.getManager() +
". Skipping.", ioe); ". Skipping.", ioe);
@ -583,20 +580,20 @@ public class JournalSet implements JournalManager {
/** /**
* Return a manifest of what finalized edit logs are available. All available * Return a manifest of what finalized edit logs are available. All available
* edit logs are returned starting from the transaction id passed. * edit logs are returned starting from the transaction id passed. If
* 'fromTxId' falls in the middle of a log, that log is returned as well.
* *
* @param fromTxId Starting transaction id to read the logs. * @param fromTxId Starting transaction id to read the logs.
* @return RemoteEditLogManifest object. * @return RemoteEditLogManifest object.
*/ */
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId, public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
boolean forReading) {
// Collect RemoteEditLogs available from each FileJournalManager // Collect RemoteEditLogs available from each FileJournalManager
List<RemoteEditLog> allLogs = Lists.newArrayList(); List<RemoteEditLog> allLogs = Lists.newArrayList();
for (JournalAndStream j : journals) { for (JournalAndStream j : journals) {
if (j.getManager() instanceof FileJournalManager) { if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fjm = (FileJournalManager)j.getManager(); FileJournalManager fjm = (FileJournalManager)j.getManager();
try { try {
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading, false)); allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, false));
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Cannot list edit logs in " + fjm, t); LOG.warn("Cannot list edit logs in " + fjm, t);
} }

View File

@ -42,13 +42,11 @@ interface LogsPurgeable {
* *
* @param fromTxId the first transaction id we want to read * @param fromTxId the first transaction id we want to read
* @param inProgressOk whether or not in-progress streams should be returned * @param inProgressOk whether or not in-progress streams should be returned
* @param forReading whether or not the caller intends to read from the edit logs
*
* @return a list of streams * @return a list of streams
* @throws IOException if the underlying storage has an error or is otherwise * @throws IOException if the underlying storage has an error or is otherwise
* inaccessible * inaccessible
*/ */
void selectInputStreams(Collection<EditLogInputStream> streams, void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException; long fromTxId, boolean inProgressOk) throws IOException;
} }

View File

@ -108,7 +108,7 @@ public class NNStorageRetentionManager {
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain); long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>(); ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false); purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
Collections.sort(editLogs, new Comparator<EditLogInputStream>() { Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
@Override @Override
public int compare(EditLogInputStream a, EditLogInputStream b) { public int compare(EditLogInputStream a, EditLogInputStream b) {

View File

@ -817,7 +817,7 @@ public class SecondaryNameNode implements Runnable {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading) { long fromTxId, boolean inProgressOk) {
Iterator<StorageDirectory> iter = storage.dirIterator(); Iterator<StorageDirectory> iter = storage.dirIterator();
while (iter.hasNext()) { while (iter.hasNext()) {
StorageDirectory dir = iter.next(); StorageDirectory dir = iter.next();

View File

@ -226,7 +226,7 @@ public class BootstrapStandby implements Tool, Configurable {
try { try {
Collection<EditLogInputStream> streams = Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams( image.getEditLog().selectInputStreams(
firstTxIdInLogs, curTxIdOnOtherNode, null, true, false); firstTxIdInLogs, curTxIdOnOtherNode, null, true);
for (EditLogInputStream stream : streams) { for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream); IOUtils.closeStream(stream);
} }

View File

@ -165,7 +165,7 @@ public class EditLogTailer {
} }
@VisibleForTesting @VisibleForTesting
void setEditLog(FSEditLog editLog) { public void setEditLog(FSEditLog editLog) {
this.editLog = editLog; this.editLog = editLog;
} }

View File

@ -176,7 +176,7 @@ message GetEditLogManifestRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
required uint64 sinceTxId = 2; // Transaction ID required uint64 sinceTxId = 2; // Transaction ID
// Whether or not the client will be reading from the returned streams. // Whether or not the client will be reading from the returned streams.
optional bool forReading = 3 [default = true]; // optional bool forReading = 3 [default = true]; <obsolete, do not reuse>
optional bool inProgressOk = 4 [default = false]; optional bool inProgressOk = 4 [default = false];
} }

View File

@ -1480,8 +1480,9 @@ public class MiniDFSCluster {
*/ */
public synchronized void restartNameNodes() throws IOException { public synchronized void restartNameNodes() throws IOException {
for (int i = 0; i < nameNodes.length; i++) { for (int i = 0; i < nameNodes.length; i++) {
restartNameNode(i); restartNameNode(i, false);
} }
waitActive();
} }
/** /**

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
public class MiniQJMHACluster {
private MiniDFSCluster cluster;
private MiniJournalCluster journalCluster;
private final Configuration conf;
private static String NAMESERVICE = "ns1";
private static final String NN1 = "nn1";
private static final String NN2 = "nn2";
private static final int NN1_IPC_PORT = 10000;
private static final int NN1_INFO_PORT = 10001;
private static final int NN2_IPC_PORT = 10002;
private static final int NN2_INFO_PORT = 10003;
public static class Builder {
private final Configuration conf;
private final MiniDFSCluster.Builder dfsBuilder;
public Builder(Configuration conf) {
this.conf = conf;
this.dfsBuilder = new MiniDFSCluster.Builder(conf);
}
public MiniDFSCluster.Builder getDfsBuilder() {
return dfsBuilder;
}
public MiniQJMHACluster build() throws IOException {
return new MiniQJMHACluster(this);
}
}
public static MiniDFSNNTopology createDefaultTopology() {
return new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
.setHttpPort(NN1_INFO_PORT)).addNN(
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
.setHttpPort(NN2_INFO_PORT)));
}
private MiniQJMHACluster(Builder builder) throws IOException {
this.conf = builder.conf;
// start 3 journal nodes
journalCluster = new MiniJournalCluster.Builder(conf).format(true)
.build();
URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
// start cluster with 2 NameNodes
MiniDFSNNTopology topology = createDefaultTopology();
initHAConf(journalURI, builder.conf);
// First start up the NNs just to format the namespace. The MinIDFSCluster
// has no way to just format the NameNodes without also starting them.
cluster = builder.dfsBuilder.nnTopology(topology)
.manageNameDfsSharedDirs(false).build();
cluster.waitActive();
cluster.shutdown();
// initialize the journal nodes
Configuration confNN0 = cluster.getConfiguration(0);
NameNode.initializeSharedEdits(confNN0, true);
// restart the cluster
cluster.restartNameNodes();
}
private Configuration initHAConf(URI journalURI, Configuration conf) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
journalURI.toString());
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN2), address2);
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
NN1 + "," + NN2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
ConfiguredFailoverProxyProvider.class.getName());
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
return conf;
}
public MiniDFSCluster getDfsCluster() {
return cluster;
}
public MiniJournalCluster getJournalCluster() {
return journalCluster;
}
public void shutdown() throws IOException {
cluster.shutdown();
journalCluster.shutdown();
}
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.qjournal; package org.apache.hadoop.hdfs.qjournal;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assume.*;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;

View File

@ -916,7 +916,7 @@ public class TestQuorumJournalManager {
NNStorage.getFinalizedEditsFileName(41, 50)); NNStorage.getFinalizedEditsFileName(41, 50));
ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
qjm.selectInputStreams(streams, 25, false, false); qjm.selectInputStreams(streams, 25, false);
verifyEdits(streams, 25, 50); verifyEdits(streams, 25, 50);
} }

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.SafeModeInfo;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
@ -179,6 +180,16 @@ public class NameNodeAdapter {
return spy; return spy;
} }
public static FSEditLog spyOnEditLog(NameNode nn) {
FSEditLog spyEditLog = spy(nn.getNamesystem().getFSImage().getEditLog());
nn.getFSImage().setEditLogForTesting(spyEditLog);
EditLogTailer tailer = nn.getNamesystem().getEditLogTailer();
if (tailer != null) {
tailer.setEditLog(spyEditLog);
}
return spyEditLog;
}
public static JournalSet spyOnJournalSet(NameNode nn) { public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog(); FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet()); JournalSet js = Mockito.spy(editLog.getJournalSet());

View File

@ -83,7 +83,7 @@ public class TestFileJournalManager {
final PriorityQueue<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64, new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk, true); jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
EditLogInputStream elis = null; EditLogInputStream elis = null;
try { try {
while ((elis = allStreams.poll()) != null) { while ((elis = allStreams.poll()) != null) {
@ -379,14 +379,8 @@ public class TestFileJournalManager {
FileJournalManager fjm = new FileJournalManager(conf, sd, null); FileJournalManager fjm = new FileJournalManager(conf, sd, null);
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1)); assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101)); assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 150));
assertEquals("[1001,1100]", getLogsAsString(fjm, 201)); assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
try {
assertEquals("[]", getLogsAsString(fjm, 150));
fail("Did not throw when asking for a txn in the middle of a log");
} catch (IllegalStateException ioe) {
GenericTestUtils.assertExceptionContains(
"150 which is in the middle", ioe);
}
assertEquals("Asking for a newer log than exists should return empty list", assertEquals("Asking for a newer log than exists should return empty list",
"", getLogsAsString(fjm, 9999)); "", getLogsAsString(fjm, 9999));
} }
@ -405,7 +399,7 @@ public class TestFileJournalManager {
final PriorityQueue<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64, new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, txId, inProgressOk, true); jm.selectInputStreams(allStreams, txId, inProgressOk);
EditLogInputStream elis = null, ret; EditLogInputStream elis = null, ret;
try { try {
while ((elis = allStreams.poll()) != null) { while ((elis = allStreams.poll()) != null) {
@ -483,6 +477,6 @@ public class TestFileJournalManager {
private static String getLogsAsString( private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException { FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true, false)); return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, false));
} }
} }

View File

@ -174,7 +174,7 @@ public class TestGenericJournalConf {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk, boolean forReading) { long fromTxnId, boolean inProgressOk) {
} }
@Override @Override

View File

@ -360,12 +360,11 @@ public class TestNNStorageRetentionManager {
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments(); Object[] args = invocation.getArguments();
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0], journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
(long)((Long)args[1]), (boolean)((Boolean)args[2]), (long)((Long)args[1]), (boolean)((Boolean)args[2]));
(boolean)((Boolean)args[3]));
return null; return null;
} }
}).when(mockLog).selectInputStreams(Mockito.anyCollection(), }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean()); Mockito.anyLong(), Mockito.anyBoolean());
return mockLog; return mockLog;
} }
} }

View File

@ -17,24 +17,18 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -44,51 +38,23 @@ import com.google.common.collect.ImmutableList;
/** /**
* Test BootstrapStandby when QJM is used for shared edits. * Test BootstrapStandby when QJM is used for shared edits.
*/ */
public class TestBootstrapStandbyWithQJM { public class TestBootstrapStandbyWithQJM {
private MiniQJMHACluster miniQjmHaCluster;
private static final String NAMESERVICE = "ns1";
private static final String NN1 = "nn1";
private static final String NN2 = "nn2";
private static final int NUM_JN = 3;
private static final int NN1_IPC_PORT = 10000;
private static final int NN1_INFO_PORT = 10001;
private static final int NN2_IPC_PORT = 10002;
private static final int NN2_INFO_PORT = 10003;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private MiniJournalCluster jCluster; private MiniJournalCluster jCluster;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
// start 3 journal nodes Configuration conf = new Configuration();
jCluster = new MiniJournalCluster.Builder(new Configuration()).format(true) // Turn off IPC client caching, so that the suite can handle
.numJournalNodes(NUM_JN).build(); // the restart of the daemons between test cases.
URI journalURI = jCluster.getQuorumJournalURI(NAMESERVICE); conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
// start cluster with 2 NameNodes 0);
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN( miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT) cluster = miniQjmHaCluster.getDfsCluster();
.setHttpPort(NN1_INFO_PORT)).addNN( jCluster = miniQjmHaCluster.getJournalCluster();
new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
.setHttpPort(NN2_INFO_PORT)));
Configuration conf = initHAConf(journalURI);
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
.numDataNodes(1).manageNameDfsSharedDirs(false).build();
cluster.waitActive();
Configuration confNN0 = new Configuration(conf);
cluster.shutdown();
// initialize the journal nodes
confNN0.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
NameNode.initializeSharedEdits(confNN0, true);
// restart the cluster
cluster = new MiniDFSCluster.Builder(conf).format(false)
.nnTopology(topology).numDataNodes(1).manageNameDfsSharedDirs(false)
.build();
cluster.waitActive();
// make nn0 active // make nn0 active
cluster.transitionToActive(0); cluster.transitionToActive(0);
@ -109,27 +75,6 @@ public class TestBootstrapStandbyWithQJM {
} }
} }
private Configuration initHAConf(URI journalURI) {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
journalURI.toString());
String address1 = "127.0.0.1:" + NN1_IPC_PORT;
String address2 = "127.0.0.1:" + NN2_IPC_PORT;
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN1), address1);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
NAMESERVICE, NN2), address2);
conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
NN1 + "," + NN2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
ConfiguredFailoverProxyProvider.class.getName());
conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
return conf;
}
/** BootstrapStandby when the existing NN is standby */ /** BootstrapStandby when the existing NN is standby */
@Test @Test
public void testBootstrapStandbyWithStandbyNN() throws Exception { public void testBootstrapStandbyWithStandbyNN() throws Exception {

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
@ -37,6 +38,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
@ -48,23 +51,49 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@RunWith(Parameterized.class)
public class TestFailureToReadEdits { public class TestFailureToReadEdits {
private static final String TEST_DIR1 = "/test1"; private static final String TEST_DIR1 = "/test1";
private static final String TEST_DIR2 = "/test2"; private static final String TEST_DIR2 = "/test2";
private static final String TEST_DIR3 = "/test3"; private static final String TEST_DIR3 = "/test3";
private final TestType clusterType;
private Configuration conf; private Configuration conf;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private MiniQJMHACluster miniQjmHaCluster; // for QJM case only
private NameNode nn0; private NameNode nn0;
private NameNode nn1; private NameNode nn1;
private FileSystem fs; private FileSystem fs;
private static enum TestType {
SHARED_DIR_HA,
QJM_HA;
};
/**
* Run this suite of tests both for QJM-based HA and for file-based
* HA.
*/
@Parameters
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{ TestType.SHARED_DIR_HA },
{ TestType.QJM_HA } });
}
public TestFailureToReadEdits(TestType clusterType) {
this.clusterType = clusterType;
}
@Before @Before
public void setUpCluster() throws Exception { public void setUpCluster() throws Exception {
conf = new Configuration(); conf = new Configuration();
@ -74,16 +103,19 @@ public class TestFailureToReadEdits {
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
HAUtil.setAllowStandbyReads(conf, true); HAUtil.setAllowStandbyReads(conf, true);
MiniDFSNNTopology topology = new MiniDFSNNTopology() if (clusterType == TestType.SHARED_DIR_HA) {
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") MiniDFSNNTopology topology = MiniQJMHACluster.createDefaultTopology();
.addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10041)) cluster = new MiniDFSCluster.Builder(conf)
.addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10042))); .nnTopology(topology)
cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(0)
.nnTopology(topology) .checkExitOnShutdown(false)
.numDataNodes(0) .build();
.checkExitOnShutdown(false) } else {
.build(); Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder().numDataNodes(0).checkExitOnShutdown(false);
miniQjmHaCluster = builder.build();
cluster = miniQjmHaCluster.getDfsCluster();
}
cluster.waitActive(); cluster.waitActive();
nn0 = cluster.getNameNode(0); nn0 = cluster.getNameNode(0);
@ -99,8 +131,14 @@ public class TestFailureToReadEdits {
fs.close(); fs.close();
} }
if (cluster != null) { if (clusterType == TestType.SHARED_DIR_HA) {
cluster.shutdown(); if (cluster != null) {
cluster.shutdown();
}
} else {
if (miniQjmHaCluster != null) {
miniQjmHaCluster.shutdown();
}
} }
} }
@ -259,13 +297,10 @@ public class TestFailureToReadEdits {
} }
private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException { private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
FSEditLog spyEditLog = spy(nn1.getNamesystem().getEditLogTailer() FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
.getEditLog());
LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
doAnswer(answer).when(spyEditLog).selectInputStreams( doAnswer(answer).when(spyEditLog).selectInputStreams(
anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean()); anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
return answer; return answer;
} }