HDFS-3049. During the normal NN startup process, fall back on a different edit log if we see one that is corrupt. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1417588 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-12-05 19:13:29 +00:00
parent d6a9d9f76c
commit 857c428a90
8 changed files with 481 additions and 43 deletions
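In outline: JournalSet.selectInputStreams() now groups every edit log stream that shares a start transaction ID into a single RedundantEditLogInputStream, rather than picking one replica and hoping it is intact. When a read from the current replica fails, the merged stream fails over to the next-best replica (the streams are sorted so the one with the highest last txid is tried first) and fast-forwards it to the next expected transaction. A corrupt edit log therefore no longer aborts NameNode startup as long as some other journal still holds the missing transactions.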

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -128,6 +128,9 @@ Release 2.0.3-alpha - Unreleased

     HDFS-4199. Provide test for HdfsVolumeId. (Ivan A. Veselovsky via atm)

+    HDFS-3049. During the normal NN startup process, fall back on a different
+    edit log if we see one that is corrupt (Colin Patrick McCabe via todd)
+
   OPTIMIZATIONS

   BUG FIXES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -1145,18 +1145,6 @@ public class FSEditLog implements LogsPurgeable {
           throw e;
         }
       }
     }

-    // This code will go away as soon as RedundantEditLogInputStream is
-    // introduced. (HDFS-3049)
-    try {
-      if (!streams.isEmpty()) {
-        streams.get(0).skipUntil(fromTxId);
-      }
-    } catch (IOException e) {
-      // We don't want to throw an exception from here, because that would make
-      // recovery impossible even if the user requested it. An exception will
-      // be thrown later, when we don't read the starting txid we expect.
-      LOG.error("error skipping until transaction " + fromTxId, e);
-    }
     return streams;
   }
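The block removed here was an acknowledged stopgap: it fast-forwarded only the first selected stream and deliberately swallowed the IOException so that recovery mode stayed reachable. That skip-to-fromTxId responsibility now lives inside each RedundantEditLogInputStream (the SKIP_UNTIL state in the new file below), which can retry the skip on another replica instead of merely logging the failure.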

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -665,7 +665,9 @@ public class FSEditLogLoader {
         FSImage.LOG.warn("Caught exception after reading " + numValid +
             " ops from " + in + " while determining its valid length." +
             "Position was " + lastPos, t);
-        break;
+        in.resync();
+        FSImage.LOG.warn("After resync, position is " + in.getPosition());
+        continue;
       }
       if (lastTxId == HdfsConstants.INVALID_TXID
           || op.getTransactionId() > lastTxId) {
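Where validation previously gave up at the first unreadable op (break), it now resyncs to the next op boundary and keeps scanning, so one corrupt record no longer hides the valid transactions behind it. A minimal sketch of the shape of that loop, using a hypothetical Record/RecordStream API in place of the HDFS stream classes:

    // Hypothetical reader API; only the resync-and-continue shape matters.
    long lastTxId = -1;
    while (true) {
      try {
        Record r = in.readRecord();   // may throw on a corrupt record
        if (r == null) {
          break;                      // clean end of stream
        }
        lastTxId = Math.max(lastTxId, r.getTxId());
      } catch (IOException e) {
        in.resync();                  // skip ahead to the next record boundary
        continue;                     // ...and keep validating from there
      }
    }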

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java

@@ -24,7 +24,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.SortedSet;
+import java.util.TreeSet;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +45,6 @@ import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;

 /**
  * Manages a collection of Journals. None of the methods are synchronized, it is
@@ -232,8 +233,9 @@ public class JournalSet implements JournalManager {
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) throws IOException {
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams =
+        new PriorityQueue<EditLogInputStream>(64,
+            EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {
       if (jas.isDisabled()) {
         LOG.info("Skipping jas " + jas + " since it's disabled");
@@ -249,7 +251,8 @@ public class JournalSet implements JournalManager {
     // transaction ID.
     LinkedList<EditLogInputStream> acc =
         new LinkedList<EditLogInputStream>();
-    for (EditLogInputStream elis : allStreams) {
+    EditLogInputStream elis;
+    while ((elis = allStreams.poll()) != null) {
       if (acc.isEmpty()) {
         acc.add(elis);
       } else {
@@ -257,7 +260,7 @@ public class JournalSet implements JournalManager {
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(acc.get(0));
+          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -268,7 +271,7 @@ public class JournalSet implements JournalManager {
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(acc.get(0));
+      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }
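TreeMultiset came from Guava's then-beta collect package; a java.util.PriorityQueue built with the same comparator yields the identical ascending order when drained with poll(), with no extra dependency. A self-contained illustration of the poll() idiom used above (the long[] pairs are stand-ins for streams keyed by first txid; the real code orders EditLogInputStreams with EDIT_LOG_INPUT_STREAM_COMPARATOR):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class PollOrderDemo {
      public static void main(String[] args) {
        PriorityQueue<long[]> q = new PriorityQueue<long[]>(64,
            new Comparator<long[]>() {
              @Override
              public int compare(long[] a, long[] b) {
                return Long.compare(a[0], b[0]);  // order by first txid only
              }
            });
        q.add(new long[] { 100, 200 });
        q.add(new long[] { 1, 99 });
        q.add(new long[] { 100, 150 });
        long[] e;
        while ((e = q.poll()) != null) {
          // prints 1-99 first; the two 100-* entries follow in either order,
          // since this comparator does not break ties
          System.out.println(e[0] + "-" + e[1]);
        }
      }
    }

One behavioral difference worth noting: poll() consumes the queue, which is why the test changes below null out and separately clean up any stream still held when a loop exits early.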

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java

@@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;

/**
* A merged input stream that handles failover between different edit logs.
*
* We will currently try each edit log stream exactly once. In other words, we
* don't handle the "ping pong" scenario where different edit logs contain a
* different subset of the available edits.
*/
class RedundantEditLogInputStream extends EditLogInputStream {
  public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName());

  private int curIdx;
  private long prevTxId;
  private final EditLogInputStream[] streams;

  /**
   * States that the RedundantEditLogInputStream can be in.
   *
   * <pre>
   *                    start (if no streams)
   *                              |
   *                              V
   * PrematureEOFException  +----------------+
   *     +----------------->|      EOF       |<------------------+
   *     |                  +----------------+                   |
   *     |                                                       |
   *     |          start (if there are streams)                 |
   *     |                      |                                |
   *     |                      V                                | EOF
   *     |  resync       +----------------+  skipUntil    +---------+
   *     |  +----------->|   SKIP_UNTIL   |-------------->|   OK    |
   *     |  |            +----------------+               +---------+
   *     |  |                   | IOE   ^ fail over to        | IOE
   *     |  |                   V       | next stream          |
   * +----------------------+  +----------------+              |
   * | STREAM_FAILED_RESYNC |  | STREAM_FAILED  |<-------------+
   * +----------------------+  +----------------+
   *            ^       Recovery mode  |
   *            +----------------------+
   * </pre>
   */
  static private enum State {
    /** We need to skip until prevTxId + 1 */
    SKIP_UNTIL,
    /** We're ready to read opcodes out of the current stream */
    OK,
    /** The current stream has failed. */
    STREAM_FAILED,
    /** The current stream has failed, and resync() was called. */
    STREAM_FAILED_RESYNC,
    /** There are no more opcodes to read from this
     * RedundantEditLogInputStream */
    EOF;
  }

  private State state;
  private IOException prevException;

  RedundantEditLogInputStream(Collection<EditLogInputStream> streams,
      long startTxId) {
    this.curIdx = 0;
    this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ?
        HdfsConstants.INVALID_TXID : (startTxId - 1);
    this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL;
    this.prevException = null;
    // EditLogInputStreams in a RedundantEditLogInputStream must be finalized,
    // and can't be pre-transactional.
    EditLogInputStream first = null;
    for (EditLogInputStream s : streams) {
      Preconditions.checkArgument(s.getFirstTxId() !=
          HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s);
      Preconditions.checkArgument(s.getLastTxId() !=
          HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s);
      if (first == null) {
        first = s;
      } else {
        Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(),
            "All streams in the RedundantEditLogInputStream must have the same " +
            "start transaction ID! " + first + " had start txId " +
            first.getFirstTxId() + ", but " + s + " had start txId " +
            s.getFirstTxId());
      }
    }

    this.streams = streams.toArray(new EditLogInputStream[0]);

    // We sort the streams here so that the streams that end later come first.
    Arrays.sort(this.streams, new Comparator<EditLogInputStream>() {
      @Override
      public int compare(EditLogInputStream a, EditLogInputStream b) {
        return Longs.compare(b.getLastTxId(), a.getLastTxId());
      }
    });
  }

  @Override
  public String getName() {
    StringBuilder bld = new StringBuilder();
    String prefix = "";
    for (EditLogInputStream elis : streams) {
      bld.append(prefix);
      bld.append(elis.getName());
      prefix = ", ";
    }
    return bld.toString();
  }

  @Override
  public long getFirstTxId() {
    return streams[curIdx].getFirstTxId();
  }

  @Override
  public long getLastTxId() {
    return streams[curIdx].getLastTxId();
  }

  @Override
  public void close() throws IOException {
    IOUtils.cleanup(LOG, streams);
  }

  @Override
  protected FSEditLogOp nextValidOp() {
    try {
      if (state == State.STREAM_FAILED) {
        state = State.STREAM_FAILED_RESYNC;
      }
      return nextOp();
    } catch (IOException e) {
      return null;
    }
  }

  @Override
  protected FSEditLogOp nextOp() throws IOException {
    while (true) {
      switch (state) {
      case SKIP_UNTIL:
        try {
          if (prevTxId != HdfsConstants.INVALID_TXID) {
            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
                "' to transaction ID " + (prevTxId + 1));
            streams[curIdx].skipUntil(prevTxId + 1);
          }
        } catch (IOException e) {
          prevException = e;
          state = State.STREAM_FAILED;
        }
        state = State.OK;
        break;
      case OK:
        try {
          FSEditLogOp op = streams[curIdx].readOp();
          if (op == null) {
            state = State.EOF;
            if (streams[curIdx].getLastTxId() == prevTxId) {
              return null;
            } else {
              throw new PrematureEOFException("got premature end-of-file " +
                  "at txid " + prevTxId + "; expected file to go up to " +
                  streams[curIdx].getLastTxId());
            }
          }
          prevTxId = op.getTransactionId();
          return op;
        } catch (IOException e) {
          prevException = e;
          state = State.STREAM_FAILED;
        }
        break;
      case STREAM_FAILED:
        if (curIdx + 1 == streams.length) {
          throw prevException;
        }
        long oldLast = streams[curIdx].getLastTxId();
        long newLast = streams[curIdx + 1].getLastTxId();
        if (newLast < oldLast) {
          throw new IOException("We encountered an error reading " +
              streams[curIdx].getName() + ". During automatic edit log " +
              "failover, we noticed that all of the remaining edit log " +
              "streams are shorter than the current one! The best " +
              "remaining edit log ends at transaction " +
              newLast + ", but we thought we could read up to transaction " +
              oldLast + ". If you continue, metadata will be lost forever!");
        }
        LOG.error("Got error reading edit log input stream " +
            streams[curIdx].getName() + "; failing over to edit log " +
            streams[curIdx + 1].getName(), prevException);
        curIdx++;
        state = State.SKIP_UNTIL;
        break;
      case STREAM_FAILED_RESYNC:
        if (curIdx + 1 == streams.length) {
          if (prevException instanceof PrematureEOFException) {
            // bypass early EOF check
            state = State.EOF;
          } else {
            streams[curIdx].resync();
            state = State.SKIP_UNTIL;
          }
        } else {
          LOG.error("failing over to edit log " +
              streams[curIdx + 1].getName());
          curIdx++;
          state = State.SKIP_UNTIL;
        }
        break;
      case EOF:
        return null;
      }
    }
  }

  @Override
  public int getVersion() throws IOException {
    return streams[curIdx].getVersion();
  }

  @Override
  public long getPosition() {
    return streams[curIdx].getPosition();
  }

  @Override
  public long length() throws IOException {
    return streams[curIdx].length();
  }

  @Override
  public boolean isInProgress() {
    return streams[curIdx].isInProgress();
  }

  static private final class PrematureEOFException extends IOException {
    private static final long serialVersionUID = 1L;
    PrematureEOFException(String msg) {
      super(msg);
    }
  }
}
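Consumption is no different from any other EditLogInputStream; failover happens entirely inside readOp(). A sketch of a caller (the replicas collection and apply() are assumed, not part of this commit):

    // 'replicas' must be finalized streams sharing the same first txid.
    Collection<EditLogInputStream> replicas = ...;  // assumed to exist
    EditLogInputStream in =
        new RedundantEditLogInputStream(replicas, fromTxId);
    try {
      FSEditLogOp op;
      while ((op = in.readOp()) != null) {
        apply(op);  // hypothetical consumer; sees one contiguous op sequence
      }
    } finally {
      in.close();
    }

Note the deliberate safety check in STREAM_FAILED: if every remaining replica ends before the one that just failed, the stream refuses to fail over, since silently continuing would lose acknowledged transactions.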

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -531,21 +532,29 @@ public class TestEditLog {
     FSImage fsimage = namesystem.getFSImage();
     final FSEditLog editLog = fsimage.getEditLog();
     fileSys.mkdirs(new Path("/tmp"));
-    StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
+
+    Iterator<StorageDirectory> iter = fsimage.getStorage().
+        dirIterator(NameNodeDirType.EDITS);
+    LinkedList<StorageDirectory> sds = new LinkedList<StorageDirectory>();
+    while (iter.hasNext()) {
+      sds.add(iter.next());
+    }
     editLog.close();
     cluster.shutdown();

-    File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
-    assertTrue(editFile.exists());
-
-    long fileLen = editFile.length();
-    System.out.println("File name: " + editFile + " len: " + fileLen);
-    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
-    rwf.seek(fileLen-4); // seek to checksum bytes
-    int b = rwf.readInt();
-    rwf.seek(fileLen-4);
-    rwf.writeInt(b+1);
-    rwf.close();
+    for (StorageDirectory sd : sds) {
+      File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
+      assertTrue(editFile.exists());
+
+      long fileLen = editFile.length();
+      LOG.debug("Corrupting Log File: " + editFile + " len: " + fileLen);
+      RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+      rwf.seek(fileLen-4); // seek to checksum bytes
+      int b = rwf.readInt();
+      rwf.seek(fileLen-4);
+      rwf.writeInt(b+1);
+      rwf.close();
+    }

     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
@@ -1264,6 +1273,113 @@ public class TestEditLog {
     }
   }

+  private static long readAllEdits(Collection<EditLogInputStream> streams,
+      long startTxId) throws IOException {
+    FSEditLogOp op;
+    long nextTxId = startTxId;
+    long numTx = 0;
+    for (EditLogInputStream s : streams) {
+      while (true) {
+        op = s.readOp();
+        if (op == null)
+          break;
+        if (op.getTransactionId() != nextTxId) {
+          throw new IOException("out of order transaction ID! expected " +
+              nextTxId + " but got " + op.getTransactionId() + " when " +
+              "reading " + s.getName());
+        }
+        numTx++;
+        nextTxId = op.getTransactionId() + 1;
+      }
+    }
+    return numTx;
+  }
+
+  /**
+   * Test edit log failover. If a single edit log is missing, other
+   * edit logs should be used instead.
+   */
+  @Test
+  public void testEditLogFailOverFromMissing() throws IOException {
+    File f1 = new File(TEST_DIR + "/failover0");
+    File f2 = new File(TEST_DIR + "/failover1");
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+    NNStorage storage = setupEdits(editUris, 3);
+
+    final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+    final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+    File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
+            endErrorTxId))) {
+          return true;
+        }
+        return false;
+      }
+    });
+    assertEquals(1, files.length);
+    assertTrue(files[0].delete());
+
+    FSEditLog editlog = getFSEditLog(storage);
+    editlog.initJournalsForWrite();
+    long startTxId = 1;
+    try {
+      readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+          startTxId);
+    } catch (IOException e) {
+      LOG.error("edit log failover didn't work", e);
+      fail("Edit log failover didn't work");
+    }
+  }
+
+  /**
+   * Test edit log failover from a corrupt edit log
+   */
+  @Test
+  public void testEditLogFailOverFromCorrupt() throws IOException {
+    File f1 = new File(TEST_DIR + "/failover0");
+    File f2 = new File(TEST_DIR + "/failover1");
+    List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
+
+    NNStorage storage = setupEdits(editUris, 3);
+
+    final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
+    final long endErrorTxId = 2*TXNS_PER_ROLL;
+
+    File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
+      public boolean accept(File dir, String name) {
+        if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
+            endErrorTxId))) {
+          return true;
+        }
+        return false;
+      }
+    });
+    assertEquals(1, files.length);
+
+    long fileLen = files[0].length();
+    LOG.debug("Corrupting Log File: " + files[0] + " len: " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(files[0], "rw");
+    rwf.seek(fileLen-4); // seek to checksum bytes
+    int b = rwf.readInt();
+    rwf.seek(fileLen-4);
+    rwf.writeInt(b+1);
+    rwf.close();
+
+    FSEditLog editlog = getFSEditLog(storage);
+    editlog.initJournalsForWrite();
+    long startTxId = 1;
+    try {
+      readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
+          startTxId);
+    } catch (IOException e) {
+      LOG.error("edit log failover didn't work", e);
+      fail("Edit log failover didn't work");
+    }
+  }
+
   /**
    * Test creating a directory with lots and lots of edit log segments
    */
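Both new failover tests and the multi-directory corruption loop above use the same recipe: bump the 4-byte checksum word at the end of the edit log file so its final op no longer validates. As a stand-alone helper, for illustration only (the tests inline this logic rather than calling such a method):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class ChecksumFlipper {
      // Overwrite the last 4 bytes (the checksum of the final op) with
      // value + 1; any change to these bytes invalidates the checksum.
      static void corruptLastChecksum(File editFile) throws IOException {
        RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
        try {
          long fileLen = rwf.length();
          rwf.seek(fileLen - 4);  // seek to checksum bytes
          int b = rwf.readInt();
          rwf.seek(fileLen - 4);
          rwf.writeInt(b + 1);
        } finally {
          rwf.close();
        }
      }
    }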

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java

@@ -31,6 +31,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.util.Map;
 import java.util.SortedMap;

 import org.apache.commons.logging.impl.Log4JLogger;
@@ -50,6 +51,7 @@ import org.apache.log4j.Level;
 import org.junit.Test;

 import com.google.common.collect.Maps;
+import com.google.common.io.Files;

 public class TestFSEditLogLoader {
@@ -319,6 +321,56 @@ public class TestFSEditLogLoader {
     assertTrue(validation.hasCorruptHeader());
   }

+  @Test
+  public void testValidateEditLogWithCorruptBody() throws IOException {
+    File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody");
+    SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
+    final int NUM_TXNS = 20;
+    File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS,
+        offsetToTxId);
+    // Back up the uncorrupted log
+    File logFileBak = new File(testDir, logFile.getName() + ".bak");
+    Files.copy(logFile, logFileBak);
+    EditLogValidation validation =
+        EditLogFileInputStream.validateEditLog(logFile);
+    assertTrue(!validation.hasCorruptHeader());
+    // We expect that there will be an OP_START_LOG_SEGMENT, followed by
+    // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
+    assertEquals(NUM_TXNS + 1, validation.getEndTxId());
+    // Corrupt each edit and verify that validation continues to work
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txId = entry.getValue();
+
+      // Restore backup, corrupt the txn opcode
+      Files.copy(logFileBak, logFile);
+      corruptByteInFile(logFile, txOffset);
+      validation = EditLogFileInputStream.validateEditLog(logFile);
+      long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
+          NUM_TXNS : (NUM_TXNS + 1);
+      assertEquals("Failed when corrupting txn opcode at " + txOffset,
+          expectedEndTxId, validation.getEndTxId());
+      assertTrue(!validation.hasCorruptHeader());
+    }
+
+    // Truncate right before each edit and verify that validation continues
+    // to work
+    for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
+      long txOffset = entry.getKey();
+      long txId = entry.getValue();
+
+      // Restore backup, then truncate right before the txn opcode
+      Files.copy(logFileBak, logFile);
+      truncateFile(logFile, txOffset);
+      validation = EditLogFileInputStream.validateEditLog(logFile);
+      long expectedEndTxId = (txId == 0) ?
+          HdfsConstants.INVALID_TXID : (txId - 1);
+      assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
+          "at " + txOffset, expectedEndTxId, validation.getEndTxId());
+      assertTrue(!validation.hasCorruptHeader());
+    }
+  }
+
   @Test
   public void testValidateEmptyEditLog() throws IOException {
     File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java

@@ -32,6 +32,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.PriorityQueue;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,12 +71,13 @@ public class TestFileJournalManager {
   static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
       boolean inProgressOk, boolean abortOnGap) throws IOException {
     long numTransactions = 0, txId = fromTxId;
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams =
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
+    EditLogInputStream elis = null;
     try {
-      for (EditLogInputStream elis : allStreams) {
+      while ((elis = allStreams.poll()) != null) {
         elis.skipUntil(txId);
         while (true) {
           FSEditLogOp op = elis.readOp();
@@ -93,6 +95,7 @@ public class TestFileJournalManager {
       }
     } finally {
       IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+      IOUtils.cleanup(LOG, elis);
     }
     return numTransactions;
   }
@@ -387,27 +390,28 @@ public class TestFileJournalManager {
   private static EditLogInputStream getJournalInputStream(JournalManager jm,
       long txId, boolean inProgressOk) throws IOException {
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams =
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     jm.selectInputStreams(allStreams, txId, inProgressOk);
+    EditLogInputStream elis = null, ret;
     try {
-      for (Iterator<EditLogInputStream> iter = allStreams.iterator();
-          iter.hasNext();) {
-        EditLogInputStream elis = iter.next();
+      while ((elis = allStreams.poll()) != null) {
         if (elis.getFirstTxId() > txId) {
           break;
         }
         if (elis.getLastTxId() < txId) {
-          iter.remove();
           elis.close();
           continue;
         }
         elis.skipUntil(txId);
-        iter.remove();
-        return elis;
+        ret = elis;
+        elis = null;
+        return ret;
       }
     } finally {
       IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
+      IOUtils.cleanup(LOG, elis);
     }
     return null;
   }
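The ret/elis = null hand-off above is an ownership-transfer idiom: once a stream is destined for the caller, the local reference is nulled so that the finally block's IOUtils.cleanup(LOG, elis) (which ignores null arguments) cannot close the stream being returned, yet still closes it on every early-exit and exception path. The same idiom in miniature, with hypothetical acquire() and wanted() placeholders:

    Resource r = null;
    try {
      while ((r = acquire()) != null) {
        if (wanted(r)) {
          Resource ret = r;
          r = null;        // caller takes ownership; finally won't close it
          return ret;
        }
        r.close();         // not wanted; close it and keep looking
      }
      return null;
    } finally {
      if (r != null) {
        r.close();         // closes only if we still own a resource
      }
    }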