HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace. Contributed by Ming Ma.
This commit is contained in:
parent
dcd14db1a8
commit
639c7b0ddc
|
@ -478,6 +478,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7575. Upgrade should generate a unique storage ID for each
|
||||
volume. (Arpit Agarwal)
|
||||
|
||||
HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace.
|
||||
(Ming Ma via cnauroth)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -29,9 +29,11 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -97,6 +99,15 @@ public class FSImage implements Closeable {
|
|||
|
||||
protected NNStorageRetentionManager archivalManager;
|
||||
|
||||
/* Used to make sure there are no concurrent checkpoints for a given txid.
|
||||
* The checkpoint here could be one of the following operations.
|
||||
* a. checkpoint when NN is in standby.
|
||||
* b. admin saveNamespace operation.
|
||||
* c. download checkpoint file from any remote checkpointer.
|
||||
*/
|
||||
private final Set<Long> currentlyCheckpointing =
|
||||
Collections.<Long>synchronizedSet(new HashSet<Long>());
|
||||
|
||||
/**
|
||||
* Construct an FSImage
|
||||
* @param conf Configuration
|
||||
|
@ -1058,18 +1069,26 @@ public class FSImage implements Closeable {
|
|||
editLog.endCurrentLogSegment(true);
|
||||
}
|
||||
long imageTxId = getLastAppliedOrWrittenTxId();
|
||||
if (!addToCheckpointing(imageTxId)) {
|
||||
throw new IOException(
|
||||
"FS image is being downloaded from another NN at txid " + imageTxId);
|
||||
}
|
||||
try {
|
||||
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
|
||||
storage.writeAll();
|
||||
} finally {
|
||||
if (editLogWasOpen) {
|
||||
editLog.startLogSegment(imageTxId + 1, true);
|
||||
// Take this opportunity to note the current transaction.
|
||||
// Even if the namespace save was cancelled, this marker
|
||||
// is only used to determine what transaction ID is required
|
||||
// for startup. So, it doesn't hurt to update it unnecessarily.
|
||||
storage.writeTransactionIdFileToStorage(imageTxId + 1);
|
||||
try {
|
||||
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
|
||||
storage.writeAll();
|
||||
} finally {
|
||||
if (editLogWasOpen) {
|
||||
editLog.startLogSegment(imageTxId + 1, true);
|
||||
// Take this opportunity to note the current transaction.
|
||||
// Even if the namespace save was cancelled, this marker
|
||||
// is only used to determine what transaction ID is required
|
||||
// for startup. So, it doesn't hurt to update it unnecessarily.
|
||||
storage.writeTransactionIdFileToStorage(imageTxId + 1);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
removeFromCheckpointing(imageTxId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1078,7 +1097,22 @@ public class FSImage implements Closeable {
|
|||
*/
|
||||
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
|
||||
throws IOException {
|
||||
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
|
||||
if (!addToCheckpointing(txid)) {
|
||||
throw new IOException(("FS image is being downloaded from another NN"));
|
||||
}
|
||||
try {
|
||||
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
|
||||
} finally {
|
||||
removeFromCheckpointing(txid);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Registers a transaction ID in the currentlyCheckpointing set.
 *
 * @param txid transaction ID of the checkpoint about to be written or received
 * @return the result of Set.add: true if txid was not already in the set,
 *         false if it was. The callers visible in this change treat false as
 *         a conflict and fail the operation.
 */
public boolean addToCheckpointing(long txid) {
|
||||
return currentlyCheckpointing.add(txid);
|
||||
}
|
||||
|
||||
/**
 * Removes a transaction ID from the currentlyCheckpointing set.
 * The callers visible in this change invoke it from a finally block after
 * a successful addToCheckpointing(txid).
 *
 * @param txid transaction ID to remove; nothing is reported if it is absent
 */
public void removeFromCheckpointing(long txid) {
|
||||
currentlyCheckpointing.remove(txid);
|
||||
}
|
||||
|
||||
private synchronized void saveFSImageInAllDirs(FSNamesystem source,
|
||||
|
|
|
@ -81,9 +81,6 @@ public class ImageServlet extends HttpServlet {
|
|||
private static final String LATEST_FSIMAGE_VALUE = "latest";
|
||||
private static final String IMAGE_FILE_TYPE = "imageFile";
|
||||
|
||||
private static final Set<Long> currentlyDownloadingCheckpoints =
|
||||
Collections.synchronizedSet(new HashSet<Long>());
|
||||
|
||||
@Override
|
||||
public void doGet(final HttpServletRequest request,
|
||||
final HttpServletResponse response) throws ServletException, IOException {
|
||||
|
@ -467,17 +464,20 @@ public class ImageServlet extends HttpServlet {
|
|||
|
||||
final NameNodeFile nnf = parsedParams.getNameNodeFile();
|
||||
|
||||
if (!currentlyDownloadingCheckpoints.add(txid)) {
|
||||
if (!nnImage.addToCheckpointing(txid)) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer is already in the process of uploading a"
|
||||
+ " checkpoint made at transaction ID " + txid);
|
||||
"Either current namenode is checkpointing or another"
|
||||
+ " checkpointer is already in the process of "
|
||||
+ "uploading a checkpoint made at transaction ID "
|
||||
+ txid);
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
|
||||
response.sendError(HttpServletResponse.SC_CONFLICT,
|
||||
"Another checkpointer already uploaded an checkpoint "
|
||||
+ "for txid " + txid);
|
||||
"Either current namenode has checkpointed or "
|
||||
+ "another checkpointer already uploaded an "
|
||||
+ "checkpoint for txid " + txid);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -502,7 +502,7 @@ public class ImageServlet extends HttpServlet {
|
|||
stream.close();
|
||||
}
|
||||
} finally {
|
||||
currentlyDownloadingCheckpoints.remove(txid);
|
||||
nnImage.removeFromCheckpointing(txid);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -466,7 +466,7 @@ public class TestStandbyCheckpoints {
|
|||
throws IOException {
|
||||
CompressionOutputStream ret = super.createOutputStream(out);
|
||||
CompressionOutputStream spy = Mockito.spy(ret);
|
||||
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
|
||||
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5))
|
||||
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
|
||||
return spy;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue