HDFS-3440. More effectively limit stream memory consumption when reading corrupt edit logs. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1339979 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-05-18 05:28:26 +00:00
parent b1d3b518c2
commit 0e7ea9b56c
9 changed files with 117 additions and 7 deletions


@@ -41,6 +41,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3419. Cleanup LocatedBlock. (eli)
HDFS-3440. More effectively limit stream memory consumption when reading
corrupt edit logs (Colin Patrick McCabe via todd)
OPTIMIZATIONS
BUG FIXES
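
For context on the change itself (this sketch is not part of the patch; the class and field names are illustrative only): without a per-operation byte limit, a corrupt length field in an edit log can make the loader buffer an enormous payload before any validation runs, which is the memory-consumption problem this commit addresses.

import java.io.DataInputStream;
import java.io.IOException;

// Illustrative only: the dangerous pattern when reading a possibly-corrupt record.
class UnboundedRecordReaderSketch {
    byte[] readPayload(DataInputStream in) throws IOException {
        int claimedLength = in.readInt();          // a corrupt log may claim e.g. 2 GB here
        byte[] payload = new byte[claimedLength];  // allocated before any checksum runs
        in.readFully(payload);                     // memory is consumed before the op is rejected
        return payload;
    }
}

The patch below caps this by arming a StreamLimiter with MAX_OP_SIZE before each op is decoded, so a single bad op can cost at most 1.5 MB of buffering.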


@@ -75,7 +75,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, logVersion);
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
}
@Override


@@ -119,7 +119,7 @@ void setBytes(byte[] newBytes, int version) throws IOException {
this.version = version;
reader = new FSEditLogOp.Reader(in, version);
reader = new FSEditLogOp.Reader(in, tracker, version);
}
void clear() throws IOException {


@@ -85,7 +85,7 @@ public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
throw new LogHeaderCorruptException("No header found in log");
}
reader = new FSEditLogOp.Reader(in, logVersion);
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.isInProgress = isInProgress;


@@ -723,17 +723,31 @@ long getNumTransactions() {
/**
* Stream wrapper that keeps track of the current stream position.
*
* This stream also allows us to set a limit on how many bytes we can read
* without getting an exception.
*/
public static class PositionTrackingInputStream extends FilterInputStream {
public static class PositionTrackingInputStream extends FilterInputStream
implements StreamLimiter {
private long curPos = 0;
private long markPos = -1;
private long limitPos = Long.MAX_VALUE;
public PositionTrackingInputStream(InputStream is) {
super(is);
}
private void checkLimit(long amt) throws IOException {
long extra = (curPos + amt) - limitPos;
if (extra > 0) {
throw new IOException("Tried to read " + amt + " byte(s) past " +
"the limit at offset " + limitPos);
}
}
@Override
public int read() throws IOException {
checkLimit(1);
int ret = super.read();
if (ret != -1) curPos++;
return ret;
@@ -741,6 +755,7 @@ public int read() throws IOException {
@Override
public int read(byte[] data) throws IOException {
checkLimit(data.length);
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
@@ -748,11 +763,17 @@ public int read(byte[] data) throws IOException {
@Override
public int read(byte[] data, int offset, int length) throws IOException {
checkLimit(length);
int ret = super.read(data, offset, length);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public void setLimit(long limit) {
limitPos = curPos + limit;
}
@Override
public void mark(int limit) {
super.mark(limit);
@@ -775,6 +796,11 @@ public long getPos() {
@Override
public long skip(long amt) throws IOException {
long extra = (curPos + amt) - limitPos;
if (extra > 0) {
throw new IOException("Tried to skip " + extra + " bytes past " +
"the limit at offset " + limitPos);
}
long ret = super.skip(amt);
curPos += ret;
return ret;
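
A minimal usage sketch of the limiter behaviour added above (not part of the patch; it assumes the nested class is accessible from the caller's code, and mirrors the new test at the end of this commit): wrap any InputStream, call setLimit(n), and the first read past n bytes fails with an IOException instead of consuming more memory.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;

class PositionTrackingLimitSketch {
    public static void main(String[] args) throws IOException {
        FSEditLogLoader.PositionTrackingInputStream tracker =
            new FSEditLogLoader.PositionTrackingInputStream(
                new ByteArrayInputStream(new byte[] { 0x12, 0x12, 0x12 }));
        tracker.setLimit(2);        // allow at most two more bytes
        tracker.read();             // ok, position 1
        tracker.read();             // ok, position 2
        try {
            tracker.read();         // exceeds the limit
        } catch (IOException expected) {
            // "Tried to read 1 byte(s) past the limit at offset 2"
        }
        tracker.close();
    }
}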


@@ -75,7 +75,10 @@
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
/**
* Opcode size is limited to 1.5 megabytes
*/
public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
@SuppressWarnings("deprecation")
@@ -2229,6 +2232,7 @@ public void writeOp(FSEditLogOp op) throws IOException {
*/
public static class Reader {
private final DataInputStream in;
private final StreamLimiter limiter;
private final int logVersion;
private final Checksum checksum;
private final OpInstanceCache cache;
@@ -2239,7 +2243,7 @@ public static class Reader {
* @param logVersion The version of the data coming from the stream.
*/
@SuppressWarnings("deprecation")
public Reader(DataInputStream in, int logVersion) {
public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = new PureJavaCrc32();
@@ -2253,6 +2257,7 @@ public Reader(DataInputStream in, int logVersion) {
} else {
this.in = in;
}
this.limiter = limiter;
this.cache = new OpInstanceCache();
}
@@ -2272,6 +2277,7 @@ public Reader(DataInputStream in, int logVersion) {
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
while (true) {
try {
limiter.setLimit(MAX_OP_SIZE);
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {
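
Taken together, the read loop now behaves roughly like this fragment (a simplified sketch of how readOp() uses the limiter, written as it would sit inside the Reader class; the resync-on-error handling is abbreviated and not literal):

// Sketch only: arm the limiter and mark the stream before decoding each op.
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
    while (true) {
        try {
            limiter.setLimit(MAX_OP_SIZE);   // at most 1.5 MB may be read for this op
            in.mark(MAX_OP_SIZE);            // so a broken op can be rewound
            return decodeOp();               // parse a single op from the stream
        } catch (IOException e) {
            in.reset();                      // rewind to the mark
            if (!skipBrokenEdits) {
                throw e;
            }
            if (in.skip(1) < 1) {            // assumption: skip mode advances one byte and retries
                return null;                 // nothing left to read
            }
        }
    }
}

Because setLimit() is called again for every op, the limit is a per-op budget rather than a cumulative one: limitPos is always rebased to curPos + MAX_OP_SIZE.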


@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
/**
* An object that allows you to set a limit on a stream. This limit
* represents the number of bytes that can be read without getting an
* exception.
*/
interface StreamLimiter {
/**
* Set a limit. Calling this function clears any existing limit.
*/
public void setLimit(long limit);
}
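
Since StreamLimiter is a single-method interface, a test double is easy to write; the sketch below is purely hypothetical (not part of this commit) and simply records every limit a caller requests, which a unit test could use to assert that a reader arms the limiter before each decode.

package org.apache.hadoop.hdfs.server.namenode;

import java.util.ArrayList;
import java.util.List;

// Hypothetical recording implementation of StreamLimiter for use in tests.
class RecordingStreamLimiter implements StreamLimiter {
    final List<Long> limits = new ArrayList<Long>();

    @Override
    public void setLimit(long limit) {
        limits.add(limit);   // remember each requested limit for later assertions
    }
}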


@@ -766,7 +766,7 @@ public EditLogByteInputStream(byte[] data) throws IOException {
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, version);
reader = new FSEditLogOp.Reader(in, tracker, version);
}
@Override


@@ -22,8 +22,10 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@@ -316,4 +318,47 @@ private static long getNonTrailerLength(File f) throws IOException {
fis.close();
}
}
@Test
public void testStreamLimiter() throws IOException {
final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE);
try {
fos.write(0x12);
fos.write(0x12);
fos.write(0x12);
} finally {
fos.close();
}
FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE);
BufferedInputStream bin = new BufferedInputStream(fin);
FSEditLogLoader.PositionTrackingInputStream tracker =
new FSEditLogLoader.PositionTrackingInputStream(bin);
try {
tracker.setLimit(2);
tracker.mark(100);
tracker.read();
tracker.read();
try {
tracker.read();
fail("expected to get IOException after reading past the limit");
} catch (IOException e) {
}
tracker.reset();
tracker.mark(100);
byte arr[] = new byte[3];
try {
tracker.read(arr);
fail("expected to get IOException after reading past the limit");
} catch (IOException e) {
}
tracker.reset();
arr = new byte[2];
tracker.read(arr);
} finally {
tracker.close();
}
}
}