HDFS-2797. Fix misuses of InputStream#skip in the edit log code. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1348945 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-06-11 18:01:38 +00:00
parent 27d1c74a0c
commit 56d2ef6f5e
6 changed files with 48 additions and 5 deletions

View File

@ -308,6 +308,9 @@ Branch-2 ( Unreleased changes )
HDFS-3490. DatanodeWebHdfsMethods throws NullPointerException if HDFS-3490. DatanodeWebHdfsMethods throws NullPointerException if
NamenodeRpcAddressParam is not set. (szetszwo) NamenodeRpcAddressParam is not set. (szetszwo)
HDFS-2797. Fix misuses of InputStream#skip in the edit log code.
(Colin Patrick McCabe via eli)
BREAKDOWN OF HDFS-3042 SUBTASKS BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd) HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -165,7 +165,8 @@ public class EditLogFileInputStream extends EditLogInputStream {
LOG.warn("skipping " + skipAmt + " bytes at the end " + LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId + "of edit log '" + getName() + "': reached txid " + txId +
" out of " + lastTxId); " out of " + lastTxId);
tracker.skip(skipAmt); tracker.clearLimit();
IOUtils.skipFully(tracker, skipAmt);
} }
} }
} }

View File

@ -751,6 +751,11 @@ public class FSEditLogLoader {
limitPos = curPos + limit; limitPos = curPos + limit;
} }
@Override
public void clearLimit() {
limitPos = Long.MAX_VALUE;
}
@Override @Override
public void mark(int limit) { public void mark(int limit) {
super.mark(limit); super.mark(limit);

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableFactories;
@ -2289,9 +2290,11 @@ public abstract class FSEditLogOp {
// 0xff, we want to skip over that region, because there's nothing // 0xff, we want to skip over that region, because there's nothing
// interesting there. // interesting there.
long numSkip = e.getNumAfterTerminator(); long numSkip = e.getNumAfterTerminator();
if (in.skip(numSkip) < numSkip) { try {
IOUtils.skipFully(in, numSkip);
} catch (Throwable t) {
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " + FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
"garbage after an OP_INVALID. Unexpected early EOF."); "garbage after an OP_INVALID.", t);
return null; return null;
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -27,4 +27,9 @@ interface StreamLimiter {
* Set a limit. Calling this function clears any existing limit. * Set a limit. Calling this function clears any existing limit.
*/ */
public void setLimit(long limit); public void setLimit(long limit);
/**
* Disable limit.
*/
public void clearLimit();
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test; import org.junit.Test;
@ -118,8 +120,8 @@ public class TestEditLogsDuringFailover {
} }
} }
@Test private void testFailoverFinalizesAndReadsInProgress(
public void testFailoverFinalizesAndReadsInProgress() throws Exception { boolean partialTxAtEnd) throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology()) .nnTopology(MiniDFSNNTopology.simpleHATopology())
@ -130,8 +132,21 @@ public class TestEditLogsDuringFailover {
URI sharedUri = cluster.getSharedEditsDir(0, 1); URI sharedUri = cluster.getSharedEditsDir(0, 1);
File sharedDir = new File(sharedUri.getPath(), "current"); File sharedDir = new File(sharedUri.getPath(), "current");
FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1); FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG, 1);
assertEditFiles(Collections.singletonList(sharedUri), assertEditFiles(Collections.singletonList(sharedUri),
NNStorage.getInProgressEditsFileName(1)); NNStorage.getInProgressEditsFileName(1));
if (partialTxAtEnd) {
FileOutputStream outs = null;
try {
File editLogFile =
new File(sharedDir, NNStorage.getInProgressEditsFileName(1));
outs = new FileOutputStream(editLogFile, true);
outs.write(new byte[] { 0x18, 0x00, 0x00, 0x00 } );
LOG.error("editLogFile = " + editLogFile);
} finally {
IOUtils.cleanup(LOG, outs);
}
}
// Transition one of the NNs to active // Transition one of the NNs to active
cluster.transitionToActive(0); cluster.transitionToActive(0);
@ -149,7 +164,18 @@ public class TestEditLogsDuringFailover {
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
}
@Test
public void testFailoverFinalizesAndReadsInProgressSimple()
throws Exception {
testFailoverFinalizesAndReadsInProgress(false);
}
@Test
public void testFailoverFinalizesAndReadsInProgressWithPartialTxAtEnd()
throws Exception {
testFailoverFinalizesAndReadsInProgress(true);
} }
/** /**