HDFS-3049. During the normal NN startup process, fall back on a different edit log if we see one that is corrupt. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1349114 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-06-12 04:15:18 +00:00
parent 543f86631b
commit 9947d8054c
11 changed files with 549 additions and 74 deletions

View File

@ -90,6 +90,9 @@ Trunk (unreleased changes)
HDFS-3040. TestMulitipleNNDataBlockScanner is misspelled. (Madhukara Phatak HDFS-3040. TestMulitipleNNDataBlockScanner is misspelled. (Madhukara Phatak
via atm) via atm)
HDFS-3049. During the normal NN startup process, fall back on a different
edit log if we see one that is corrupt (Colin Patrick McCabe via todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -1173,18 +1173,6 @@ public class FSEditLog {
throw e; throw e;
} }
} }
// This code will go away as soon as RedundantEditLogInputStream is
// introduced. (HDFS-3049)
try {
if (!streams.isEmpty()) {
streams.get(0).skipUntil(fromTxId);
}
} catch (IOException e) {
// We don't want to throw an exception from here, because that would make
// recovery impossible even if the user requested it. An exception will
// be thrown later, when we don't read the starting txid we expect.
LOG.error("error skipping until transaction " + fromTxId, e);
}
return streams; return streams;
} }

View File

@ -668,7 +668,9 @@ public class FSEditLogLoader {
FSImage.LOG.warn("Caught exception after reading " + numValid + FSImage.LOG.warn("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length." + " ops from " + in + " while determining its valid length." +
"Position was " + lastPos, t); "Position was " + lastPos, t);
break; in.resync();
FSImage.LOG.warn("After resync, position is " + in.getPosition());
continue;
} }
if (lastTxId == HdfsConstants.INVALID_TXID if (lastTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() > lastTxId) { || op.getTransactionId() > lastTxId) {

View File

@ -24,7 +24,9 @@ import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.PriorityQueue;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -40,7 +42,6 @@ import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps; import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
/** /**
* Manages a collection of Journals. None of the methods are synchronized, it is * Manages a collection of Journals. None of the methods are synchronized, it is
@ -222,8 +223,9 @@ public class JournalSet implements JournalManager {
@Override @Override
public void selectInputStreams(Collection<EditLogInputStream> streams, public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) { long fromTxId, boolean inProgressOk) {
final TreeMultiset<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR); new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (JournalAndStream jas : journals) { for (JournalAndStream jas : journals) {
if (jas.isDisabled()) { if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled"); LOG.info("Skipping jas " + jas + " since it's disabled");
@ -239,7 +241,8 @@ public class JournalSet implements JournalManager {
// transaction ID. // transaction ID.
LinkedList<EditLogInputStream> acc = LinkedList<EditLogInputStream> acc =
new LinkedList<EditLogInputStream>(); new LinkedList<EditLogInputStream>();
for (EditLogInputStream elis : allStreams) { EditLogInputStream elis;
while ((elis = allStreams.poll()) != null) {
if (acc.isEmpty()) { if (acc.isEmpty()) {
acc.add(elis); acc.add(elis);
} else { } else {
@ -247,7 +250,7 @@ public class JournalSet implements JournalManager {
if (accFirstTxId == elis.getFirstTxId()) { if (accFirstTxId == elis.getFirstTxId()) {
acc.add(elis); acc.add(elis);
} else if (accFirstTxId < elis.getFirstTxId()) { } else if (accFirstTxId < elis.getFirstTxId()) {
streams.add(acc.get(0)); streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear(); acc.clear();
acc.add(elis); acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) { } else if (accFirstTxId > elis.getFirstTxId()) {
@ -258,7 +261,7 @@ public class JournalSet implements JournalManager {
} }
} }
if (!acc.isEmpty()) { if (!acc.isEmpty()) {
streams.add(acc.get(0)); streams.add(new RedundantEditLogInputStream(acc, fromTxId));
acc.clear(); acc.clear();
} }
} }

View File

@ -0,0 +1,276 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
/**
* A merged input stream that handles failover between different edit logs.
*
* We will currently try each edit log stream exactly once. In other words, we
* don't handle the "ping pong" scenario where different edit logs contain a
* different subset of the available edits.
*/
class RedundantEditLogInputStream extends EditLogInputStream {
public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName());
private int curIdx;
private long prevTxId;
private final EditLogInputStream[] streams;
/**
* States that the RedundantEditLogInputStream can be in.
*
* <pre>
* start (if no streams)
* |
* V
* PrematureEOFException +----------------+
* +-------------->| EOF |<--------------+
* | +----------------+ |
* | |
* | start (if there are streams) |
* | | |
* | V | EOF
* | resync +----------------+ skipUntil +---------+
* | +---------->| SKIP_UNTIL |----------->| OK |
* | | +----------------+ +---------+
* | | | IOE ^ fail over to | IOE
* | | V | next stream |
* +----------------------+ +----------------+ |
* | STREAM_FAILED_RESYNC | | STREAM_FAILED |<----------+
* +----------------------+ +----------------+
* ^ Recovery mode |
* +--------------------+
* </pre>
*/
static private enum State {
/** We need to skip until prevTxId + 1 */
SKIP_UNTIL,
/** We're ready to read opcodes out of the current stream */
OK,
/** The current stream has failed. */
STREAM_FAILED,
/** The current stream has failed, and resync() was called. */
STREAM_FAILED_RESYNC,
/** There are no more opcodes to read from this
* RedundantEditLogInputStream */
EOF;
}
private State state;
private IOException prevException;
RedundantEditLogInputStream(Collection<EditLogInputStream> streams,
long startTxId) {
this.curIdx = 0;
this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ?
HdfsConstants.INVALID_TXID : (startTxId - 1);
this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL;
this.prevException = null;
// EditLogInputStreams in a RedundantEditLogInputStream must be finalized,
// and can't be pre-transactional.
EditLogInputStream first = null;
for (EditLogInputStream s : streams) {
Preconditions.checkArgument(s.getFirstTxId() !=
HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s);
Preconditions.checkArgument(s.getLastTxId() !=
HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s);
if (first == null) {
first = s;
} else {
Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(),
"All streams in the RedundantEditLogInputStream must have the same " +
"start transaction ID! " + first + " had start txId " +
first.getFirstTxId() + ", but " + s + " had start txId " +
s.getFirstTxId());
}
}
this.streams = streams.toArray(new EditLogInputStream[0]);
// We sort the streams here so that the streams that end later come first.
Arrays.sort(this.streams, new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return Longs.compare(b.getLastTxId(), a.getLastTxId());
}
});
}
@Override
public String getName() {
StringBuilder bld = new StringBuilder();
String prefix = "";
for (EditLogInputStream elis : streams) {
bld.append(prefix);
bld.append(elis.getName());
prefix = ", ";
}
return bld.toString();
}
@Override
public long getFirstTxId() {
return streams[curIdx].getFirstTxId();
}
@Override
public long getLastTxId() {
return streams[curIdx].getLastTxId();
}
@Override
public void close() throws IOException {
IOUtils.cleanup(LOG, streams);
}
@Override
protected FSEditLogOp nextValidOp() {
try {
if (state == State.STREAM_FAILED) {
state = State.STREAM_FAILED_RESYNC;
}
return nextOp();
} catch (IOException e) {
return null;
}
}
@Override
protected FSEditLogOp nextOp() throws IOException {
while (true) {
switch (state) {
case SKIP_UNTIL:
try {
if (prevTxId != HdfsConstants.INVALID_TXID) {
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
"' to transaction ID " + (prevTxId + 1));
streams[curIdx].skipUntil(prevTxId + 1);
}
} catch (IOException e) {
prevException = e;
state = State.STREAM_FAILED;
}
state = State.OK;
break;
case OK:
try {
FSEditLogOp op = streams[curIdx].readOp();
if (op == null) {
state = State.EOF;
if (streams[curIdx].getLastTxId() == prevTxId) {
return null;
} else {
throw new PrematureEOFException("got premature end-of-file " +
"at txid " + prevTxId + "; expected file to go up to " +
streams[curIdx].getLastTxId());
}
}
prevTxId = op.getTransactionId();
return op;
} catch (IOException e) {
prevException = e;
state = State.STREAM_FAILED;
}
break;
case STREAM_FAILED:
if (curIdx + 1 == streams.length) {
throw prevException;
}
long oldLast = streams[curIdx].getLastTxId();
long newLast = streams[curIdx + 1].getLastTxId();
if (newLast < oldLast) {
throw new IOException("We encountered an error reading " +
streams[curIdx].getName() + ". During automatic edit log " +
"failover, we noticed that all of the remaining edit log " +
"streams are shorter than the current one! The best " +
"remaining edit log ends at transaction " +
newLast + ", but we thought we could read up to transaction " +
oldLast + ". If you continue, metadata will be lost forever!");
}
LOG.error("Got error reading edit log input stream " +
streams[curIdx].getName() + "; failing over to edit log " +
streams[curIdx + 1].getName(), prevException);
curIdx++;
state = State.SKIP_UNTIL;
break;
case STREAM_FAILED_RESYNC:
if (curIdx + 1 == streams.length) {
if (prevException instanceof PrematureEOFException) {
// bypass early EOF check
state = State.EOF;
} else {
streams[curIdx].resync();
state = State.SKIP_UNTIL;
}
} else {
LOG.error("failing over to edit log " +
streams[curIdx + 1].getName());
curIdx++;
state = State.SKIP_UNTIL;
}
break;
case EOF:
return null;
}
}
}
@Override
public int getVersion() throws IOException {
return streams[curIdx].getVersion();
}
@Override
public long getPosition() {
return streams[curIdx].getPosition();
}
@Override
public long length() throws IOException {
return streams[curIdx].length();
}
@Override
public boolean isInProgress() {
return streams[curIdx].isInProgress();
}
static private final class PrematureEOFException extends IOException {
private static final long serialVersionUID = 1L;
PrematureEOFException(String msg) {
super(msg);
}
}
}

View File

@ -134,6 +134,7 @@ public class MiniDFSCluster {
private boolean format = true; private boolean format = true;
private boolean manageNameDfsDirs = true; private boolean manageNameDfsDirs = true;
private boolean manageNameDfsSharedDirs = true; private boolean manageNameDfsSharedDirs = true;
private boolean enableManagedDfsDirsRedundancy = true;
private boolean manageDataDfsDirs = true; private boolean manageDataDfsDirs = true;
private StartupOption option = null; private StartupOption option = null;
private String[] racks = null; private String[] racks = null;
@ -196,6 +197,14 @@ public class MiniDFSCluster {
return this; return this;
} }
/**
* Default: true
*/
public Builder enableManagedDfsDirsRedundancy(boolean val) {
this.enableManagedDfsDirsRedundancy = val;
return this;
}
/** /**
* Default: true * Default: true
*/ */
@ -298,6 +307,7 @@ public class MiniDFSCluster {
builder.format, builder.format,
builder.manageNameDfsDirs, builder.manageNameDfsDirs,
builder.manageNameDfsSharedDirs, builder.manageNameDfsSharedDirs,
builder.enableManagedDfsDirsRedundancy,
builder.manageDataDfsDirs, builder.manageDataDfsDirs,
builder.option, builder.option,
builder.racks, builder.racks,
@ -385,7 +395,7 @@ public class MiniDFSCluster {
public MiniDFSCluster(Configuration conf, public MiniDFSCluster(Configuration conf,
int numDataNodes, int numDataNodes,
StartupOption nameNodeOperation) throws IOException { StartupOption nameNodeOperation) throws IOException {
this(0, conf, numDataNodes, false, false, false, nameNodeOperation, this(0, conf, numDataNodes, false, false, false, false, nameNodeOperation,
null, null, null); null, null, null);
} }
@ -407,7 +417,8 @@ public class MiniDFSCluster {
int numDataNodes, int numDataNodes,
boolean format, boolean format,
String[] racks) throws IOException { String[] racks) throws IOException {
this(0, conf, numDataNodes, format, true, true, null, racks, null, null); this(0, conf, numDataNodes, format, true, true, true, null,
racks, null, null);
} }
/** /**
@ -429,7 +440,8 @@ public class MiniDFSCluster {
int numDataNodes, int numDataNodes,
boolean format, boolean format,
String[] racks, String[] hosts) throws IOException { String[] racks, String[] hosts) throws IOException {
this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null); this(0, conf, numDataNodes, format, true, true, true, null,
racks, hosts, null);
} }
/** /**
@ -462,8 +474,8 @@ public class MiniDFSCluster {
boolean manageDfsDirs, boolean manageDfsDirs,
StartupOption operation, StartupOption operation,
String[] racks) throws IOException { String[] racks) throws IOException {
this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, this(nameNodePort, conf, numDataNodes, format, manageDfsDirs,
operation, racks, null, null); manageDfsDirs, manageDfsDirs, operation, racks, null, null);
} }
/** /**
@ -497,7 +509,7 @@ public class MiniDFSCluster {
String[] racks, String[] racks,
long[] simulatedCapacities) throws IOException { long[] simulatedCapacities) throws IOException {
this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
operation, racks, null, simulatedCapacities); manageDfsDirs, operation, racks, null, simulatedCapacities);
} }
/** /**
@ -531,13 +543,15 @@ public class MiniDFSCluster {
int numDataNodes, int numDataNodes,
boolean format, boolean format,
boolean manageNameDfsDirs, boolean manageNameDfsDirs,
boolean enableManagedDfsDirsRedundancy,
boolean manageDataDfsDirs, boolean manageDataDfsDirs,
StartupOption operation, StartupOption operation,
String[] racks, String hosts[], String[] racks, String hosts[],
long[] simulatedCapacities) throws IOException { long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(conf, numDataNodes, format, initMiniDFSCluster(conf, numDataNodes, format,
manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts, manageNameDfsDirs, true, enableManagedDfsDirsRedundancy, manageDataDfsDirs,
operation, racks, hosts,
simulatedCapacities, null, true, false, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0)); MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0));
} }
@ -545,8 +559,8 @@ public class MiniDFSCluster {
private void initMiniDFSCluster( private void initMiniDFSCluster(
Configuration conf, Configuration conf,
int numDataNodes, boolean format, boolean manageNameDfsDirs, int numDataNodes, boolean format, boolean manageNameDfsDirs,
boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs, boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
StartupOption operation, String[] racks, boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId, String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile, boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology) MiniDFSNNTopology nnTopology)
@ -586,6 +600,7 @@ public class MiniDFSCluster {
federation = nnTopology.isFederated(); federation = nnTopology.isFederated();
createNameNodesAndSetConf( createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs, nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
enableManagedDfsDirsRedundancy,
format, operation, clusterId, conf); format, operation, clusterId, conf);
if (format) { if (format) {
@ -608,7 +623,8 @@ public class MiniDFSCluster {
private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
boolean format, StartupOption operation, String clusterId, boolean enableManagedDfsDirsRedundancy, boolean format,
StartupOption operation, String clusterId,
Configuration conf) throws IOException { Configuration conf) throws IOException {
Preconditions.checkArgument(nnTopology.countNameNodes() > 0, Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
"empty NN topology: no namenodes specified!"); "empty NN topology: no namenodes specified!");
@ -664,7 +680,7 @@ public class MiniDFSCluster {
Collection<URI> prevNNDirs = null; Collection<URI> prevNNDirs = null;
int nnCounterForFormat = nnCounter; int nnCounterForFormat = nnCounter;
for (NNConf nn : nameservice.getNNs()) { for (NNConf nn : nameservice.getNNs()) {
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs,
nnCounterForFormat); nnCounterForFormat);
Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf); Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
if (format) { if (format) {
@ -696,7 +712,8 @@ public class MiniDFSCluster {
// Start all Namenodes // Start all Namenodes
for (NNConf nn : nameservice.getNNs()) { for (NNConf nn : nameservice.getNNs()) {
initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter); initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
enableManagedDfsDirsRedundancy, nnCounter);
createNameNode(nnCounter++, conf, numDataNodes, false, operation, createNameNode(nnCounter++, conf, numDataNodes, false, operation,
clusterId, nsId, nn.getNnId()); clusterId, nsId, nn.getNnId());
} }
@ -721,8 +738,8 @@ public class MiniDFSCluster {
private void initNameNodeConf(Configuration conf, private void initNameNodeConf(Configuration conf,
String nameserviceId, String nnId, String nameserviceId, String nnId,
boolean manageNameDfsDirs, int nnIndex) boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
throws IOException { int nnIndex) throws IOException {
if (nameserviceId != null) { if (nameserviceId != null) {
conf.set(DFS_NAMESERVICE_ID, nameserviceId); conf.set(DFS_NAMESERVICE_ID, nameserviceId);
} }
@ -731,12 +748,21 @@ public class MiniDFSCluster {
} }
if (manageNameDfsDirs) { if (manageNameDfsDirs) {
if (enableManagedDfsDirsRedundancy) {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, conf.set(DFS_NAMENODE_NAME_DIR_KEY,
fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+ fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2)))); fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+ fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2)))); fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
} else {
conf.set(DFS_NAMENODE_NAME_DIR_KEY,
fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
toString());
conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
toString());
}
} }
} }
@ -2134,7 +2160,7 @@ public class MiniDFSCluster {
String nnId = null; String nnId = null;
initNameNodeAddress(conf, nameserviceId, initNameNodeAddress(conf, nameserviceId,
new NNConf(nnId).setIpcPort(namenodePort)); new NNConf(nnId).setIpcPort(namenodePort));
initNameNodeConf(conf, nameserviceId, nnId, true, nnIndex); initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
createNameNode(nnIndex, conf, numDataNodes, true, null, null, createNameNode(nnIndex, conf, numDataNodes, true, null, null,
nameserviceId, nnId); nameserviceId, nnId);

View File

@ -506,21 +506,29 @@ public class TestEditLog extends TestCase {
FSImage fsimage = namesystem.getFSImage(); FSImage fsimage = namesystem.getFSImage();
final FSEditLog editLog = fsimage.getEditLog(); final FSEditLog editLog = fsimage.getEditLog();
fileSys.mkdirs(new Path("/tmp")); fileSys.mkdirs(new Path("/tmp"));
StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
Iterator<StorageDirectory> iter = fsimage.getStorage().
dirIterator(NameNodeDirType.EDITS);
LinkedList<StorageDirectory> sds = new LinkedList<StorageDirectory>();
while (iter.hasNext()) {
sds.add(iter.next());
}
editLog.close(); editLog.close();
cluster.shutdown(); cluster.shutdown();
for (StorageDirectory sd : sds) {
File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3); File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3);
assertTrue(editFile.exists()); assertTrue(editFile.exists());
long fileLen = editFile.length(); long fileLen = editFile.length();
System.out.println("File name: " + editFile + " len: " + fileLen); LOG.debug("Corrupting Log File: " + editFile + " len: " + fileLen);
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw"); RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.seek(fileLen-4); // seek to checksum bytes rwf.seek(fileLen-4); // seek to checksum bytes
int b = rwf.readInt(); int b = rwf.readInt();
rwf.seek(fileLen-4); rwf.seek(fileLen-4);
rwf.writeInt(b+1); rwf.writeInt(b+1);
rwf.close(); rwf.close();
}
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
@ -1232,6 +1240,113 @@ public class TestEditLog extends TestCase {
} }
} }
private static long readAllEdits(Collection<EditLogInputStream> streams,
long startTxId) throws IOException {
FSEditLogOp op;
long nextTxId = startTxId;
long numTx = 0;
for (EditLogInputStream s : streams) {
while (true) {
op = s.readOp();
if (op == null)
break;
if (op.getTransactionId() != nextTxId) {
throw new IOException("out of order transaction ID! expected " +
nextTxId + " but got " + op.getTransactionId() + " when " +
"reading " + s.getName());
}
numTx++;
nextTxId = op.getTransactionId() + 1;
}
}
return numTx;
}
/**
* Test edit log failover. If a single edit log is missing, other
* edits logs should be used instead.
*/
@Test
public void testEditLogFailOverFromMissing() throws IOException {
File f1 = new File(TEST_DIR + "/failover0");
File f2 = new File(TEST_DIR + "/failover1");
List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
NNStorage storage = setupEdits(editUris, 3);
final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
final long endErrorTxId = 2*TXNS_PER_ROLL;
File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
endErrorTxId))) {
return true;
}
return false;
}
});
assertEquals(1, files.length);
assertTrue(files[0].delete());
FSEditLog editlog = getFSEditLog(storage);
editlog.initJournalsForWrite();
long startTxId = 1;
try {
readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
startTxId);
} catch (IOException e) {
LOG.error("edit log failover didn't work", e);
fail("Edit log failover didn't work");
}
}
/**
* Test edit log failover from a corrupt edit log
*/
@Test
public void testEditLogFailOverFromCorrupt() throws IOException {
File f1 = new File(TEST_DIR + "/failover0");
File f2 = new File(TEST_DIR + "/failover1");
List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());
NNStorage storage = setupEdits(editUris, 3);
final long startErrorTxId = 1*TXNS_PER_ROLL + 1;
final long endErrorTxId = 2*TXNS_PER_ROLL;
File[] files = new File(f1, "current").listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId,
endErrorTxId))) {
return true;
}
return false;
}
});
assertEquals(1, files.length);
long fileLen = files[0].length();
LOG.debug("Corrupting Log File: " + files[0] + " len: " + fileLen);
RandomAccessFile rwf = new RandomAccessFile(files[0], "rw");
rwf.seek(fileLen-4); // seek to checksum bytes
int b = rwf.readInt();
rwf.seek(fileLen-4);
rwf.writeInt(b+1);
rwf.close();
FSEditLog editlog = getFSEditLog(storage);
editlog.initJournalsForWrite();
long startTxId = 1;
try {
readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL),
startTxId);
} catch (IOException e) {
LOG.error("edit log failover didn't work", e);
fail("Edit log failover didn't work");
}
}
/** /**
* Test creating a directory with lots and lots of edit log segments * Test creating a directory with lots and lots of edit log segments
*/ */

View File

@ -50,6 +50,16 @@ public class TestEditLogFileOutputStream {
TEST_EDITS.delete(); TEST_EDITS.delete();
} }
@Test
public void testConstants() {
// Each call to FSEditLogOp#Reader#readOp can read at most MAX_OP_SIZE bytes
// before getting an exception. So we don't want to preallocate a longer
// region than MAX_OP_SIZE, because then we'd get an IOException when reading
// through the padding at the end of the file.
assertTrue(EditLogFileOutputStream.PREALLOCATION_LENGTH <
FSEditLogOp.MAX_OP_SIZE);
}
@Test @Test
public void testPreallocation() throws IOException { public void testPreallocation() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();

View File

@ -77,7 +77,7 @@ public class TestFSEditLogLoader {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
FileSystem fileSys = null; FileSystem fileSys = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
.build(); .enableManagedDfsDirsRedundancy(false).build();
cluster.waitActive(); cluster.waitActive();
fileSys = cluster.getFileSystem(); fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem(); final FSNamesystem namesystem = cluster.getNamesystem();
@ -107,7 +107,7 @@ public class TestFSEditLogLoader {
bld.append("Recent opcode offsets: (\\d+\\s*){4}$"); bld.append("Recent opcode offsets: (\\d+\\s*){4}$");
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
.format(false).build(); .enableManagedDfsDirsRedundancy(false).format(false).build();
fail("should not be able to start"); fail("should not be able to start");
} catch (IOException e) { } catch (IOException e) {
assertTrue("error message contains opcodes message", assertTrue("error message contains opcodes message",
@ -326,6 +326,56 @@ public class TestFSEditLogLoader {
assertTrue(validation.hasCorruptHeader()); assertTrue(validation.hasCorruptHeader());
} }
@Test
public void testValidateEditLogWithCorruptBody() throws IOException {
File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody");
SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
final int NUM_TXNS = 20;
File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS,
offsetToTxId);
// Back up the uncorrupted log
File logFileBak = new File(testDir, logFile.getName() + ".bak");
Files.copy(logFile, logFileBak);
EditLogValidation validation =
EditLogFileInputStream.validateEditLog(logFile);
assertTrue(!validation.hasCorruptHeader());
// We expect that there will be an OP_START_LOG_SEGMENT, followed by
// NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
assertEquals(NUM_TXNS + 1, validation.getEndTxId());
// Corrupt each edit and verify that validation continues to work
for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
long txOffset = entry.getKey();
long txId = entry.getValue();
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
NUM_TXNS : (NUM_TXNS + 1);
assertEquals("Failed when corrupting txn opcode at " + txOffset,
expectedEndTxId, validation.getEndTxId());
assertTrue(!validation.hasCorruptHeader());
}
// Truncate right before each edit and verify that validation continues
// to work
for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
long txOffset = entry.getKey();
long txId = entry.getValue();
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
long expectedEndTxId = (txId == 0) ?
HdfsConstants.INVALID_TXID : (txId - 1);
assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
"at " + txOffset, expectedEndTxId, validation.getEndTxId());
assertTrue(!validation.hasCorruptHeader());
}
}
@Test @Test
public void testValidateEmptyEditLog() throws IOException { public void testValidateEmptyEditLog() throws IOException {
File testDir = new File(TEST_DIR, "testValidateEmptyEditLog"); File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");

View File

@ -20,10 +20,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Iterator; import java.util.Iterator;
import java.util.PriorityQueue;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.io.File; import java.io.File;
@ -33,7 +33,6 @@ import org.junit.Test;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@ -45,7 +44,6 @@ import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.TreeMultiset;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
public class TestFileJournalManager { public class TestFileJournalManager {
@ -64,12 +62,13 @@ public class TestFileJournalManager {
static long getNumberOfTransactions(FileJournalManager jm, long fromTxId, static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
boolean inProgressOk, boolean abortOnGap) throws IOException { boolean inProgressOk, boolean abortOnGap) throws IOException {
long numTransactions = 0, txId = fromTxId; long numTransactions = 0, txId = fromTxId;
final TreeMultiset<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk); jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
EditLogInputStream elis = null;
try { try {
for (EditLogInputStream elis : allStreams) { while ((elis = allStreams.poll()) != null) {
elis.skipUntil(txId); elis.skipUntil(txId);
while (true) { while (true) {
FSEditLogOp op = elis.readOp(); FSEditLogOp op = elis.readOp();
@ -87,6 +86,7 @@ public class TestFileJournalManager {
} }
} finally { } finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
IOUtils.cleanup(LOG, elis);
} }
return numTransactions; return numTransactions;
} }
@ -379,27 +379,28 @@ public class TestFileJournalManager {
private static EditLogInputStream getJournalInputStream(JournalManager jm, private static EditLogInputStream getJournalInputStream(JournalManager jm,
long txId, boolean inProgressOk) throws IOException { long txId, boolean inProgressOk) throws IOException {
final TreeMultiset<EditLogInputStream> allStreams = final PriorityQueue<EditLogInputStream> allStreams =
TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, txId, inProgressOk); jm.selectInputStreams(allStreams, txId, inProgressOk);
EditLogInputStream elis = null, ret;
try { try {
for (Iterator<EditLogInputStream> iter = allStreams.iterator(); while ((elis = allStreams.poll()) != null) {
iter.hasNext();) {
EditLogInputStream elis = iter.next();
if (elis.getFirstTxId() > txId) { if (elis.getFirstTxId() > txId) {
break; break;
} }
if (elis.getLastTxId() < txId) { if (elis.getLastTxId() < txId) {
iter.remove();
elis.close(); elis.close();
continue; continue;
} }
elis.skipUntil(txId); elis.skipUntil(txId);
iter.remove(); ret = elis;
return elis; elis = null;
return ret;
} }
} finally { } finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
IOUtils.cleanup(LOG, elis);
} }
return null; return null;
} }

View File

@ -343,7 +343,7 @@ public class TestNameNodeRecovery {
StorageDirectory sd = null; StorageDirectory sd = null;
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build(); .enableManagedDfsDirsRedundancy(false).build();
cluster.waitActive(); cluster.waitActive();
if (!finalize) { if (!finalize) {
// Normally, the in-progress edit log would be finalized by // Normally, the in-progress edit log would be finalized by
@ -379,7 +379,7 @@ public class TestNameNodeRecovery {
try { try {
LOG.debug("trying to start normally (this should fail)..."); LOG.debug("trying to start normally (this should fail)...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build(); .enableManagedDfsDirsRedundancy(false).format(false).build();
cluster.waitActive(); cluster.waitActive();
cluster.shutdown(); cluster.shutdown();
if (needRecovery) { if (needRecovery) {
@ -404,7 +404,8 @@ public class TestNameNodeRecovery {
try { try {
LOG.debug("running recovery..."); LOG.debug("running recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).startupOption(recoverStartOpt).build(); .enableManagedDfsDirsRedundancy(false).format(false)
.startupOption(recoverStartOpt).build();
} catch (IOException e) { } catch (IOException e) {
fail("caught IOException while trying to recover. " + fail("caught IOException while trying to recover. " +
"message was " + e.getMessage() + "message was " + e.getMessage() +
@ -420,7 +421,7 @@ public class TestNameNodeRecovery {
try { try {
LOG.debug("starting cluster normally after recovery..."); LOG.debug("starting cluster normally after recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build(); .enableManagedDfsDirsRedundancy(false).format(false).build();
LOG.debug("successfully recovered the " + corruptor.getName() + LOG.debug("successfully recovered the " + corruptor.getName() +
" corrupted edit log"); " corrupted edit log");
cluster.waitActive(); cluster.waitActive();