HDFS-2800. Fix cancellation of checkpoints in the standby node to be more reliable. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1339744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-17 17:49:36 +00:00
parent 84a39c2711
commit c86a4cbe93
9 changed files with 175 additions and 92 deletions

View File

@ -70,6 +70,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3434. InvalidProtocolBufferException when visiting DN HDFS-3434. InvalidProtocolBufferException when visiting DN
browseDirectory.jsp (eli) browseDirectory.jsp (eli)
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
reliable. (todd)
Release 2.0.0-alpha - UNRELEASED Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -89,9 +90,6 @@ public class FSImage implements Closeable {
private final NNStorageRetentionManager archivalManager; private final NNStorageRetentionManager archivalManager;
private SaveNamespaceContext curSaveNamespaceContext = null;
/** /**
* Construct an FSImage * Construct an FSImage
* @param conf Configuration * @param conf Configuration
@ -804,17 +802,28 @@ public class FSImage implements Closeable {
try { try {
thread.join(); thread.join();
} catch (InterruptedException iex) { } catch (InterruptedException iex) {
LOG.error("Caught exception while waiting for thread " + LOG.error("Caught interrupted exception while waiting for thread " +
thread.getName() + " to finish. Retrying join"); thread.getName() + " to finish. Retrying join");
} }
} }
} }
} }
/**
* @see #saveNamespace(FSNamesystem, Canceler)
*/
public synchronized void saveNamespace(FSNamesystem source)
throws IOException {
saveNamespace(source, null);
}
/** /**
* Save the contents of the FS image to a new image file in each of the * Save the contents of the FS image to a new image file in each of the
* current storage directories. * current storage directories.
* @param canceler
*/ */
public synchronized void saveNamespace(FSNamesystem source) throws IOException { public synchronized void saveNamespace(FSNamesystem source,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized"; assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage(); storage.attemptRestoreRemovedStorage();
@ -825,7 +834,7 @@ public class FSImage implements Closeable {
} }
long imageTxId = getLastAppliedOrWrittenTxId(); long imageTxId = getLastAppliedOrWrittenTxId();
try { try {
saveFSImageInAllDirs(source, imageTxId); saveFSImageInAllDirs(source, imageTxId, canceler);
storage.writeAll(); storage.writeAll();
} finally { } finally {
if (editLogWasOpen) { if (editLogWasOpen) {
@ -837,27 +846,27 @@ public class FSImage implements Closeable {
storage.writeTransactionIdFileToStorage(imageTxId + 1); storage.writeTransactionIdFileToStorage(imageTxId + 1);
} }
} }
}
public void cancelSaveNamespace(String reason)
throws InterruptedException {
SaveNamespaceContext ctx = curSaveNamespaceContext;
if (ctx != null) {
ctx.cancel(reason); // waits until complete
}
} }
/**
* @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid) protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
saveFSImageInAllDirs(source, txid, null);
}
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
Canceler canceler)
throws IOException { throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) { if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!"); throw new IOException("No image directories available!");
} }
if (canceler == null) {
canceler = new Canceler();
}
SaveNamespaceContext ctx = new SaveNamespaceContext( SaveNamespaceContext ctx = new SaveNamespaceContext(
source, txid); source, txid, canceler);
curSaveNamespaceContext = ctx;
try { try {
List<Thread> saveThreads = new ArrayList<Thread>(); List<Thread> saveThreads = new ArrayList<Thread>();
@ -878,7 +887,7 @@ public class FSImage implements Closeable {
throw new IOException( throw new IOException(
"Failed to save in any storage directories while saving namespace."); "Failed to save in any storage directories while saving namespace.");
} }
if (ctx.isCancelled()) { if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid); deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws ctx.checkCancelled(); // throws
assert false : "should have thrown above!"; assert false : "should have thrown above!";

View File

@ -540,7 +540,6 @@ class FSImageFormat {
private void saveImage(ByteBuffer currentDirName, private void saveImage(ByteBuffer currentDirName,
INodeDirectory current, INodeDirectory current,
DataOutputStream out) throws IOException { DataOutputStream out) throws IOException {
context.checkCancelled();
List<INode> children = current.getChildrenRaw(); List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty()) if (children == null || children.isEmpty())
return; return;
@ -554,9 +553,13 @@ class FSImageFormat {
out.write(currentDirName.array(), 0, prefixLen); out.write(currentDirName.array(), 0, prefixLen);
} }
out.writeInt(children.size()); out.writeInt(children.size());
int i = 0;
for(INode child : children) { for(INode child : children) {
// print all children first // print all children first
FSImageSerialization.saveINode2Image(child, out); FSImageSerialization.saveINode2Image(child, out);
if (i++ % 50 == 0) {
context.checkCancelled();
}
} }
for(INode child : children) { for(INode child : children) {
if(!child.isDirectory()) if(!child.isDirectory())

View File

@ -687,7 +687,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/ */
void prepareToStopStandbyServices() throws ServiceFailedException { void prepareToStopStandbyServices() throws ServiceFailedException {
if (standbyCheckpointer != null) { if (standbyCheckpointer != null) {
standbyCheckpointer.cancelAndPreventCheckpoints(); standbyCheckpointer.cancelAndPreventCheckpoints(
"About to leave standby state");
} }
} }
@ -3357,27 +3358,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
/**
* Cancel an ongoing saveNamespace operation and wait for its
* threads to exit, if one is currently in progress.
*
* If no such operation is in progress, this call does nothing.
*
* @param reason a reason to be communicated to the caller saveNamespace
* @throws IOException
*/
void cancelSaveNamespace(String reason) throws IOException {
readLock();
try {
checkSuperuserPrivilege();
getFSImage().cancelSaveNamespace(reason);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
readUnlock();
}
}
/** /**
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again. * Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
* Requires superuser privilege. * Requires superuser privilege.

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.Canceler;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -36,20 +37,17 @@ class SaveNamespaceContext {
private final long txid; private final long txid;
private final List<StorageDirectory> errorSDs = private final List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>()); Collections.synchronizedList(new ArrayList<StorageDirectory>());
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
private final Canceler canceller;
private CountDownLatch completionLatch = new CountDownLatch(1); private CountDownLatch completionLatch = new CountDownLatch(1);
SaveNamespaceContext( SaveNamespaceContext(
FSNamesystem sourceNamesystem, FSNamesystem sourceNamesystem,
long txid) { long txid,
Canceler canceller) {
this.sourceNamesystem = sourceNamesystem; this.sourceNamesystem = sourceNamesystem;
this.txid = txid; this.txid = txid;
this.canceller = canceller;
} }
FSNamesystem getSourceNamesystem() { FSNamesystem getSourceNamesystem() {
@ -68,17 +66,6 @@ class SaveNamespaceContext {
return errorSDs; return errorSDs;
} }
/**
* Requests that the current saveNamespace operation be
* canceled if it is still running.
* @param reason the reason why cancellation is requested
* @throws InterruptedException
*/
void cancel(String reason) throws InterruptedException {
this.cancelReason = reason;
completionLatch.await();
}
void markComplete() { void markComplete() {
Preconditions.checkState(completionLatch.getCount() == 1, Preconditions.checkState(completionLatch.getCount() == 1,
"Context already completed!"); "Context already completed!");
@ -86,13 +73,9 @@ class SaveNamespaceContext {
} }
void checkCancelled() throws SaveNamespaceCancelledException { void checkCancelled() throws SaveNamespaceCancelledException {
if (cancelReason != null) { if (canceller.isCancelled()) {
throw new SaveNamespaceCancelledException( throw new SaveNamespaceCancelledException(
cancelReason); canceller.getCancellationReason());
} }
} }
boolean isCancelled() {
return cancelReason != null;
}
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException; import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -58,6 +59,9 @@ public class StandbyCheckpointer {
private final CheckpointerThread thread; private final CheckpointerThread thread;
private String activeNNAddress; private String activeNNAddress;
private InetSocketAddress myNNAddress; private InetSocketAddress myNNAddress;
private Object cancelLock = new Object();
private Canceler canceler;
// Keep track of how many checkpoints were canceled. // Keep track of how many checkpoints were canceled.
// This is for use in tests. // This is for use in tests.
@ -123,6 +127,7 @@ public class StandbyCheckpointer {
} }
public void stop() throws IOException { public void stop() throws IOException {
cancelAndPreventCheckpoints("Stopping checkpointer");
thread.setShouldRun(false); thread.setShouldRun(false);
thread.interrupt(); thread.interrupt();
try { try {
@ -134,6 +139,7 @@ public class StandbyCheckpointer {
} }
private void doCheckpoint() throws InterruptedException, IOException { private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
long txid; long txid;
namesystem.writeLockInterruptibly(); namesystem.writeLockInterruptibly();
@ -153,8 +159,8 @@ public class StandbyCheckpointer {
thisCheckpointTxId + ". Skipping..."); thisCheckpointTxId + ". Skipping...");
return; return;
} }
img.saveNamespace(namesystem); img.saveNamespace(namesystem, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId(); txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid; thisCheckpointTxId + " but instead saved at txid=" + txid;
@ -173,16 +179,18 @@ public class StandbyCheckpointer {
* and prevent any new checkpoints from starting for the next * and prevent any new checkpoints from starting for the next
* minute or so. * minute or so.
*/ */
public void cancelAndPreventCheckpoints() throws ServiceFailedException { public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
try { thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS); synchronized (cancelLock) {
// TODO(HA): there is a really narrow race here if we are just // Before beginning a checkpoint, the checkpointer thread
// about to start a checkpoint - this won't cancel it! // takes this lock, and creates a canceler object.
namesystem.getFSImage().cancelSaveNamespace( // If the canceler is non-null, then a checkpoint is in
"About to exit standby state"); // progress and we need to cancel it. If it's null, then
} catch (InterruptedException e) { // the operation has not started, meaning that the above
throw new ServiceFailedException( // time-based prevention will take effect.
"Interrupted while trying to cancel checkpoint"); if (canceler != null) {
canceler.cancel(msg);
}
} }
} }
@ -272,10 +280,18 @@ public class StandbyCheckpointer {
"exceeds the configured interval " + checkpointConf.getPeriod()); "exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true; needCheckpoint = true;
} }
if (needCheckpoint && now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!"); synchronized (cancelLock) {
canceledCount++; if (now < preventCheckpointsUntil) {
} else if (needCheckpoint) { LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
continue;
}
assert canceler == null;
canceler = new Canceler();
}
if (needCheckpoint) {
doCheckpoint(); doCheckpoint();
lastCheckpointTime = now; lastCheckpointTime = now;
} }
@ -287,6 +303,10 @@ public class StandbyCheckpointer {
continue; continue;
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Exception in doCheckpoint", t); LOG.error("Exception in doCheckpoint", t);
} finally {
synchronized (cancelLock) {
canceler = null;
}
} }
} }
} }

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Provides a simple interface where one thread can mark an operation
* for cancellation, and another thread can poll for whether the
* cancellation has occurred.
*/
@InterfaceAudience.Private
public class Canceler {
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
/**
* Requests that the current operation be canceled if it is still running.
* This does not block until the cancellation is successful.
* @param reason the reason why cancellation is requested
*/
public void cancel(String reason) {
this.cancelReason = reason;
}
public boolean isCancelled() {
return cancelReason != null;
}
public String getCancellationReason() {
return cancelReason;
}
}

View File

@ -52,8 +52,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -546,14 +547,15 @@ public class TestSaveNamespace {
try { try {
doAnEdit(fsn, 1); doAnEdit(fsn, 1);
final Canceler canceler = new Canceler();
// Save namespace // Save namespace
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER); fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
try { try {
Future<Void> saverFuture = pool.submit(new Callable<Void>() { Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
image.saveNamespace(finalFsn); image.saveNamespace(finalFsn, canceler);
return null; return null;
} }
}); });
@ -563,7 +565,7 @@ public class TestSaveNamespace {
// then cancel the saveNamespace // then cancel the saveNamespace
Future<Void> cancelFuture = pool.submit(new Callable<Void>() { Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
public Void call() throws Exception { public Void call() throws Exception {
image.cancelSaveNamespace("cancelled"); canceler.cancel("cancelled");
return null; return null;
} }
}); });

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
@ -36,6 +37,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -52,12 +58,18 @@ public class TestStandbyCheckpoints {
private NameNode nn0, nn1; private NameNode nn0, nn1;
private FileSystem fs; private FileSystem fs;
@SuppressWarnings("rawtypes")
@Before @Before
public void setupCluster() throws Exception { public void setupCluster() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
@ -159,14 +171,15 @@ public class TestStandbyCheckpoints {
// We should make exactly one checkpoint at this new txid. // We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1)) Mockito.verify(spyImage1, Mockito.times(1))
.saveNamespace((FSNamesystem) Mockito.anyObject()); .saveNamespace((FSNamesystem) Mockito.anyObject(),
(Canceler)Mockito.anyObject());
} }
/** /**
* Test cancellation of ongoing checkpoints when failover happens * Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint. * mid-checkpoint.
*/ */
@Test @Test(timeout=120000)
public void testCheckpointCancellation() throws Exception { public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0); cluster.transitionToStandby(0);
@ -191,16 +204,18 @@ public class TestStandbyCheckpoints {
cluster.transitionToActive(0); cluster.transitionToActive(0);
for (int i = 0; i < 10; i++) { boolean canceledOne = false;
for (int i = 0; i < 10 && !canceledOne; i++) {
doEdits(i*10, i*10 + 10); doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0); cluster.transitionToStandby(0);
cluster.transitionToActive(1); cluster.transitionToActive(1);
cluster.transitionToStandby(1); cluster.transitionToStandby(1);
cluster.transitionToActive(0); cluster.transitionToActive(0);
canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
} }
assertTrue(StandbyCheckpointer.getCanceledCount() > 0); assertTrue(canceledOne);
} }
private void doEdits(int start, int stop) throws IOException { private void doEdits(int start, int stop) throws IOException {
@ -209,5 +224,22 @@ public class TestStandbyCheckpoints {
fs.mkdirs(p); fs.mkdirs(p);
} }
} }
/**
* A codec which just slows down the saving of the image significantly
* by sleeping a few milliseconds on every write. This makes it easy to
* catch the standby in the middle of saving a checkpoint.
*/
public static class SlowCodec extends GzipCodec {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
CompressionOutputStream ret = super.createOutputStream(out);
CompressionOutputStream spy = Mockito.spy(ret);
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
return spy;
}
}
} }