HDFS-4816. transitionToActive blocks if the SBN is doing checkpoint image transfer. (Andrew Wang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1514095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0cb5f08149
commit
49a892056d
|
@ -267,6 +267,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-5065. TestSymlinkHdfsDisable fails on Windows. (ivanmi)
|
HDFS-5065. TestSymlinkHdfsDisable fails on Windows. (ivanmi)
|
||||||
|
|
||||||
|
HDFS-4816. transitionToActive blocks if the SBN is doing checkpoint image
|
||||||
|
transfer. (Andrew Wang)
|
||||||
|
|
||||||
Release 2.1.1-beta - UNRELEASED
|
Release 2.1.1-beta - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -17,9 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.util.Time.now;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -38,10 +46,10 @@ import org.apache.hadoop.hdfs.util.Canceler;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import static org.apache.hadoop.util.Time.now;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread which runs inside the NN when it's in Standby state,
|
* Thread which runs inside the NN when it's in Standby state,
|
||||||
|
@ -57,6 +65,7 @@ public class StandbyCheckpointer {
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
private long lastCheckpointTime;
|
private long lastCheckpointTime;
|
||||||
private final CheckpointerThread thread;
|
private final CheckpointerThread thread;
|
||||||
|
private final ThreadFactory uploadThreadFactory;
|
||||||
private String activeNNAddress;
|
private String activeNNAddress;
|
||||||
private InetSocketAddress myNNAddress;
|
private InetSocketAddress myNNAddress;
|
||||||
|
|
||||||
|
@ -72,6 +81,8 @@ public class StandbyCheckpointer {
|
||||||
this.namesystem = ns;
|
this.namesystem = ns;
|
||||||
this.checkpointConf = new CheckpointConf(conf);
|
this.checkpointConf = new CheckpointConf(conf);
|
||||||
this.thread = new CheckpointerThread();
|
this.thread = new CheckpointerThread();
|
||||||
|
this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
|
||||||
|
.setNameFormat("TransferFsImageUpload-%d").build();
|
||||||
|
|
||||||
setNameNodeAddresses(conf);
|
setNameNodeAddresses(conf);
|
||||||
}
|
}
|
||||||
|
@ -142,7 +153,7 @@ public class StandbyCheckpointer {
|
||||||
|
|
||||||
private void doCheckpoint() throws InterruptedException, IOException {
|
private void doCheckpoint() throws InterruptedException, IOException {
|
||||||
assert canceler != null;
|
assert canceler != null;
|
||||||
long txid;
|
final long txid;
|
||||||
|
|
||||||
namesystem.writeLockInterruptibly();
|
namesystem.writeLockInterruptibly();
|
||||||
try {
|
try {
|
||||||
|
@ -171,9 +182,26 @@ public class StandbyCheckpointer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload the saved checkpoint back to the active
|
// Upload the saved checkpoint back to the active
|
||||||
TransferFsImage.uploadImageFromStorage(
|
// Do this in a separate thread to avoid blocking transition to active
|
||||||
activeNNAddress, myNNAddress,
|
// See HDFS-4816
|
||||||
namesystem.getFSImage().getStorage(), txid);
|
ExecutorService executor =
|
||||||
|
Executors.newSingleThreadExecutor(uploadThreadFactory);
|
||||||
|
Future<Void> upload = executor.submit(new Callable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws IOException {
|
||||||
|
TransferFsImage.uploadImageFromStorage(
|
||||||
|
activeNNAddress, myNNAddress,
|
||||||
|
namesystem.getFSImage().getStorage(), txid);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
executor.shutdown();
|
||||||
|
try {
|
||||||
|
upload.get();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException("Exception during image upload: " + e.getMessage(),
|
||||||
|
e.getCause());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -301,6 +329,7 @@ public class StandbyCheckpointer {
|
||||||
LOG.info("Checkpoint was cancelled: " + ce.getMessage());
|
LOG.info("Checkpoint was cancelled: " + ce.getMessage());
|
||||||
canceledCount++;
|
canceledCount++;
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.info("Interrupted during checkpointing", ie);
|
||||||
// Probably requested shutdown.
|
// Probably requested shutdown.
|
||||||
continue;
|
continue;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
|
@ -240,6 +240,34 @@ public class TestStandbyCheckpoints {
|
||||||
assertTrue(canceledOne);
|
assertTrue(canceledOne);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test cancellation of ongoing checkpoints when failover happens
|
||||||
|
* mid-checkpoint during image upload from standby to active NN.
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testCheckpointCancellationDuringUpload() throws Exception {
|
||||||
|
// don't compress, we want a big image
|
||||||
|
cluster.getConfiguration(0).setBoolean(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
|
||||||
|
cluster.getConfiguration(1).setBoolean(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
|
||||||
|
// Throttle SBN upload to make it hang during upload to ANN
|
||||||
|
cluster.getConfiguration(1).setLong(
|
||||||
|
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
|
||||||
|
cluster.restartNameNode(0);
|
||||||
|
cluster.restartNameNode(1);
|
||||||
|
nn0 = cluster.getNameNode(0);
|
||||||
|
nn1 = cluster.getNameNode(1);
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
doEdits(0, 100);
|
||||||
|
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||||
|
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104));
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make sure that clients will receive StandbyExceptions even when a
|
* Make sure that clients will receive StandbyExceptions even when a
|
||||||
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
|
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
|
||||||
|
|
Loading…
Reference in New Issue