HDFS-12979. [SBN read] StandbyNode should upload FsImage to ObserverNode after checkpointing. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2019-07-17 14:37:16 -07:00
parent 885dd17595
commit 05a1eb788f
6 changed files with 254 additions and 56 deletions

View File

@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.net.HttpURLConnection;
@ -88,6 +94,10 @@ public class ImageServlet extends HttpServlet {
private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
.<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
public static final String RECENT_IMAGE_CHECK_ENABLED =
"recent.image.check.enabled";
public static final boolean RECENT_IMAGE_CHECK_ENABLED_DEFAULT = true;
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
@ -481,6 +491,23 @@ public class ImageServlet extends HttpServlet {
final PutImageParams parsedParams = new PutImageParams(request, response,
conf);
final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
final boolean checkRecentImageEnable;
Object checkRecentImageEnableObj =
context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
if (checkRecentImageEnableObj != null) {
if (checkRecentImageEnableObj instanceof Boolean) {
checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
} else {
// This is an error case, but crashing NN due to this
// seems more undesirable. Only log the error and set to default.
LOG.error("Expecting boolean obj for setting checking recent image, "
+ "but got " + checkRecentImageEnableObj.getClass() + ". This is "
+ "unexpected! Setting to default.");
checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
}
} else {
checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
}
validateRequest(context, conf, request, response, nnImage,
parsedParams.getStorageInfoString());
@ -494,7 +521,8 @@ public class ImageServlet extends HttpServlet {
// target (regardless of the fact that we got the image)
HAServiceProtocol.HAServiceState state = NameNodeHttpServer
.getNameNodeStateFromContext(getServletContext());
if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
state != HAServiceProtocol.HAServiceState.OBSERVER) {
// we need a different response type here so the client can differentiate this
// from the failure to upload due to (1) security, or (2) other checkpoints already
// present
@ -528,6 +556,39 @@ public class ImageServlet extends HttpServlet {
+ txid);
return null;
}
long now = System.currentTimeMillis();
long lastCheckpointTime =
nnImage.getStorage().getMostRecentCheckpointTime();
long lastCheckpointTxid =
nnImage.getStorage().getMostRecentCheckpointTxId();
long checkpointPeriod =
conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
long checkpointTxnCount =
conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
now - lastCheckpointTime);
if (checkRecentImageEnable &&
timeDelta < checkpointPeriod &&
txid - lastCheckpointTxid < checkpointTxnCount) {
// only when at least one of two conditions are met we accept
// a new fsImage
// 1. most recent image's txid is too far behind
// 2. last checkpoint time was too old
response.sendError(HttpServletResponse.SC_CONFLICT,
"Most recent checkpoint is neither too far behind in "
+ "txid, nor too old. New txnid cnt is "
+ (txid - lastCheckpointTxid)
+ ", expecting at least " + checkpointTxnCount
+ " unless too long since last upload.");
return null;
}
try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException;
@ -435,6 +436,11 @@ public class NameNode extends ReconfigurableBase implements
return rpcServer;
}
@VisibleForTesting
public HttpServer2 getHttpServer() {
return httpServer.getHttpServer();
}
public void queueExternalCall(ExternalCall<?> extCall)
throws IOException, InterruptedException {
if (rpcServer == null) {

View File

@ -19,14 +19,16 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -67,7 +69,6 @@ public class StandbyCheckpointer {
private final Configuration conf;
private final FSNamesystem namesystem;
private long lastCheckpointTime;
private long lastUploadTime;
private final CheckpointerThread thread;
private final ThreadFactory uploadThreadFactory;
private List<URL> activeNNAddresses;
@ -75,12 +76,14 @@ public class StandbyCheckpointer {
private final Object cancelLock = new Object();
private Canceler canceler;
private boolean isPrimaryCheckPointer = true;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
private static int canceledCount = 0;
// A map from NN url to the most recent image upload time.
private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
throws IOException {
this.namesystem = ns;
@ -89,8 +92,38 @@ public class StandbyCheckpointer {
this.thread = new CheckpointerThread();
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("TransferFsImageUpload-%d").build();
setNameNodeAddresses(conf);
this.checkpointReceivers = new HashMap<>();
for (URL address : activeNNAddresses) {
this.checkpointReceivers.put(address.toString(),
new CheckpointReceiverEntry());
}
}
private static final class CheckpointReceiverEntry {
private long lastUploadTime;
private boolean isPrimary;
CheckpointReceiverEntry() {
this.lastUploadTime = 0L;
this.isPrimary = true;
}
void setLastUploadTime(long lastUploadTime) {
this.lastUploadTime = lastUploadTime;
}
void setIsPrimary(boolean isPrimaryFor) {
this.isPrimary = isPrimaryFor;
}
long getLastUploadTime() {
return lastUploadTime;
}
boolean isPrimary() {
return isPrimary;
}
}
/**
@ -158,7 +191,7 @@ public class StandbyCheckpointer {
thread.interrupt();
}
private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
final long txid;
final NameNodeFile imageType;
@ -210,11 +243,6 @@ public class StandbyCheckpointer {
namesystem.cpUnlock();
}
//early exit if we shouldn't actually send the checkpoint to the ANN
if(!sendCheckpoint){
return;
}
// Upload the saved checkpoint back to the active
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up
@ -224,56 +252,70 @@ public class StandbyCheckpointer {
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to
// directly tie them together by adding a pair class.
List<Future<TransferFsImage.TransferResult>> uploads =
new ArrayList<Future<TransferFsImage.TransferResult>>();
HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
new HashMap<>();
for (final URL activeNNAddress : activeNNAddresses) {
// Upload image if at least 1 of 2 following conditions met:
// 1. has been quiet for long enough, try to contact the node.
// 2. this standby IS the primary checkpointer of target NN.
String addressString = activeNNAddress.toString();
assert checkpointReceivers.containsKey(addressString);
CheckpointReceiverEntry receiverEntry =
checkpointReceivers.get(addressString);
long secsSinceLastUpload =
TimeUnit.MILLISECONDS.toSeconds(
monotonicNow() - receiverEntry.getLastUploadTime());
boolean shouldUpload = receiverEntry.isPrimary() ||
secsSinceLastUpload >= checkpointConf.getQuietPeriod();
if (shouldUpload) {
Future<TransferFsImage.TransferResult> upload =
executor.submit(new Callable<TransferFsImage.TransferResult>() {
@Override
public TransferFsImage.TransferResult call()
throws IOException, InterruptedException {
CheckpointFaultInjector.getInstance().duringUploadInProgess();
return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
.getFSImage().getStorage(), imageType, txid, canceler);
return TransferFsImage.uploadImageFromStorage(activeNNAddress,
conf, namesystem.getFSImage().getStorage(), imageType, txid,
canceler);
}
});
uploads.add(upload);
uploads.put(addressString, upload);
}
}
InterruptedException ie = null;
IOException ioe= null;
int i = 0;
boolean success = false;
for (; i < uploads.size(); i++) {
Future<TransferFsImage.TransferResult> upload = uploads.get(i);
List<IOException> ioes = Lists.newArrayList();
for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
uploads.entrySet()) {
String url = entry.getKey();
Future<TransferFsImage.TransferResult> upload = entry.getValue();
try {
// TODO should there be some smarts here about retries nodes that are not the active NN?
// TODO should there be some smarts here about retries nodes that
// are not the active NN?
CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
success = true;
//avoid getting the rest of the results - we don't care since we had a successful upload
break;
receiverEntry.setLastUploadTime(monotonicNow());
receiverEntry.setIsPrimary(true);
} else {
receiverEntry.setIsPrimary(false);
}
} catch (ExecutionException e) {
ioe = new IOException("Exception during image upload", e);
break;
// Even if exception happens, still proceeds to next NN url.
// so that fail to upload to previous NN does not cause the
// remaining NN not getting the fsImage.
ioes.add(new IOException("Exception during image upload", e));
} catch (InterruptedException e) {
ie = e;
break;
}
}
if (ie == null && ioe == null) {
//Update only when response from remote about success or
lastUploadTime = monotonicNow();
// we are primary if we successfully updated the ANN
this.isPrimaryCheckPointer = success;
}
// cleaner than copying code for multiple catch statements and better than catching all
// exceptions, so we just handle the ones we expect.
if (ie != null || ioe != null) {
if (ie != null) {
// cancel the rest of the tasks, and close the pool
for (; i < uploads.size(); i++) {
Future<TransferFsImage.TransferResult> upload = uploads.get(i);
for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
uploads.entrySet()) {
Future<TransferFsImage.TransferResult> upload = entry.getValue();
// The background thread may be blocked waiting in the throttler, so
// interrupt it.
upload.cancel(true);
@ -286,11 +328,11 @@ public class StandbyCheckpointer {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
// re-throw the exception we got, since one of these two must be non-null
if (ie != null) {
throw ie;
} else if (ioe != null) {
throw ioe;
}
if (!ioes.isEmpty()) {
throw MultipleIOException.createIOException(ioes);
}
}
@ -373,7 +415,6 @@ public class StandbyCheckpointer {
// Reset checkpoint time so that we don't always checkpoint
// on startup.
lastCheckpointTime = monotonicNow();
lastUploadTime = monotonicNow();
while (shouldRun) {
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
if (!needRollbackCheckpoint) {
@ -426,10 +467,7 @@ public class StandbyCheckpointer {
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
// rollback request, are the checkpointer, are outside the quiet period.
final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
boolean sendRequest = isPrimaryCheckPointer
|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();
doCheckpoint(sendRequest);
doCheckpoint();
// reset needRollbackCheckpoint to false only when we finish a ckpt
// for rollback image

View File

@ -67,6 +67,7 @@ import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -969,6 +970,8 @@ public class MiniDFSCluster implements AutoCloseable {
}
copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
}
nn.nameNode.getHttpServer()
.setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
}
}
@ -2184,6 +2187,8 @@ public class MiniDFSCluster implements AutoCloseable {
}
NameNode nn = NameNode.createNameNode(args, info.conf);
nn.getHttpServer()
.setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
info.nameNode = nn;
info.setStartOpt(startOpt);
if (waitActive) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
import static org.apache.hadoop.hdfs.server.namenode.ImageServlet.RECENT_IMAGE_CHECK_ENABLED;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@ -2472,6 +2473,60 @@ public class TestCheckpoint {
}
}
@Test(timeout = 300000)
public void testActiveRejectSmallerDeltaImage() throws Exception {
MiniDFSCluster cluster = null;
Configuration conf = new HdfsConfiguration();
// Set the delta txid threshold to 10
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
// Set the delta time threshold to some arbitrarily large value, so
// it does not trigger a checkpoint during this test.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 900000);
SecondaryNameNode secondary = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(true).build();
// enable small delta rejection
NameNode active = cluster.getNameNode();
active.httpServer.getHttpServer()
.setAttribute(RECENT_IMAGE_CHECK_ENABLED, true);
secondary = startSecondaryNameNode(conf);
FileSystem fs = cluster.getFileSystem();
assertEquals(0, active.getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
// create 5 dir.
for (int i = 0; i < 5; i++) {
fs.mkdirs(new Path("dir-" + i));
}
// Checkpoint 1st
secondary.doCheckpoint();
// at this point, the txid delta is smaller than threshold 10.
// active does not accept this image.
assertEquals(0, active.getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
// create another 10 dir.
for (int i = 0; i < 10; i++) {
fs.mkdirs(new Path("dir2-" + i));
}
// Checkpoint 2nd
secondary.doCheckpoint();
// here the delta is large enough and active accepts this image.
assertEquals(21, active.getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
} finally {
cleanup(secondary);
cleanup(cluster);
}
}
private static void cleanup(SecondaryNameNode snn) {
if (snn != null) {
try {

View File

@ -253,6 +253,39 @@ public class TestStandbyCheckpoints {
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
}
/**
* Test for the case of when there are observer NameNodes, Standby node is
* able to upload fsImage to Observer node as well.
*/
@Test(timeout = 300000)
public void testStandbyAndObserverState() throws Exception {
// Transition 2 to observer
cluster.transitionToObserver(2);
doEdits(0, 10);
// After a rollEditLog, Standby(nn1) 's next checkpoint would be
// ahead of observer(nn2).
nns[0].getRpcServer().rollEditLog();
// After standby creating a checkpoint, it will try to push the image to
// active and all observer, updating it's own txid to the most recent.
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
assertEquals(12, nns[2].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
assertEquals(12, nns[1].getNamesystem().getFSImage()
.getMostRecentCheckpointTxId());
List<File> dirs = Lists.newArrayList();
// observer and standby both have this same image.
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2));
dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of());
// Restore 2 back to standby
cluster.transitionToStandby(2);
}
/**
* Test for the case when the SBN is configured to checkpoint based
* on a time period, but no transactions are happening on the