HDFS-6243. Merging change r1587410 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1587416 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-04-15 04:59:49 +00:00
parent d2bb90033e
commit 7c89cbfcd2
5 changed files with 89 additions and 8 deletions

View File

@ -93,6 +93,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6238. TestDirectoryScanner leaks file descriptors. (cnauroth) HDFS-6238. TestDirectoryScanner leaks file descriptors. (cnauroth)
HDFS-6243. HA NameNode transition to active or shutdown may leave lingering
image transfer thread. (cnauroth)
Release 2.4.1 - UNRELEASED Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory; import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -193,14 +194,32 @@ public class TransferFsImage {
* @param storage the storage directory to transfer the image from * @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image * @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded * @param txid the transaction ID of the image to be uploaded
* @throws IOException if there is an I/O error
*/ */
public static void uploadImageFromStorage(URL fsName, Configuration conf, public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid) throws IOException { NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
}
/**
* Requests that the NameNode download an image from this node. Allows for
* optional external cancelation.
*
* @param fsName the http address for the remote NN
* @param conf Configuration
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
* @param canceler optional canceler to check for abort of upload
* @throws IOException if there is an I/O error or cancellation
*/
public static void uploadImageFromStorage(URL fsName, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
throws IOException {
URL url = new URL(fsName, ImageServlet.PATH_SPEC); URL url = new URL(fsName, ImageServlet.PATH_SPEC);
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
try { try {
uploadImage(url, conf, storage, nnf, txid); uploadImage(url, conf, storage, nnf, txid, canceler);
} catch (HttpPutFailedException e) { } catch (HttpPutFailedException e) {
if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) { if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
// this is OK - this means that a previous attempt to upload // this is OK - this means that a previous attempt to upload
@ -223,7 +242,8 @@ public class TransferFsImage {
* Uploads the imagefile using HTTP PUT method * Uploads the imagefile using HTTP PUT method
*/ */
private static void uploadImage(URL url, Configuration conf, private static void uploadImage(URL url, Configuration conf,
NNStorage storage, NameNodeFile nnf, long txId) throws IOException { NNStorage storage, NameNodeFile nnf, long txId, Canceler canceler)
throws IOException {
File imageFile = storage.findImageFile(nnf, txId); File imageFile = storage.findImageFile(nnf, txId);
if (imageFile == null) { if (imageFile == null) {
@ -267,7 +287,7 @@ public class TransferFsImage {
ImageServlet.setVerificationHeadersForPut(connection, imageFile); ImageServlet.setVerificationHeadersForPut(connection, imageFile);
// Write the file to output stream. // Write the file to output stream.
writeFileToPutRequest(conf, connection, imageFile); writeFileToPutRequest(conf, connection, imageFile, canceler);
int responseCode = connection.getResponseCode(); int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) { if (responseCode != HttpURLConnection.HTTP_OK) {
@ -286,7 +306,7 @@ public class TransferFsImage {
} }
private static void writeFileToPutRequest(Configuration conf, private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile) HttpURLConnection connection, File imageFile, Canceler canceler)
throws FileNotFoundException, IOException { throws FileNotFoundException, IOException {
connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream"); connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary"); connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
@ -294,7 +314,7 @@ public class TransferFsImage {
FileInputStream input = new FileInputStream(imageFile); FileInputStream input = new FileInputStream(imageFile);
try { try {
copyFileToStream(output, imageFile, input, copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf)); ImageServlet.getThrottler(conf), canceler);
} finally { } finally {
IOUtils.closeStream(input); IOUtils.closeStream(input);
IOUtils.closeStream(output); IOUtils.closeStream(output);
@ -308,6 +328,12 @@ public class TransferFsImage {
public static void copyFileToStream(OutputStream out, File localfile, public static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler) FileInputStream infile, DataTransferThrottler throttler)
throws IOException { throws IOException {
copyFileToStream(out, localfile, infile, throttler, null);
}
private static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler,
Canceler canceler) throws IOException {
byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE]; byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
try { try {
CheckpointFaultInjector.getInstance() CheckpointFaultInjector.getInstance()
@ -324,6 +350,10 @@ public class TransferFsImage {
} }
int num = 1; int num = 1;
while (num > 0) { while (num > 0) {
if (canceler != null && canceler.isCancelled()) {
throw new SaveNamespaceCancelledException(
canceler.getCancellationReason());
}
num = infile.read(buf); num = infile.read(buf);
if (num <= 0) { if (num <= 0) {
break; break;
@ -337,7 +367,7 @@ public class TransferFsImage {
out.write(buf, 0, num); out.write(buf, 0, num);
if (throttler != null) { if (throttler != null) {
throttler.throttle(num); throttler.throttle(num, canceler);
} }
} }
} finally { } finally {

View File

@ -196,13 +196,18 @@ public class StandbyCheckpointer {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid); namesystem.getFSImage().getStorage(), imageType, txid, canceler);
return null; return null;
} }
}); });
executor.shutdown(); executor.shutdown();
try { try {
upload.get(); upload.get();
} catch (InterruptedException e) {
// The background thread may be blocked waiting in the throttler, so
// interrupt it.
upload.cancel(true);
throw e;
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw new IOException("Exception during image upload: " + e.getMessage(), throw new IOException("Exception during image upload: " + e.getMessage(),
e.getCause()); e.getCause());

View File

@ -81,6 +81,19 @@ public class DataTransferThrottler {
* number of bytes sent/received since last time throttle was called * number of bytes sent/received since last time throttle was called
*/ */
public synchronized void throttle(long numOfBytes) { public synchronized void throttle(long numOfBytes) {
throttle(numOfBytes, null);
}
/** Given the numOfBytes sent/received since last time throttle was called,
* make the current thread sleep if I/O rate is too fast
* compared to the given bandwidth. Allows for optional external cancelation.
*
* @param numOfBytes
* number of bytes sent/received since last time throttle was called
* @param canceler
* optional canceler to check for abort of throttle
*/
public synchronized void throttle(long numOfBytes, Canceler canceler) {
if ( numOfBytes <= 0 ) { if ( numOfBytes <= 0 ) {
return; return;
} }
@ -89,6 +102,9 @@ public class DataTransferThrottler {
bytesAlreadyUsed += numOfBytes; bytesAlreadyUsed += numOfBytes;
while (curReserve <= 0) { while (curReserve <= 0) {
if (canceler != null && canceler.isCancelled()) {
return;
}
long now = monotonicNow(); long now = monotonicNow();
long curPeriodEnd = curPeriodStart + period; long curPeriodEnd = curPeriodStart + period;

View File

@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
@ -59,6 +62,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -270,6 +274,29 @@ public class TestStandbyCheckpoints {
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104)); HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104));
cluster.transitionToStandby(0); cluster.transitionToStandby(0);
cluster.transitionToActive(1); cluster.transitionToActive(1);
// Wait to make sure background TransferFsImageUpload thread was cancelled.
// This needs to be done before the next test in the suite starts, so that a
// file descriptor is not held open during the next cluster init.
cluster.shutdown();
cluster = null;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threads = threadBean.getThreadInfo(
threadBean.getAllThreadIds(), 1);
for (ThreadInfo thread: threads) {
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
return false;
}
}
return true;
}
}, 1000, 30000);
// Assert that former active did not accept the canceled checkpoint file.
assertEquals(0, nn0.getFSImage().getMostRecentCheckpointTxId());
} }
/** /**