diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d13a247dd9e..7b27c52afc1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -90,6 +90,9 @@ Trunk (unreleased changes) HDFS-3040. TestMulitipleNNDataBlockScanner is misspelled. (Madhukara Phatak via atm) + HDFS-3049. During the normal NN startup process, fall back on a different + edit log if we see one that is corrupt (Colin Patrick McCabe via todd) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index ff38c9594d9..c0e8ff0679a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -1173,18 +1173,6 @@ public synchronized Collection selectInputStreams( throw e; } } - // This code will go away as soon as RedundantEditLogInputStream is - // introduced. (HDFS-3049) - try { - if (!streams.isEmpty()) { - streams.get(0).skipUntil(fromTxId); - } - } catch (IOException e) { - // We don't want to throw an exception from here, because that would make - // recovery impossible even if the user requested it. An exception will - // be thrown later, when we don't read the starting txid we expect. - LOG.error("error skipping until transaction " + fromTxId, e); - } return streams; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 343472dfa12..e1b1eccaa23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -668,7 +668,9 @@ static EditLogValidation validateEditLog(EditLogInputStream in) { FSImage.LOG.warn("Caught exception after reading " + numValid + " ops from " + in + " while determining its valid length." + "Position was " + lastPos, t); - break; + in.resync(); + FSImage.LOG.warn("After resync, position is " + in.getPosition()); + continue; } if (lastTxId == HdfsConstants.INVALID_TXID || op.getTransactionId() > lastTxId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 3d2e23bf2d2..92c3e523818 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -24,7 +24,9 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import java.util.PriorityQueue; import java.util.SortedSet; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,7 +42,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultiset; /** * Manages a collection of Journals. None of the methods are synchronized, it is @@ -222,8 +223,9 @@ public void apply(JournalAndStream jas) throws IOException { @Override public void selectInputStreams(Collection streams, long fromTxId, boolean inProgressOk) { - final TreeMultiset allStreams = - TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR); + final PriorityQueue allStreams = + new PriorityQueue(64, + EDIT_LOG_INPUT_STREAM_COMPARATOR); for (JournalAndStream jas : journals) { if (jas.isDisabled()) { LOG.info("Skipping jas " + jas + " since it's disabled"); @@ -239,7 +241,8 @@ public void selectInputStreams(Collection streams, // transaction ID. LinkedList acc = new LinkedList(); - for (EditLogInputStream elis : allStreams) { + EditLogInputStream elis; + while ((elis = allStreams.poll()) != null) { if (acc.isEmpty()) { acc.add(elis); } else { @@ -247,7 +250,7 @@ public void selectInputStreams(Collection streams, if (accFirstTxId == elis.getFirstTxId()) { acc.add(elis); } else if (accFirstTxId < elis.getFirstTxId()) { - streams.add(acc.get(0)); + streams.add(new RedundantEditLogInputStream(acc, fromTxId)); acc.clear(); acc.add(elis); } else if (accFirstTxId > elis.getFirstTxId()) { @@ -258,7 +261,7 @@ public void selectInputStreams(Collection streams, } } if (!acc.isEmpty()) { - streams.add(acc.get(0)); + streams.add(new RedundantEditLogInputStream(acc, fromTxId)); acc.clear(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java new file mode 100644 index 00000000000..7a30869290e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; + +/** + * A merged input stream that handles failover between different edit logs. + * + * We will currently try each edit log stream exactly once. In other words, we + * don't handle the "ping pong" scenario where different edit logs contain a + * different subset of the available edits. + */ +class RedundantEditLogInputStream extends EditLogInputStream { + public static final Log LOG = LogFactory.getLog(EditLogInputStream.class.getName()); + private int curIdx; + private long prevTxId; + private final EditLogInputStream[] streams; + + /** + * States that the RedundantEditLogInputStream can be in. + * + *
+   *                   start (if no streams)
+   *                           |
+   *                           V
+   * PrematureEOFException  +----------------+
+   *        +-------------->| EOF            |<--------------+
+   *        |               +----------------+               |
+   *        |                                                |
+   *        |          start (if there are streams)          |
+   *        |                  |                             |
+   *        |                  V                             | EOF
+   *        |   resync      +----------------+ skipUntil  +---------+
+   *        |   +---------->| SKIP_UNTIL     |----------->|  OK     |
+   *        |   |           +----------------+            +---------+
+   *        |   |                | IOE   ^ fail over to      | IOE
+   *        |   |                V       | next stream       |
+   * +----------------------+   +----------------+           |
+   * | STREAM_FAILED_RESYNC |   | STREAM_FAILED  |<----------+
+   * +----------------------+   +----------------+
+   *                  ^   Recovery mode    |
+   *                  +--------------------+
+   * 
+ */ + static private enum State { + /** We need to skip until prevTxId + 1 */ + SKIP_UNTIL, + /** We're ready to read opcodes out of the current stream */ + OK, + /** The current stream has failed. */ + STREAM_FAILED, + /** The current stream has failed, and resync() was called. */ + STREAM_FAILED_RESYNC, + /** There are no more opcodes to read from this + * RedundantEditLogInputStream */ + EOF; + } + + private State state; + private IOException prevException; + + RedundantEditLogInputStream(Collection streams, + long startTxId) { + this.curIdx = 0; + this.prevTxId = (startTxId == HdfsConstants.INVALID_TXID) ? + HdfsConstants.INVALID_TXID : (startTxId - 1); + this.state = (streams.isEmpty()) ? State.EOF : State.SKIP_UNTIL; + this.prevException = null; + // EditLogInputStreams in a RedundantEditLogInputStream must be finalized, + // and can't be pre-transactional. + EditLogInputStream first = null; + for (EditLogInputStream s : streams) { + Preconditions.checkArgument(s.getFirstTxId() != + HdfsConstants.INVALID_TXID, "invalid first txid in stream: %s", s); + Preconditions.checkArgument(s.getLastTxId() != + HdfsConstants.INVALID_TXID, "invalid last txid in stream: %s", s); + if (first == null) { + first = s; + } else { + Preconditions.checkArgument(s.getFirstTxId() == first.getFirstTxId(), + "All streams in the RedundantEditLogInputStream must have the same " + + "start transaction ID! " + first + " had start txId " + + first.getFirstTxId() + ", but " + s + " had start txId " + + s.getFirstTxId()); + } + } + + this.streams = streams.toArray(new EditLogInputStream[0]); + + // We sort the streams here so that the streams that end later come first. + Arrays.sort(this.streams, new Comparator() { + @Override + public int compare(EditLogInputStream a, EditLogInputStream b) { + return Longs.compare(b.getLastTxId(), a.getLastTxId()); + } + }); + } + + @Override + public String getName() { + StringBuilder bld = new StringBuilder(); + String prefix = ""; + for (EditLogInputStream elis : streams) { + bld.append(prefix); + bld.append(elis.getName()); + prefix = ", "; + } + return bld.toString(); + } + + @Override + public long getFirstTxId() { + return streams[curIdx].getFirstTxId(); + } + + @Override + public long getLastTxId() { + return streams[curIdx].getLastTxId(); + } + + @Override + public void close() throws IOException { + IOUtils.cleanup(LOG, streams); + } + + @Override + protected FSEditLogOp nextValidOp() { + try { + if (state == State.STREAM_FAILED) { + state = State.STREAM_FAILED_RESYNC; + } + return nextOp(); + } catch (IOException e) { + return null; + } + } + + @Override + protected FSEditLogOp nextOp() throws IOException { + while (true) { + switch (state) { + case SKIP_UNTIL: + try { + if (prevTxId != HdfsConstants.INVALID_TXID) { + LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() + + "' to transaction ID " + (prevTxId + 1)); + streams[curIdx].skipUntil(prevTxId + 1); + } + } catch (IOException e) { + prevException = e; + state = State.STREAM_FAILED; + } + state = State.OK; + break; + case OK: + try { + FSEditLogOp op = streams[curIdx].readOp(); + if (op == null) { + state = State.EOF; + if (streams[curIdx].getLastTxId() == prevTxId) { + return null; + } else { + throw new PrematureEOFException("got premature end-of-file " + + "at txid " + prevTxId + "; expected file to go up to " + + streams[curIdx].getLastTxId()); + } + } + prevTxId = op.getTransactionId(); + return op; + } catch (IOException e) { + prevException = e; + state = State.STREAM_FAILED; + } + break; + case STREAM_FAILED: + if (curIdx + 1 == streams.length) { + throw prevException; + } + long oldLast = streams[curIdx].getLastTxId(); + long newLast = streams[curIdx + 1].getLastTxId(); + if (newLast < oldLast) { + throw new IOException("We encountered an error reading " + + streams[curIdx].getName() + ". During automatic edit log " + + "failover, we noticed that all of the remaining edit log " + + "streams are shorter than the current one! The best " + + "remaining edit log ends at transaction " + + newLast + ", but we thought we could read up to transaction " + + oldLast + ". If you continue, metadata will be lost forever!"); + } + LOG.error("Got error reading edit log input stream " + + streams[curIdx].getName() + "; failing over to edit log " + + streams[curIdx + 1].getName(), prevException); + curIdx++; + state = State.SKIP_UNTIL; + break; + case STREAM_FAILED_RESYNC: + if (curIdx + 1 == streams.length) { + if (prevException instanceof PrematureEOFException) { + // bypass early EOF check + state = State.EOF; + } else { + streams[curIdx].resync(); + state = State.SKIP_UNTIL; + } + } else { + LOG.error("failing over to edit log " + + streams[curIdx + 1].getName()); + curIdx++; + state = State.SKIP_UNTIL; + } + break; + case EOF: + return null; + } + } + } + + @Override + public int getVersion() throws IOException { + return streams[curIdx].getVersion(); + } + + @Override + public long getPosition() { + return streams[curIdx].getPosition(); + } + + @Override + public long length() throws IOException { + return streams[curIdx].length(); + } + + @Override + public boolean isInProgress() { + return streams[curIdx].isInProgress(); + } + + static private final class PrematureEOFException extends IOException { + private static final long serialVersionUID = 1L; + PrematureEOFException(String msg) { + super(msg); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index d57f792472e..5c9c4659978 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -134,6 +134,7 @@ public static class Builder { private boolean format = true; private boolean manageNameDfsDirs = true; private boolean manageNameDfsSharedDirs = true; + private boolean enableManagedDfsDirsRedundancy = true; private boolean manageDataDfsDirs = true; private StartupOption option = null; private String[] racks = null; @@ -187,7 +188,7 @@ public Builder manageNameDfsDirs(boolean val) { this.manageNameDfsDirs = val; return this; } - + /** * Default: true */ @@ -196,6 +197,14 @@ public Builder manageNameDfsSharedDirs(boolean val) { return this; } + /** + * Default: true + */ + public Builder enableManagedDfsDirsRedundancy(boolean val) { + this.enableManagedDfsDirsRedundancy = val; + return this; + } + /** * Default: true */ @@ -298,6 +307,7 @@ private MiniDFSCluster(Builder builder) throws IOException { builder.format, builder.manageNameDfsDirs, builder.manageNameDfsSharedDirs, + builder.enableManagedDfsDirsRedundancy, builder.manageDataDfsDirs, builder.option, builder.racks, @@ -385,7 +395,7 @@ public MiniDFSCluster() { public MiniDFSCluster(Configuration conf, int numDataNodes, StartupOption nameNodeOperation) throws IOException { - this(0, conf, numDataNodes, false, false, false, nameNodeOperation, + this(0, conf, numDataNodes, false, false, false, false, nameNodeOperation, null, null, null); } @@ -407,7 +417,8 @@ public MiniDFSCluster(Configuration conf, int numDataNodes, boolean format, String[] racks) throws IOException { - this(0, conf, numDataNodes, format, true, true, null, racks, null, null); + this(0, conf, numDataNodes, format, true, true, true, null, + racks, null, null); } /** @@ -429,7 +440,8 @@ public MiniDFSCluster(Configuration conf, int numDataNodes, boolean format, String[] racks, String[] hosts) throws IOException { - this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null); + this(0, conf, numDataNodes, format, true, true, true, null, + racks, hosts, null); } /** @@ -462,8 +474,8 @@ public MiniDFSCluster(int nameNodePort, boolean manageDfsDirs, StartupOption operation, String[] racks) throws IOException { - this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, - operation, racks, null, null); + this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, + manageDfsDirs, manageDfsDirs, operation, racks, null, null); } /** @@ -497,7 +509,7 @@ public MiniDFSCluster(int nameNodePort, String[] racks, long[] simulatedCapacities) throws IOException { this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs, - operation, racks, null, simulatedCapacities); + manageDfsDirs, operation, racks, null, simulatedCapacities); } /** @@ -531,13 +543,15 @@ public MiniDFSCluster(int nameNodePort, int numDataNodes, boolean format, boolean manageNameDfsDirs, + boolean enableManagedDfsDirsRedundancy, boolean manageDataDfsDirs, StartupOption operation, String[] racks, String hosts[], long[] simulatedCapacities) throws IOException { this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster initMiniDFSCluster(conf, numDataNodes, format, - manageNameDfsDirs, true, manageDataDfsDirs, operation, racks, hosts, + manageNameDfsDirs, true, enableManagedDfsDirsRedundancy, manageDataDfsDirs, + operation, racks, hosts, simulatedCapacities, null, true, false, MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0)); } @@ -545,8 +559,8 @@ public MiniDFSCluster(int nameNodePort, private void initMiniDFSCluster( Configuration conf, int numDataNodes, boolean format, boolean manageNameDfsDirs, - boolean manageNameDfsSharedDirs, boolean manageDataDfsDirs, - StartupOption operation, String[] racks, + boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy, + boolean manageDataDfsDirs, StartupOption operation, String[] racks, String[] hosts, long[] simulatedCapacities, String clusterId, boolean waitSafeMode, boolean setupHostsFile, MiniDFSNNTopology nnTopology) @@ -586,6 +600,7 @@ private void initMiniDFSCluster( federation = nnTopology.isFederated(); createNameNodesAndSetConf( nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs, + enableManagedDfsDirsRedundancy, format, operation, clusterId, conf); if (format) { @@ -608,7 +623,8 @@ private void initMiniDFSCluster( private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs, - boolean format, StartupOption operation, String clusterId, + boolean enableManagedDfsDirsRedundancy, boolean format, + StartupOption operation, String clusterId, Configuration conf) throws IOException { Preconditions.checkArgument(nnTopology.countNameNodes() > 0, "empty NN topology: no namenodes specified!"); @@ -664,7 +680,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, Collection prevNNDirs = null; int nnCounterForFormat = nnCounter; for (NNConf nn : nameservice.getNNs()) { - initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, + initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, manageNameDfsDirs, nnCounterForFormat); Collection namespaceDirs = FSNamesystem.getNamespaceDirs(conf); if (format) { @@ -696,7 +712,8 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, // Start all Namenodes for (NNConf nn : nameservice.getNNs()) { - initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, nnCounter); + initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs, + enableManagedDfsDirsRedundancy, nnCounter); createNameNode(nnCounter++, conf, numDataNodes, false, operation, clusterId, nsId, nn.getNnId()); } @@ -721,8 +738,8 @@ public NameNodeInfo[] getNameNodeInfos() { private void initNameNodeConf(Configuration conf, String nameserviceId, String nnId, - boolean manageNameDfsDirs, int nnIndex) - throws IOException { + boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, + int nnIndex) throws IOException { if (nameserviceId != null) { conf.set(DFS_NAMESERVICE_ID, nameserviceId); } @@ -731,12 +748,21 @@ private void initNameNodeConf(Configuration conf, } if (manageNameDfsDirs) { - conf.set(DFS_NAMENODE_NAME_DIR_KEY, - fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+ - fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2)))); - conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, - fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+ - fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2)))); + if (enableManagedDfsDirsRedundancy) { + conf.set(DFS_NAMENODE_NAME_DIR_KEY, + fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+ + fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2)))); + conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, + fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+ + fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2)))); + } else { + conf.set(DFS_NAMENODE_NAME_DIR_KEY, + fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))). + toString()); + conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, + fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))). + toString()); + } } } @@ -2134,7 +2160,7 @@ public NameNode addNameNode(Configuration conf, int namenodePort) String nnId = null; initNameNodeAddress(conf, nameserviceId, new NNConf(nnId).setIpcPort(namenodePort)); - initNameNodeConf(conf, nameserviceId, nnId, true, nnIndex); + initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex); createNameNode(nnIndex, conf, numDataNodes, true, null, null, nameserviceId, nnId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 5e77d730b6f..ee037cb2d32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -506,21 +506,29 @@ public void testEditChecksum() throws Exception { FSImage fsimage = namesystem.getFSImage(); final FSEditLog editLog = fsimage.getEditLog(); fileSys.mkdirs(new Path("/tmp")); - StorageDirectory sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next(); + + Iterator iter = fsimage.getStorage(). + dirIterator(NameNodeDirType.EDITS); + LinkedList sds = new LinkedList(); + while (iter.hasNext()) { + sds.add(iter.next()); + } editLog.close(); cluster.shutdown(); - File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3); - assertTrue(editFile.exists()); - - long fileLen = editFile.length(); - System.out.println("File name: " + editFile + " len: " + fileLen); - RandomAccessFile rwf = new RandomAccessFile(editFile, "rw"); - rwf.seek(fileLen-4); // seek to checksum bytes - int b = rwf.readInt(); - rwf.seek(fileLen-4); - rwf.writeInt(b+1); - rwf.close(); + for (StorageDirectory sd : sds) { + File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 3); + assertTrue(editFile.exists()); + + long fileLen = editFile.length(); + LOG.debug("Corrupting Log File: " + editFile + " len: " + fileLen); + RandomAccessFile rwf = new RandomAccessFile(editFile, "rw"); + rwf.seek(fileLen-4); // seek to checksum bytes + int b = rwf.readInt(); + rwf.seek(fileLen-4); + rwf.writeInt(b+1); + rwf.close(); + } try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build(); @@ -1232,6 +1240,113 @@ public void testFuzzSequences() throws IOException { } } + private static long readAllEdits(Collection streams, + long startTxId) throws IOException { + FSEditLogOp op; + long nextTxId = startTxId; + long numTx = 0; + for (EditLogInputStream s : streams) { + while (true) { + op = s.readOp(); + if (op == null) + break; + if (op.getTransactionId() != nextTxId) { + throw new IOException("out of order transaction ID! expected " + + nextTxId + " but got " + op.getTransactionId() + " when " + + "reading " + s.getName()); + } + numTx++; + nextTxId = op.getTransactionId() + 1; + } + } + return numTx; + } + + /** + * Test edit log failover. If a single edit log is missing, other + * edits logs should be used instead. + */ + @Test + public void testEditLogFailOverFromMissing() throws IOException { + File f1 = new File(TEST_DIR + "/failover0"); + File f2 = new File(TEST_DIR + "/failover1"); + List editUris = ImmutableList.of(f1.toURI(), f2.toURI()); + + NNStorage storage = setupEdits(editUris, 3); + + final long startErrorTxId = 1*TXNS_PER_ROLL + 1; + final long endErrorTxId = 2*TXNS_PER_ROLL; + + File[] files = new File(f1, "current").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId, + endErrorTxId))) { + return true; + } + return false; + } + }); + assertEquals(1, files.length); + assertTrue(files[0].delete()); + + FSEditLog editlog = getFSEditLog(storage); + editlog.initJournalsForWrite(); + long startTxId = 1; + try { + readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL), + startTxId); + } catch (IOException e) { + LOG.error("edit log failover didn't work", e); + fail("Edit log failover didn't work"); + } + } + + /** + * Test edit log failover from a corrupt edit log + */ + @Test + public void testEditLogFailOverFromCorrupt() throws IOException { + File f1 = new File(TEST_DIR + "/failover0"); + File f2 = new File(TEST_DIR + "/failover1"); + List editUris = ImmutableList.of(f1.toURI(), f2.toURI()); + + NNStorage storage = setupEdits(editUris, 3); + + final long startErrorTxId = 1*TXNS_PER_ROLL + 1; + final long endErrorTxId = 2*TXNS_PER_ROLL; + + File[] files = new File(f1, "current").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + if (name.startsWith(NNStorage.getFinalizedEditsFileName(startErrorTxId, + endErrorTxId))) { + return true; + } + return false; + } + }); + assertEquals(1, files.length); + + long fileLen = files[0].length(); + LOG.debug("Corrupting Log File: " + files[0] + " len: " + fileLen); + RandomAccessFile rwf = new RandomAccessFile(files[0], "rw"); + rwf.seek(fileLen-4); // seek to checksum bytes + int b = rwf.readInt(); + rwf.seek(fileLen-4); + rwf.writeInt(b+1); + rwf.close(); + + FSEditLog editlog = getFSEditLog(storage); + editlog.initJournalsForWrite(); + long startTxId = 1; + try { + readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL), + startTxId); + } catch (IOException e) { + LOG.error("edit log failover didn't work", e); + fail("Edit log failover didn't work"); + } + } + /** * Test creating a directory with lots and lots of edit log segments */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java index d39df4030d8..c1ca4ac09c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java @@ -50,6 +50,16 @@ public void deleteEditsFile() { TEST_EDITS.delete(); } + @Test + public void testConstants() { + // Each call to FSEditLogOp#Reader#readOp can read at most MAX_OP_SIZE bytes + // before getting an exception. So we don't want to preallocate a longer + // region than MAX_OP_SIZE, because then we'd get an IOException when reading + // through the padding at the end of the file. + assertTrue(EditLogFileOutputStream.PREALLOCATION_LENGTH < + FSEditLogOp.MAX_OP_SIZE); + } + @Test public void testPreallocation() throws IOException { Configuration conf = new HdfsConfiguration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index a54df2ca818..267e128413c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -77,7 +77,7 @@ public void testDisplayRecentEditLogOpCodes() throws IOException { MiniDFSCluster cluster = null; FileSystem fileSys = null; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) - .build(); + .enableManagedDfsDirsRedundancy(false).build(); cluster.waitActive(); fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); @@ -107,7 +107,7 @@ public void testDisplayRecentEditLogOpCodes() throws IOException { bld.append("Recent opcode offsets: (\\d+\\s*){4}$"); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) - .format(false).build(); + .enableManagedDfsDirsRedundancy(false).format(false).build(); fail("should not be able to start"); } catch (IOException e) { assertTrue("error message contains opcodes message", @@ -326,6 +326,56 @@ public void testValidateEditLogWithCorruptHeader() throws IOException { assertTrue(validation.hasCorruptHeader()); } + @Test + public void testValidateEditLogWithCorruptBody() throws IOException { + File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptBody"); + SortedMap offsetToTxId = Maps.newTreeMap(); + final int NUM_TXNS = 20; + File logFile = prepareUnfinalizedTestEditLog(testDir, NUM_TXNS, + offsetToTxId); + // Back up the uncorrupted log + File logFileBak = new File(testDir, logFile.getName() + ".bak"); + Files.copy(logFile, logFileBak); + EditLogValidation validation = + EditLogFileInputStream.validateEditLog(logFile); + assertTrue(!validation.hasCorruptHeader()); + // We expect that there will be an OP_START_LOG_SEGMENT, followed by + // NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT. + assertEquals(NUM_TXNS + 1, validation.getEndTxId()); + // Corrupt each edit and verify that validation continues to work + for (Map.Entry entry : offsetToTxId.entrySet()) { + long txOffset = entry.getKey(); + long txId = entry.getValue(); + + // Restore backup, corrupt the txn opcode + Files.copy(logFileBak, logFile); + corruptByteInFile(logFile, txOffset); + validation = EditLogFileInputStream.validateEditLog(logFile); + long expectedEndTxId = (txId == (NUM_TXNS + 1)) ? + NUM_TXNS : (NUM_TXNS + 1); + assertEquals("Failed when corrupting txn opcode at " + txOffset, + expectedEndTxId, validation.getEndTxId()); + assertTrue(!validation.hasCorruptHeader()); + } + + // Truncate right before each edit and verify that validation continues + // to work + for (Map.Entry entry : offsetToTxId.entrySet()) { + long txOffset = entry.getKey(); + long txId = entry.getValue(); + + // Restore backup, corrupt the txn opcode + Files.copy(logFileBak, logFile); + truncateFile(logFile, txOffset); + validation = EditLogFileInputStream.validateEditLog(logFile); + long expectedEndTxId = (txId == 0) ? + HdfsConstants.INVALID_TXID : (txId - 1); + assertEquals("Failed when corrupting txid " + txId + " txn opcode " + + "at " + txOffset, expectedEndTxId, validation.getEndTxId()); + assertTrue(!validation.hasCorruptHeader()); + } + } + @Test public void testValidateEmptyEditLog() throws IOException { File testDir = new File(TEST_DIR, "testValidateEmptyEditLog"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java index e972f599b96..80d79585afc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java @@ -20,10 +20,10 @@ import static org.junit.Assert.*; import java.net.URI; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Iterator; +import java.util.PriorityQueue; import java.io.RandomAccessFile; import java.io.File; @@ -33,7 +33,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; @@ -45,7 +44,6 @@ import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL; import com.google.common.collect.ImmutableList; -import com.google.common.collect.TreeMultiset; import com.google.common.base.Joiner; public class TestFileJournalManager { @@ -64,12 +62,13 @@ public class TestFileJournalManager { static long getNumberOfTransactions(FileJournalManager jm, long fromTxId, boolean inProgressOk, boolean abortOnGap) throws IOException { long numTransactions = 0, txId = fromTxId; - final TreeMultiset allStreams = - TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); + final PriorityQueue allStreams = + new PriorityQueue(64, + JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); jm.selectInputStreams(allStreams, fromTxId, inProgressOk); - + EditLogInputStream elis = null; try { - for (EditLogInputStream elis : allStreams) { + while ((elis = allStreams.poll()) != null) { elis.skipUntil(txId); while (true) { FSEditLogOp op = elis.readOp(); @@ -87,6 +86,7 @@ static long getNumberOfTransactions(FileJournalManager jm, long fromTxId, } } finally { IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); + IOUtils.cleanup(LOG, elis); } return numTransactions; } @@ -379,27 +379,28 @@ public void testMatchEditLogInvalidDirThrowsIOException() throws IOException { private static EditLogInputStream getJournalInputStream(JournalManager jm, long txId, boolean inProgressOk) throws IOException { - final TreeMultiset allStreams = - TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); + final PriorityQueue allStreams = + new PriorityQueue(64, + JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); jm.selectInputStreams(allStreams, txId, inProgressOk); + EditLogInputStream elis = null, ret; try { - for (Iterator iter = allStreams.iterator(); - iter.hasNext();) { - EditLogInputStream elis = iter.next(); + while ((elis = allStreams.poll()) != null) { if (elis.getFirstTxId() > txId) { break; } if (elis.getLastTxId() < txId) { - iter.remove(); elis.close(); continue; } elis.skipUntil(txId); - iter.remove(); - return elis; + ret = elis; + elis = null; + return ret; } } finally { IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); + IOUtils.cleanup(LOG, elis); } return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java index 608ee26e3fa..c1287e7d5da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -343,7 +343,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) StorageDirectory sd = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) - .build(); + .enableManagedDfsDirsRedundancy(false).build(); cluster.waitActive(); if (!finalize) { // Normally, the in-progress edit log would be finalized by @@ -379,7 +379,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) try { LOG.debug("trying to start normally (this should fail)..."); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) - .format(false).build(); + .enableManagedDfsDirsRedundancy(false).format(false).build(); cluster.waitActive(); cluster.shutdown(); if (needRecovery) { @@ -404,7 +404,8 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) try { LOG.debug("running recovery..."); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) - .format(false).startupOption(recoverStartOpt).build(); + .enableManagedDfsDirsRedundancy(false).format(false) + .startupOption(recoverStartOpt).build(); } catch (IOException e) { fail("caught IOException while trying to recover. " + "message was " + e.getMessage() + @@ -420,7 +421,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) try { LOG.debug("starting cluster normally after recovery..."); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) - .format(false).build(); + .enableManagedDfsDirsRedundancy(false).format(false).build(); LOG.debug("successfully recovered the " + corruptor.getName() + " corrupted edit log"); cluster.waitActive();