HDFS-12979. [SBN read] StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2019-07-17 14:37:16 -07:00
parent 7f882570d7
commit 40c772a750
6 changed files with 254 additions and 56 deletions

View File

@ -17,7 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.Time.monotonicNow;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -88,6 +94,10 @@ public class ImageServlet extends HttpServlet {
private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
.<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>()); .<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
// Servlet-context attribute key that toggles rejecting uploaded images whose
// txid delta / age since the last checkpoint is too small (set to false by
// MiniDFSCluster so tests can checkpoint freely). Enabled by default.
public static final String RECENT_IMAGE_CHECK_ENABLED =
    "recent.image.check.enabled";
public static final boolean RECENT_IMAGE_CHECK_ENABLED_DEFAULT = true;
@Override @Override
public void doGet(final HttpServletRequest request, public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException { final HttpServletResponse response) throws ServletException, IOException {
@ -481,6 +491,23 @@ public class ImageServlet extends HttpServlet {
final PutImageParams parsedParams = new PutImageParams(request, response, final PutImageParams parsedParams = new PutImageParams(request, response,
conf); conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics(); final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
final boolean checkRecentImageEnable;
Object checkRecentImageEnableObj =
context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
if (checkRecentImageEnableObj != null) {
if (checkRecentImageEnableObj instanceof Boolean) {
checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
} else {
// This is an error case, but crashing NN due to this
// seems more undesirable. Only log the error and set to default.
LOG.error("Expecting boolean obj for setting checking recent image, "
+ "but got " + checkRecentImageEnableObj.getClass() + ". This is "
+ "unexpected! Setting to default.");
checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
}
} else {
checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
}
validateRequest(context, conf, request, response, nnImage, validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString()); parsedParams.getStorageInfoString());
@ -494,7 +521,8 @@ public class ImageServlet extends HttpServlet {
// target (regardless of the fact that we got the image) // target (regardless of the fact that we got the image)
HAServiceProtocol.HAServiceState state = NameNodeHttpServer HAServiceProtocol.HAServiceState state = NameNodeHttpServer
.getNameNodeStateFromContext(getServletContext()); .getNameNodeStateFromContext(getServletContext());
if (state != HAServiceProtocol.HAServiceState.ACTIVE) { if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
state != HAServiceProtocol.HAServiceState.OBSERVER) {
// we need a different response type here so the client can differentiate this // we need a different response type here so the client can differentiate this
// from the failure to upload due to (1) security, or (2) other checkpoints already // from the failure to upload due to (1) security, or (2) other checkpoints already
// present // present
@ -528,6 +556,39 @@ public class ImageServlet extends HttpServlet {
+ txid); + txid);
return null; return null;
} }
long now = System.currentTimeMillis();
long lastCheckpointTime =
nnImage.getStorage().getMostRecentCheckpointTime();
long lastCheckpointTxid =
nnImage.getStorage().getMostRecentCheckpointTxId();
long checkpointPeriod =
conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
long checkpointTxnCount =
conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
now - lastCheckpointTime);
if (checkRecentImageEnable &&
timeDelta < checkpointPeriod &&
txid - lastCheckpointTxid < checkpointTxnCount) {
// only when at least one of two conditions are met we accept
// a new fsImage
// 1. most recent image's txid is too far behind
// 2. last checkpoint time was too old
response.sendError(HttpServletResponse.SC_CONFLICT,
"Most recent checkpoint is neither too far behind in "
+ "txid, nor too old. New txnid cnt is "
+ (txid - lastCheckpointTxid)
+ ", expecting at least " + checkpointTxnCount
+ " unless too long since last upload.");
return null;
}
try { try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) { if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT, response.sendError(HttpServletResponse.SC_CONFLICT,

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ExternalCall; import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol; import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
@ -431,6 +432,11 @@ public class NameNode extends ReconfigurableBase implements
return rpcServer; return rpcServer;
} }
/**
 * Returns the underlying {@link HttpServer2} so tests (e.g. MiniDFSCluster)
 * can set servlet-context attributes such as
 * ImageServlet#RECENT_IMAGE_CHECK_ENABLED.
 */
@VisibleForTesting
public HttpServer2 getHttpServer() {
  return httpServer.getHttpServer();
}
public void queueExternalCall(ExternalCall<?> extCall) public void queueExternalCall(ExternalCall<?> extCall)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (rpcServer == null) { if (rpcServer == null) {

View File

@ -19,14 +19,16 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.collect.Lists;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -67,7 +69,6 @@ public class StandbyCheckpointer {
private final Configuration conf; private final Configuration conf;
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
private long lastCheckpointTime; private long lastCheckpointTime;
private long lastUploadTime;
private final CheckpointerThread thread; private final CheckpointerThread thread;
private final ThreadFactory uploadThreadFactory; private final ThreadFactory uploadThreadFactory;
private List<URL> activeNNAddresses; private List<URL> activeNNAddresses;
@ -75,12 +76,14 @@ public class StandbyCheckpointer {
private final Object cancelLock = new Object(); private final Object cancelLock = new Object();
private Canceler canceler; private Canceler canceler;
private boolean isPrimaryCheckPointer = true;
// Keep track of how many checkpoints were canceled. // Keep track of how many checkpoints were canceled.
// This is for use in tests. // This is for use in tests.
private static int canceledCount = 0; private static int canceledCount = 0;
// A map from NN url to the most recent image upload time.
private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
public StandbyCheckpointer(Configuration conf, FSNamesystem ns) public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException { throws IOException {
this.namesystem = ns; this.namesystem = ns;
@ -89,8 +92,38 @@ public class StandbyCheckpointer {
this.thread = new CheckpointerThread(); this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true) this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("TransferFsImageUpload-%d").build(); .setNameFormat("TransferFsImageUpload-%d").build();
setNameNodeAddresses(conf); setNameNodeAddresses(conf);
this.checkpointReceivers = new HashMap<>();
for (URL address : activeNNAddresses) {
this.checkpointReceivers.put(address.toString(),
new CheckpointReceiverEntry());
}
}
/**
 * Per-target-NameNode upload state tracked by the standby checkpointer:
 * the time of the most recent successful image upload to that NameNode,
 * and whether this standby currently considers itself the primary
 * checkpointer for it.
 */
private static final class CheckpointReceiverEntry {
  // Monotonic timestamp (ms) of the last successful upload; 0 = never.
  private long lastUploadTime;
  // True while this standby is the preferred uploader for the target NN;
  // starts true so the first checkpoint is always offered.
  private boolean isPrimary;

  CheckpointReceiverEntry() {
    this.lastUploadTime = 0L;
    this.isPrimary = true;
  }

  void setLastUploadTime(long lastUploadTime) {
    this.lastUploadTime = lastUploadTime;
  }

  void setIsPrimary(boolean isPrimaryFor) {
    this.isPrimary = isPrimaryFor;
  }

  long getLastUploadTime() {
    return lastUploadTime;
  }

  boolean isPrimary() {
    return isPrimary;
  }
} }
/** /**
@ -158,7 +191,7 @@ public class StandbyCheckpointer {
thread.interrupt(); thread.interrupt();
} }
private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException { private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null; assert canceler != null;
final long txid; final long txid;
final NameNodeFile imageType; final NameNodeFile imageType;
@ -210,11 +243,6 @@ public class StandbyCheckpointer {
namesystem.cpUnlock(); namesystem.cpUnlock();
} }
//early exit if we shouldn't actually send the checkpoint to the ANN
if(!sendCheckpoint){
return;
}
// Upload the saved checkpoint back to the active // Upload the saved checkpoint back to the active
// Do this in a separate thread to avoid blocking transition to active, but don't allow more // Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up // than the expected number of tasks to run or queue up
@ -224,56 +252,70 @@ public class StandbyCheckpointer {
uploadThreadFactory); uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to // for right now, just match the upload to the nn address by convention. There is no need to
// directly tie them together by adding a pair class. // directly tie them together by adding a pair class.
List<Future<TransferFsImage.TransferResult>> uploads = HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
new ArrayList<Future<TransferFsImage.TransferResult>>(); new HashMap<>();
for (final URL activeNNAddress : activeNNAddresses) { for (final URL activeNNAddress : activeNNAddresses) {
Future<TransferFsImage.TransferResult> upload = // Upload image if at least 1 of 2 following conditions met:
executor.submit(new Callable<TransferFsImage.TransferResult>() { // 1. has been quiet for long enough, try to contact the node.
@Override // 2. this standby IS the primary checkpointer of target NN.
public TransferFsImage.TransferResult call() String addressString = activeNNAddress.toString();
throws IOException, InterruptedException { assert checkpointReceivers.containsKey(addressString);
CheckpointFaultInjector.getInstance().duringUploadInProgess(); CheckpointReceiverEntry receiverEntry =
return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem checkpointReceivers.get(addressString);
.getFSImage().getStorage(), imageType, txid, canceler); long secsSinceLastUpload =
} TimeUnit.MILLISECONDS.toSeconds(
}); monotonicNow() - receiverEntry.getLastUploadTime());
uploads.add(upload); boolean shouldUpload = receiverEntry.isPrimary() ||
secsSinceLastUpload >= checkpointConf.getQuietPeriod();
if (shouldUpload) {
Future<TransferFsImage.TransferResult> upload =
executor.submit(new Callable<TransferFsImage.TransferResult>() {
@Override
public TransferFsImage.TransferResult call()
throws IOException, InterruptedException {
CheckpointFaultInjector.getInstance().duringUploadInProgess();
return TransferFsImage.uploadImageFromStorage(activeNNAddress,
conf, namesystem.getFSImage().getStorage(), imageType, txid,
canceler);
}
});
uploads.put(addressString, upload);
}
} }
InterruptedException ie = null; InterruptedException ie = null;
IOException ioe= null; List<IOException> ioes = Lists.newArrayList();
int i = 0; for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
boolean success = false; uploads.entrySet()) {
for (; i < uploads.size(); i++) { String url = entry.getKey();
Future<TransferFsImage.TransferResult> upload = uploads.get(i); Future<TransferFsImage.TransferResult> upload = entry.getValue();
try { try {
// TODO should there be some smarts here about retries nodes that are not the active NN? // TODO should there be some smarts here about retries nodes that
// are not the active NN?
CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
if (upload.get() == TransferFsImage.TransferResult.SUCCESS) { if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
success = true; receiverEntry.setLastUploadTime(monotonicNow());
//avoid getting the rest of the results - we don't care since we had a successful upload receiverEntry.setIsPrimary(true);
break; } else {
receiverEntry.setIsPrimary(false);
} }
} catch (ExecutionException e) { } catch (ExecutionException e) {
ioe = new IOException("Exception during image upload", e); // Even if exception happens, still proceeds to next NN url.
break; // so that fail to upload to previous NN does not cause the
// remaining NN not getting the fsImage.
ioes.add(new IOException("Exception during image upload", e));
} catch (InterruptedException e) { } catch (InterruptedException e) {
ie = e; ie = e;
break; break;
} }
} }
if (ie == null && ioe == null) {
//Update only when response from remote about success or
lastUploadTime = monotonicNow();
// we are primary if we successfully updated the ANN
this.isPrimaryCheckPointer = success;
}
// cleaner than copying code for multiple catch statements and better than catching all // cleaner than copying code for multiple catch statements and better than catching all
// exceptions, so we just handle the ones we expect. // exceptions, so we just handle the ones we expect.
if (ie != null || ioe != null) { if (ie != null) {
// cancel the rest of the tasks, and close the pool // cancel the rest of the tasks, and close the pool
for (; i < uploads.size(); i++) { for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
Future<TransferFsImage.TransferResult> upload = uploads.get(i); uploads.entrySet()) {
Future<TransferFsImage.TransferResult> upload = entry.getValue();
// The background thread may be blocked waiting in the throttler, so // The background thread may be blocked waiting in the throttler, so
// interrupt it. // interrupt it.
upload.cancel(true); upload.cancel(true);
@ -286,11 +328,11 @@ public class StandbyCheckpointer {
executor.awaitTermination(500, TimeUnit.MILLISECONDS); executor.awaitTermination(500, TimeUnit.MILLISECONDS);
// re-throw the exception we got, since one of these two must be non-null // re-throw the exception we got, since one of these two must be non-null
if (ie != null) { throw ie;
throw ie; }
} else if (ioe != null) {
throw ioe; if (!ioes.isEmpty()) {
} throw MultipleIOException.createIOException(ioes);
} }
} }
@ -373,7 +415,6 @@ public class StandbyCheckpointer {
// Reset checkpoint time so that we don't always checkpoint // Reset checkpoint time so that we don't always checkpoint
// on startup. // on startup.
lastCheckpointTime = monotonicNow(); lastCheckpointTime = monotonicNow();
lastUploadTime = monotonicNow();
while (shouldRun) { while (shouldRun) {
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage(); boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
if (!needRollbackCheckpoint) { if (!needRollbackCheckpoint) {
@ -426,10 +467,7 @@ public class StandbyCheckpointer {
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
// rollback request, are the checkpointer, are outside the quiet period. // rollback request, are the checkpointer, are outside the quiet period.
final long secsSinceLastUpload = (now - lastUploadTime) / 1000; doCheckpoint();
boolean sendRequest = isPrimaryCheckPointer
|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();
doCheckpoint(sendRequest);
// reset needRollbackCheckpoint to false only when we finish a ckpt // reset needRollbackCheckpoint to false only when we finish a ckpt
// for rollback image // for rollback image

View File

@ -69,6 +69,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -968,6 +969,8 @@ public class MiniDFSCluster implements AutoCloseable {
} }
copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId); copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
} }
nn.nameNode.getHttpServer()
.setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
} }
} }
@ -2172,6 +2175,8 @@ public class MiniDFSCluster implements AutoCloseable {
} }
NameNode nn = NameNode.createNameNode(args, info.conf); NameNode nn = NameNode.createNameNode(args, info.conf);
nn.getHttpServer()
.setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
info.nameNode = nn; info.nameNode = nn;
info.setStartOpt(startOpt); info.setStartOpt(startOpt);
if (waitActive) { if (waitActive) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints; import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs; import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
import static org.apache.hadoop.hdfs.server.namenode.ImageServlet.RECENT_IMAGE_CHECK_ENABLED;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@ -2472,6 +2473,60 @@ public class TestCheckpoint {
} }
} }
/**
 * Verifies that the active NN rejects an uploaded fsimage whose txid delta
 * since the last accepted checkpoint is below
 * dfs.namenode.checkpoint.txns, once the recent-image check is enabled on
 * the ImageServlet, and accepts it when the delta is large enough.
 */
@Test(timeout = 300000)
public void testActiveRejectSmallerDeltaImage() throws Exception {
  MiniDFSCluster cluster = null;
  Configuration conf = new HdfsConfiguration();
  // Set the delta txid threshold to 10
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
  // Set the delta time threshold to some arbitrarily large value, so
  // it does not trigger a checkpoint during this test.
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 900000);
  SecondaryNameNode secondary = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
        .format(true).build();
    // enable small delta rejection (MiniDFSCluster disables it by default)
    NameNode active = cluster.getNameNode();
    active.httpServer.getHttpServer()
        .setAttribute(RECENT_IMAGE_CHECK_ENABLED, true);
    secondary = startSecondaryNameNode(conf);
    FileSystem fs = cluster.getFileSystem();
    assertEquals(0, active.getNamesystem().getFSImage()
        .getMostRecentCheckpointTxId());
    // create 5 dir.
    for (int i = 0; i < 5; i++) {
      fs.mkdirs(new Path("dir-" + i));
    }
    // Checkpoint 1st
    secondary.doCheckpoint();
    // at this point, the txid delta is smaller than threshold 10.
    // active does not accept this image.
    assertEquals(0, active.getNamesystem().getFSImage()
        .getMostRecentCheckpointTxId());
    // create another 10 dir.
    for (int i = 0; i < 10; i++) {
      fs.mkdirs(new Path("dir2-" + i));
    }
    // Checkpoint 2nd
    secondary.doCheckpoint();
    // here the delta is large enough and active accepts this image.
    assertEquals(21, active.getNamesystem().getFSImage()
        .getMostRecentCheckpointTxId());
  } finally {
    cleanup(secondary);
    cleanup(cluster);
  }
}
private static void cleanup(SecondaryNameNode snn) { private static void cleanup(SecondaryNameNode snn) {
if (snn != null) { if (snn != null) {
try { try {

View File

@ -253,6 +253,39 @@ public class TestStandbyCheckpoints {
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of()); FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
} }
/**
 * Test for the case of when there are observer NameNodes, Standby node is
 * able to upload fsImage to the Observer node as well.
 */
@Test(timeout = 300000)
public void testStandbyAndObserverState() throws Exception {
  // Transition 2 to observer
  cluster.transitionToObserver(2);
  doEdits(0, 10);
  // After a rollEditLog, Standby(nn1)'s next checkpoint would be
  // ahead of observer(nn2).
  nns[0].getRpcServer().rollEditLog();
  // After standby creating a checkpoint, it will try to push the image to
  // active and all observers, updating its own txid to the most recent.
  HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
  HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
  HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
  assertEquals(12, nns[2].getNamesystem().getFSImage()
      .getMostRecentCheckpointTxId());
  assertEquals(12, nns[1].getNamesystem().getFSImage()
      .getMostRecentCheckpointTxId());
  List<File> dirs = Lists.newArrayList();
  // observer and standby both have this same image.
  dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2));
  dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
  FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of());
  // Restore 2 back to standby
  cluster.transitionToStandby(2);
}
/** /**
* Test for the case when the SBN is configured to checkpoint based * Test for the case when the SBN is configured to checkpoint based
* on a time period, but no transactions are happening on the * on a time period, but no transactions are happening on the