diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 740ccad56ec..c64fe75049d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -93,6 +93,9 @@ Release 2.5.0 - UNRELEASED
 
     HDFS-6238. TestDirectoryScanner leaks file descriptors. (cnauroth)
 
+    HDFS-6243. HA NameNode transition to active or shutdown may leave lingering
+    image transfer thread. (cnauroth)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 088e06a43e0..ca42d075d4b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -193,14 +194,32 @@ static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
    * @param storage the storage directory to transfer the image from
    * @param nnf the NameNodeFile type of the image
    * @param txid the transaction ID of the image to be uploaded
+   * @throws IOException if there is an I/O error
    */
   public static void uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
-
+    uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
+  }
+
+  /**
+   * Requests that the NameNode download an image from this node. Allows for
+   * optional external cancellation.
+   *
+   * @param fsName the http address for the remote NN
+   * @param conf Configuration
+   * @param storage the storage directory to transfer the image from
+   * @param nnf the NameNodeFile type of the image
+   * @param txid the transaction ID of the image to be uploaded
+   * @param canceler optional canceler to check for abort of upload
+   * @throws IOException if there is an I/O error or cancellation
+   */
+  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
+      throws IOException {
     URL url = new URL(fsName, ImageServlet.PATH_SPEC);
     long startTime = Time.monotonicNow();
     try {
-      uploadImage(url, conf, storage, nnf, txid);
+      uploadImage(url, conf, storage, nnf, txid, canceler);
     } catch (HttpPutFailedException e) {
       if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
         // this is OK - this means that a previous attempt to upload
@@ -223,7 +242,8 @@ public static void uploadImageFromStorage(URL fsName, Configuration conf,
    * Uploads the imagefile using HTTP PUT method
    */
   private static void uploadImage(URL url, Configuration conf,
-      NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
+      NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
+      throws IOException {
 
     File imageFile = storage.findImageFile(nnf, txId);
     if (imageFile == null) {
@@ -267,7 +287,7 @@ private static void uploadImage(URL url, Configuration conf,
       ImageServlet.setVerificationHeadersForPut(connection, imageFile);
 
       // Write the file to output stream.
-      writeFileToPutRequest(conf, connection, imageFile);
+      writeFileToPutRequest(conf, connection, imageFile, canceler);
 
       int responseCode = connection.getResponseCode();
       if (responseCode != HttpURLConnection.HTTP_OK) {
@@ -286,7 +306,7 @@ private static void uploadImage(URL url, Configuration conf,
   }
 
   private static void writeFileToPutRequest(Configuration conf,
-      HttpURLConnection connection, File imageFile)
+      HttpURLConnection connection, File imageFile, Canceler canceler)
       throws FileNotFoundException, IOException {
     connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
     connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
@@ -294,7 +314,7 @@ private static void writeFileToPutRequest(Configuration conf,
     FileInputStream input = new FileInputStream(imageFile);
     try {
       copyFileToStream(output, imageFile, input,
-          ImageServlet.getThrottler(conf));
+          ImageServlet.getThrottler(conf), canceler);
     } finally {
       IOUtils.closeStream(input);
       IOUtils.closeStream(output);
@@ -308,6 +328,12 @@ private static void writeFileToPutRequest(Configuration conf,
   public static void copyFileToStream(OutputStream out, File localfile,
       FileInputStream infile, DataTransferThrottler throttler)
       throws IOException {
+    copyFileToStream(out, localfile, infile, throttler, null);
+  }
+
+  private static void copyFileToStream(OutputStream out, File localfile,
+      FileInputStream infile, DataTransferThrottler throttler,
+      Canceler canceler) throws IOException {
     byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
     try {
       CheckpointFaultInjector.getInstance()
@@ -324,6 +350,10 @@ public static void copyFileToStream(OutputStream out, File localfile,
       }
       int num = 1;
       while (num > 0) {
+        if (canceler != null && canceler.isCancelled()) {
+          throw new SaveNamespaceCancelledException(
+              canceler.getCancellationReason());
+        }
         num = infile.read(buf);
         if (num <= 0) {
           break;
         }
@@ -337,7 +367,7 @@ public static void copyFileToStream(OutputStream out, File localfile,
         out.write(buf, 0, num);
         if (throttler != null) {
-          throttler.throttle(num);
+          throttler.throttle(num, canceler);
         }
       }
     } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 27e5b3c488c..8e51b3df1c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -196,13 +196,18 @@ private void doCheckpoint() throws InterruptedException, IOException {
       @Override
       public Void call() throws IOException {
         TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
-            namesystem.getFSImage().getStorage(), imageType, txid);
+            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
         return null;
       }
     });
     executor.shutdown();
     try {
       upload.get();
+    } catch (InterruptedException e) {
+      // The background thread may be blocked waiting in the throttler, so
+      // interrupt it.
+      upload.cancel(true);
+      throw e;
     } catch (ExecutionException e) {
       throw new IOException("Exception during image upload: " + e.getMessage(),
           e.getCause());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
index cf33ca3a453..537cb5e4378 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java
@@ -81,6 +81,19 @@ public synchronized void setBandwidth(long bytesPerSecond) {
    *     number of bytes sent/received since last time throttle was called
    */
   public synchronized void throttle(long numOfBytes) {
+    throttle(numOfBytes, null);
+  }
+
+  /** Given the numOfBytes sent/received since last time throttle was called,
+   * make the current thread sleep if I/O rate is too fast
+   * compared to the given bandwidth. Allows for optional external cancellation.
+   *
+   * @param numOfBytes
+   *     number of bytes sent/received since last time throttle was called
+   * @param canceler
+   *     optional canceler to check for abort of throttle
+   */
+  public synchronized void throttle(long numOfBytes, Canceler canceler) {
     if ( numOfBytes <= 0 ) {
       return;
     }
@@ -89,6 +102,9 @@ public synchronized void throttle(long numOfBytes) {
     bytesAlreadyUsed += numOfBytes;
     while (curReserve <= 0) {
+      if (canceler != null && canceler.isCancelled()) {
+        return;
+      }
       long now = monotonicNow();
       long curPeriodEnd = curPeriodStart + period;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index f3616bdebfe..67a590548f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -25,6 +25,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.URI;
 import java.net.URL;
 import java.util.List;
@@ -59,6 +62,7 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -270,6 +274,29 @@ public void testCheckpointCancellationDuringUpload() throws Exception {
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104));
     cluster.transitionToStandby(0);
     cluster.transitionToActive(1);
+
+    // Wait to make sure background TransferFsImageUpload thread was cancelled.
+    // This needs to be done before the next test in the suite starts, so that a
+    // file descriptor is not held open during the next cluster init.
+    cluster.shutdown();
+    cluster = null;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        ThreadInfo[] threads = threadBean.getThreadInfo(
+            threadBean.getAllThreadIds(), 1);
+        for (ThreadInfo thread: threads) {
+          if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
+            return false;
+          }
+        }
+        return true;
+      }
+    }, 1000, 30000);
+
+    // Assert that former active did not accept the canceled checkpoint file.
+    assertEquals(0, nn0.getFSImage().getMostRecentCheckpointTxId());
   }
 
   /**
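Note on the pattern (not part of the patch): the change threads a Canceler through the upload path so that both the copy loop and the throttler's wait loop become cancellation points, and the checkpointer propagates an interrupt to the upload future if it is itself interrupted. The following is a minimal, self-contained sketch of that cooperative-cancellation shape; CancelableCopySketch, copyWithCancelCheck, and the inlined Canceler are hypothetical stand-ins written for this note, not the HDFS classes.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelableCopySketch {

  // Simplified stand-in for org.apache.hadoop.hdfs.util.Canceler: a flag the
  // owning thread sets once and the worker polls cooperatively.
  static class Canceler {
    private volatile String reason;
    void cancel(String why) { this.reason = why; }
    boolean isCancelled() { return reason != null; }
    String getCancellationReason() { return reason; }
  }

  // Copy loop in the shape of the patched copyFileToStream: poll the canceler
  // once per buffer so a transition or shutdown can stop the transfer.
  static void copyWithCancelCheck(InputStream in, OutputStream out,
      Canceler canceler) throws IOException {
    byte[] buf = new byte[8192];
    int num;
    while ((num = in.read(buf)) > 0) {
      if (canceler != null && canceler.isCancelled()) {
        throw new IOException("copy cancelled: "
            + canceler.getCancellationReason());
      }
      out.write(buf, 0, num);
      // A bandwidth throttler would sleep here; handing it the canceler (as
      // the patch does for DataTransferThrottler) lets its wait loop return
      // early instead of parking the thread indefinitely.
    }
  }

  public static void main(String[] args) throws Exception {
    final Canceler canceler = new Canceler();
    final InputStream in = new ByteArrayInputStream(new byte[1 << 20]);
    final OutputStream out = new ByteArrayOutputStream();

    // Request cancellation up front so the worker deterministically notices
    // it at its first check; in the NameNode this happens mid-transfer when
    // the standby transitions to active or shuts down.
    canceler.cancel("standby transitioning to active");

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Void> upload = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        copyWithCancelCheck(in, out, canceler);
        return null;
      }
    });
    executor.shutdown();

    try {
      upload.get();
    } catch (InterruptedException e) {
      // Same idea as the StandbyCheckpointer change: the worker may be
      // blocked in a throttled wait, so propagate the interrupt to it.
      upload.cancel(true);
      throw e;
    } catch (ExecutionException e) {
      System.out.println("upload stopped: " + e.getCause().getMessage());
    }
  }
}

Two details of the actual patch are worth noting against this sketch: throttle() returns quietly when it observes cancellation, leaving copyFileToStream's own check to raise SaveNamespaceCancelledException, and upload.cancel(true) matters because a worker parked inside the throttler's wait only re-checks the canceler once it wakes.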