diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index f114bb613bb..918fd8b43ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -36,3 +36,5 @@ HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli) HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd) HDFS-3877. QJM: Provide defaults for dfs.journalnode.*address (eli) + +HDFS-3863. Track last "committed" txid in QJM (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 62bac82de57..6b5e3f33ecd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -123,6 +123,13 @@ public ListenableFuture acceptRecovery(SegmentStateProto log, */ public void setEpoch(long e); + /** + * Let the logger know the highest committed txid across all loggers in the + * set. This txid may be higher than the last committed txid for this + * logger. See HDFS-3863 for details. + */ + public void setCommittedTxId(long txid); + /** * Build an HTTP URL to fetch the log segment with the given startTxId. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index 76414f9bb7f..96300db5fd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -99,6 +99,18 @@ private void setEpoch(long e) { } } + + /** + * Set the highest successfully committed txid seen by the writer. + * This should be called after a successful write to a quorum, and is used + * for extra sanity checks against the protocol. See HDFS-3863. + */ + public void setCommittedTxId(long txid) { + for (AsyncLogger logger : loggers) { + logger.setCommittedTxId(txid); + } + } + /** * @return the epoch number for this writer. This may only be called after * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 9d19ec173b7..80228f89008 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; @@ -73,6 +74,8 @@ public class IPCLoggerChannel implements AsyncLogger { private final ListeningExecutorService executor; private long ipcSerial = 0; private long epoch = -1; + private long committedTxId = HdfsConstants.INVALID_TXID; + private final String journalId; private final NamespaceInfo nsInfo; private int httpPort = -1; @@ -115,11 +118,20 @@ public IPCLoggerChannel(Configuration conf, executor = MoreExecutors.listeningDecorator( createExecutor()); } + @Override public synchronized void setEpoch(long epoch) { this.epoch = epoch; } + @Override + public synchronized void setCommittedTxId(long txid) { + Preconditions.checkArgument(txid >= committedTxId, + "Trying to move committed txid backwards in client " + + "old: %s new: %s", committedTxId, txid); + this.committedTxId = txid; + } + @Override public void close() { // No more tasks may be submitted after this point. @@ -183,7 +195,8 @@ public URL buildURLToFetchLogs(long segmentTxId) { private synchronized RequestInfo createReqInfo() { Preconditions.checkState(epoch > 0, "bad epoch: " + epoch); - return new RequestInfo(journalId, epoch, ipcSerial++); + return new RequestInfo(journalId, epoch, ipcSerial++, + committedTxId); } @VisibleForTesting diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index 4dcc12e4097..7f11648ddf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -265,6 +265,21 @@ private void recoverUnclosedSegment(long segmentTxId) throws IOException { SegmentStateProto logToSync = bestResponse.getSegmentState(); assert segmentTxId == logToSync.getStartTxId(); + // Sanity check: none of the loggers should be aware of a higher + // txid than the txid we intend to truncate to + for (Map.Entry e : + prepareResponses.entrySet()) { + AsyncLogger logger = e.getKey(); + PrepareRecoveryResponseProto resp = e.getValue(); + + if (resp.hasLastCommittedTxId() && + resp.getLastCommittedTxId() > logToSync.getEndTxId()) { + throw new AssertionError("Decided to synchronize log to " + logToSync + + " but logger " + logger + " had seen txid " + + resp.getLastCommittedTxId() + " committed"); + } + } + URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId); QuorumCall accept = loggers.acceptRecovery(logToSync, syncFromUrl); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java index 2725e72587c..e46676fd7a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java @@ -102,6 +102,11 @@ protected void flushAndSync() throws IOException { segmentTxId, firstTxToFlush, numReadyTxns, data); loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout + + // Since we successfully wrote this batch, let the loggers know. Any future + // RPCs will thus let the loggers know of the most recent transaction, even + // if a logger has fallen behind. + loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java index 86574f7efdb..b2167a5e41e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java @@ -18,17 +18,21 @@ package org.apache.hadoop.hdfs.qjournal.protocol; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; @InterfaceAudience.Private public class RequestInfo { private String jid; private long epoch; private long ipcSerialNumber; + private long committedTxId; - public RequestInfo(String jid, long epoch, long ipcSerialNumber) { + public RequestInfo(String jid, long epoch, long ipcSerialNumber, + long committedTxId) { this.jid = jid; this.epoch = epoch; this.ipcSerialNumber = ipcSerialNumber; + this.committedTxId = committedTxId; } public long getEpoch() { @@ -51,4 +55,11 @@ public void setIpcSerialNumber(long ipcSerialNumber) { this.ipcSerialNumber = ipcSerialNumber; } + public long getCommittedTxId() { + return committedTxId; + } + + public boolean hasCommittedTxId() { + return (committedTxId != HdfsConstants.INVALID_TXID); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java index c34f6482c2c..4f0e8af2339 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.qjournal.protocolPB; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; @@ -199,6 +200,8 @@ private RequestInfo convert( return new RequestInfo( reqInfo.getJournalId().getIdentifier(), reqInfo.getEpoch(), - reqInfo.getIpcSerialNumber()); + reqInfo.getIpcSerialNumber(), + reqInfo.hasCommittedTxId() ? + reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java index 0ac70702c9c..6e3b1411153 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.RequestInfoProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; @@ -143,11 +144,14 @@ public void journal(RequestInfo reqInfo, private QJournalProtocolProtos.RequestInfoProto convert( RequestInfo reqInfo) { - return QJournalProtocolProtos.RequestInfoProto.newBuilder() - .setJournalId(convertJournalId(reqInfo.getJournalId())) - .setEpoch(reqInfo.getEpoch()) - .setIpcSerialNumber(reqInfo.getIpcSerialNumber()) - .build(); + RequestInfoProto.Builder builder = RequestInfoProto.newBuilder() + .setJournalId(convertJournalId(reqInfo.getJournalId())) + .setEpoch(reqInfo.getEpoch()) + .setIpcSerialNumber(reqInfo.getIpcSerialNumber()); + if (reqInfo.hasCommittedTxId()) { + builder.setCommittedTxId(reqInfo.getCommittedTxId()); + } + return builder.build(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index c633faf509c..0c38b84cb42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -46,10 +46,13 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; +import org.apache.hadoop.hdfs.util.BestEffortLongFile; import org.apache.hadoop.hdfs.util.PersistentLongFile; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.Ranges; import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @@ -85,9 +88,18 @@ class Journal implements Closeable { */ private PersistentLongFile lastWriterEpoch; + /** + * Lower-bound on the last committed transaction ID. This is not + * depended upon for correctness, but acts as a sanity check + * during the recovery procedures, and as a visibility mark + * for clients reading in-progress logs. + */ + private BestEffortLongFile committedTxnId; + private static final String LAST_PROMISED_FILENAME = "last-promised-epoch"; private static final String LAST_WRITER_EPOCH = "last-writer-epoch"; - + private static final String COMMITTED_TXID_FILENAME = "committed-txid"; + private final FileJournalManager fjm; Journal(File logDir, StorageErrorReporter errorReporter) throws IOException { @@ -98,7 +110,10 @@ class Journal implements Closeable { new File(currentDir, LAST_PROMISED_FILENAME), 0); this.lastWriterEpoch = new PersistentLongFile( new File(currentDir, LAST_WRITER_EPOCH), 0); - + this.committedTxnId = new BestEffortLongFile( + new File(currentDir, COMMITTED_TXID_FILENAME), + HdfsConstants.INVALID_TXID); + this.fjm = storage.getJournalManager(); } @@ -113,22 +128,21 @@ private synchronized void scanStorage() throws IOException { } LOG.info("Scanning storage " + fjm); List files = fjm.getLogFiles(0); - if (files.isEmpty()) { - curSegmentTxId = HdfsConstants.INVALID_TXID; - return; - } + curSegmentTxId = HdfsConstants.INVALID_TXID; - EditLogFile latestLog = files.get(files.size() - 1); - latestLog.validateLog(); - LOG.info("Latest log is " + latestLog); - if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { - // the log contains no transactions - LOG.warn("Latest log " + latestLog + " has no transactions. " + - "moving it aside"); - latestLog.moveAsideEmptyFile(); - curSegmentTxId = HdfsConstants.INVALID_TXID; - } else { - curSegmentTxId = latestLog.getFirstTxId(); + while (!files.isEmpty()) { + EditLogFile latestLog = files.remove(files.size() - 1); + latestLog.validateLog(); + LOG.info("Latest log is " + latestLog); + if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { + // the log contains no transactions + LOG.warn("Latest log " + latestLog + " has no transactions. " + + "moving it aside and looking for previous log"); + latestLog.moveAsideEmptyFile(); + } else { + curSegmentTxId = latestLog.getFirstTxId(); + break; + } } } @@ -150,6 +164,8 @@ void format(NamespaceInfo nsInfo) throws IOException { @Override // Closeable public void close() throws IOException { storage.close(); + + IOUtils.closeStream(committedTxnId); } JNStorage getStorage() { @@ -164,6 +180,10 @@ synchronized long getLastPromisedEpoch() throws IOException { checkFormatted(); return lastPromisedEpoch.get(); } + + synchronized long getCommittedTxnIdForTests() throws IOException { + return committedTxnId.get(); + } /** * Try to create a new epoch for this journal. @@ -213,8 +233,8 @@ synchronized NewEpochResponseProto newEpoch( synchronized void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { - checkWriteRequest(reqInfo); checkFormatted(); + checkWriteRequest(reqInfo); // TODO: if a JN goes down and comes back up, then it will throw // this exception on every edit. We should instead send back @@ -245,6 +265,7 @@ synchronized void journal(RequestInfo reqInfo, if (LOG.isTraceEnabled()) { LOG.trace("Writing txid " + firstTxnId + "-" + (firstTxnId + numTxns - 1)); } + curSegment.writeRaw(records, 0, records.length); curSegment.setReadyToFlush(); curSegment.flush(); @@ -270,6 +291,15 @@ private synchronized void checkRequest(RequestInfo reqInfo) throws IOException { // TODO: some check on serial number that they only increase from a given // client + + if (reqInfo.hasCommittedTxId()) { + Preconditions.checkArgument( + reqInfo.getCommittedTxId() >= committedTxnId.get(), + "Client trying to move committed txid backward from " + + committedTxnId.get() + " to " + reqInfo.getCommittedTxId()); + + committedTxnId.set(reqInfo.getCommittedTxId()); + } } private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException { @@ -296,8 +326,8 @@ private void checkFormatted() throws JournalNotFormattedException { public synchronized void startLogSegment(RequestInfo reqInfo, long txid) throws IOException { assert fjm != null; - checkRequest(reqInfo); checkFormatted(); + checkRequest(reqInfo); if (curSegment != null) { LOG.warn("Client is requesting a new log segment " + txid + @@ -352,8 +382,8 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid) */ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { - checkRequest(reqInfo); checkFormatted(); + checkRequest(reqInfo); if (startTxId == curSegmentTxId) { if (curSegment != null) { @@ -397,8 +427,8 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, */ public synchronized void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) throws IOException { - checkRequest(reqInfo); checkFormatted(); + checkRequest(reqInfo); fjm.purgeLogsOlderThan(minTxIdToKeep); purgePaxosDecisionsOlderThan(minTxIdToKeep); @@ -492,8 +522,8 @@ private SegmentStateProto getSegmentInfo(long segmentTxId) */ public synchronized PrepareRecoveryResponseProto prepareRecovery( RequestInfo reqInfo, long segmentTxId) throws IOException { - checkRequest(reqInfo); checkFormatted(); + checkRequest(reqInfo); PrepareRecoveryResponseProto.Builder builder = PrepareRecoveryResponseProto.newBuilder(); @@ -519,6 +549,9 @@ public synchronized PrepareRecoveryResponseProto prepareRecovery( } builder.setLastWriterEpoch(lastWriterEpoch.get()); + if (committedTxnId.get() != HdfsConstants.INVALID_TXID) { + builder.setLastCommittedTxId(committedTxnId.get()); + } PrepareRecoveryResponseProto resp = builder.build(); LOG.info("Prepared recovery for segment " + segmentTxId + ": " + @@ -532,8 +565,8 @@ public synchronized PrepareRecoveryResponseProto prepareRecovery( public synchronized void acceptRecovery(RequestInfo reqInfo, SegmentStateProto segment, URL fromUrl) throws IOException { - checkRequest(reqInfo); checkFormatted(); + checkRequest(reqInfo); long segmentTxId = segment.getStartTxId(); // TODO: right now, a recovery of a segment when the log is @@ -563,8 +596,22 @@ public synchronized void acceptRecovery(RequestInfo reqInfo, ": no current segment in place"); } else { LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + - ": old segment " + TextFormat.shortDebugString(segment) + " is " + - "not the right length"); + ": old segment " + TextFormat.shortDebugString(currentSegment) + + " is not the right length"); + + // Paranoid sanity check: if the new log is shorter than the log we + // currently have, we should not end up discarding any transactions + // which are already Committed. + if (txnRange(currentSegment).contains(committedTxnId.get()) && + !txnRange(segment).contains(committedTxnId.get())) { + throw new AssertionError( + "Cannot replace segment " + + TextFormat.shortDebugString(currentSegment) + + " with new segment " + + TextFormat.shortDebugString(segment) + + ": would discard already-committed txn " + + committedTxnId.get()); + } } syncLog(reqInfo, segment, fromUrl); } else { @@ -581,6 +628,12 @@ public synchronized void acceptRecovery(RequestInfo reqInfo, TextFormat.shortDebugString(newData)); } + private Range txnRange(SegmentStateProto seg) { + Preconditions.checkArgument(seg.hasEndTxId(), + "invalid segment: %s", seg); + return Ranges.closed(seg.getStartTxId(), seg.getEndTxId()); + } + /** * Synchronize a log segment from another JournalNode. * @param reqInfo the request info for the recovery IPC diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java new file mode 100644 index 00000000000..292402b245f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.io.Files; +import com.google.common.primitives.Longs; + +/** + * Class that represents a file on disk which stores a single long + * value, but does not make any effort to make it truly durable. This is in + * contrast to {@link PersistentLongFile} which fsync()s the value on every + * change. + * + * This should be used for values which are updated frequently (such that + * performance is important) and not required to be up-to-date for correctness. + * + * This class also differs in that it stores the value as binary data instead + * of a textual string. + */ +@InterfaceAudience.Private +public class BestEffortLongFile implements Closeable { + + private final File file; + private final long defaultVal; + + private long value; + + private FileChannel ch = null; + + private ByteBuffer buf = ByteBuffer.allocate(Long.SIZE/8); + + public BestEffortLongFile(File file, long defaultVal) { + this.file = file; + this.defaultVal = defaultVal; + } + + public long get() throws IOException { + lazyOpen(); + return value; + } + + public void set(long newVal) throws IOException { + lazyOpen(); + buf.clear(); + buf.putLong(newVal); + buf.flip(); + IOUtils.writeFully(ch, buf, 0); + value = newVal; + } + + private void lazyOpen() throws IOException { + if (ch != null) { + return; + } + + // Load current value. + byte[] data = null; + try { + data = Files.toByteArray(file); + } catch (FileNotFoundException fnfe) { + // Expected - this will use default value. + } + + if (data != null && data.length != 0) { + if (data.length != Longs.BYTES) { + throw new IOException("File " + file + " had invalid length: " + + data.length); + } + value = Longs.fromByteArray(data); + } else { + value = defaultVal; + } + + // Now open file for future writes. + RandomAccessFile raf = new RandomAccessFile(file, "rw"); + try { + ch = raf.getChannel(); + } finally { + if (ch == null) { + IOUtils.closeStream(raf); + } + } + } + + @Override + public void close() throws IOException { + if (ch != null) { + ch.close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto index 177758aa924..e1269bf523c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto @@ -31,6 +31,13 @@ message RequestInfoProto { required JournalIdProto journalId = 1; required uint64 epoch = 2; required uint64 ipcSerialNumber = 3; + + // Whenever a writer makes a request, it informs + // the node of the latest committed txid. This may + // be higher than the transaction data included in the + // request itself, eg in the case that the node has + // fallen behind. + optional uint64 committedTxId = 4; } message SegmentStateProto { @@ -163,6 +170,11 @@ message PrepareRecoveryResponseProto { optional SegmentStateProto segmentState = 1; optional uint64 acceptedInEpoch = 2; required uint64 lastWriterEpoch = 3; + + // The highest committed txid that this logger has ever seen. + // This may be higher than the data it actually has, in the case + // that it was lagging before the old writer crashed. + optional uint64 lastCommittedTxId = 4; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java index 0438bacc9f3..295eab1986c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java @@ -186,6 +186,8 @@ public void testWriteEditsOneSlow() throws Exception { Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.any()); stm.flush(); + + Mockito.verify(spyLoggers.get(0)).setCommittedTxId(1L); } private EditLogOutputStream createLogSegment() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java index 2ca588c912d..8f555e6b0c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assume; @@ -70,6 +71,11 @@ public void verifyNoStorageErrors() throws Exception{ .reportErrorOnFile(Mockito.any()); } + @After + public void cleanup() { + IOUtils.closeStream(journal); + } + @Test public void testEpochHandling() throws Exception { assertEquals(0, journal.getLastPromisedEpoch()); @@ -88,28 +94,41 @@ public void testEpochHandling() throws Exception { "Proposed epoch 3 <= last promise 3", ioe); } try { - journal.startLogSegment(new RequestInfo(JID, 1L, 1L), - 12345L); + journal.startLogSegment(makeRI(1), 12345L); fail("Should have rejected call from prior epoch"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( "epoch 1 is less than the last promised epoch 3", ioe); } try { - journal.journal(new RequestInfo(JID, 1L, 1L), - 12345L, 100L, 0, new byte[0]); + journal.journal(makeRI(1), 12345L, 100L, 0, new byte[0]); fail("Should have rejected call from prior epoch"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( "epoch 1 is less than the last promised epoch 3", ioe); } } - + + @Test + public void testMaintainCommittedTxId() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + journal.startLogSegment(makeRI(1), 1); + // Send txids 1-3, with a request indicating only 0 committed + journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3, + QJMTestUtil.createTxnData(1, 3)); + assertEquals(0, journal.getCommittedTxnIdForTests()); + + // Send 4-6, with request indicating that through 3 is committed. + journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3, + QJMTestUtil.createTxnData(4, 6)); + assertEquals(3, journal.getCommittedTxnIdForTests()); + } + @Test public void testRestartJournal() throws Exception { journal.newEpoch(FAKE_NSINFO, 1); - journal.startLogSegment(new RequestInfo("j", 1, 1), 1); - journal.journal(new RequestInfo("j", 1, 2), 1, 1, 2, + journal.startLogSegment(makeRI(1), 1); + journal.journal(makeRI(2), 1, 1, 2, QJMTestUtil.createTxnData(1, 2)); // Don't finalize. @@ -129,6 +148,23 @@ public void testRestartJournal() throws Exception { assertEquals(1, newEpoch.getLastSegmentTxId()); } + /** + * Test that, if the writer crashes at the very beginning of a segment, + * before any transactions are written, that the next newEpoch() call + * returns the prior segment txid as its most recent segment. + */ + @Test + public void testNewEpochAtBeginningOfSegment() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + journal.startLogSegment(makeRI(1), 1); + journal.journal(makeRI(2), 1, 1, 2, + QJMTestUtil.createTxnData(1, 2)); + journal.finalizeLogSegment(makeRI(3), 1, 2); + journal.startLogSegment(makeRI(3), 3); + NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2); + assertEquals(1, resp.getLastSegmentTxId()); + } + @Test public void testJournalLocking() throws Exception { Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported()); @@ -289,7 +325,7 @@ public void testStartLogSegmentWhenAlreadyExists() throws Exception { } private static RequestInfo makeRI(int serial) { - return new RequestInfo(JID, 1, serial); + return new RequestInfo(JID, 1, serial, 0); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index ef83104f030..75f1c69cc0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -119,8 +119,8 @@ public void testReturnsSegmentInfoAtEpochTransition() throws Exception { response = ch.newEpoch(4).get(); ch.setEpoch(4); // Because the new segment is empty, it is equivalent to not having - // started writing it. - assertEquals(0, response.getLastSegmentTxId()); + // started writing it. Hence, we should return the prior segment txid. + assertEquals(1, response.getLastSegmentTxId()); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java new file mode 100644 index 00000000000..c57dc97031f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.io.File; +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestBestEffortLongFile { + + private static final File FILE = new File(MiniDFSCluster.getBaseDirectory() + + File.separatorChar + "TestBestEffortLongFile"); + + @Before + public void cleanup() { + if (FILE.exists()) { + assertTrue(FILE.delete()); + } + FILE.getParentFile().mkdirs(); + } + + @Test + public void testGetSet() throws IOException { + BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L); + try { + // Before the file exists, should return default. + assertEquals(12345L, f.get()); + + // And first access should open it. + assertTrue(FILE.exists()); + + Random r = new Random(); + for (int i = 0; i < 100; i++) { + long newVal = r.nextLong(); + // Changing the value should be reflected in the next get() call. + f.set(newVal); + assertEquals(newVal, f.get()); + + // And should be reflected in a new instance (ie it actually got + // written to the file) + BestEffortLongFile f2 = new BestEffortLongFile(FILE, 999L); + try { + assertEquals(newVal, f2.get()); + } finally { + IOUtils.closeStream(f2); + } + } + } finally { + IOUtils.closeStream(f); + } + } + + @Test + public void testTruncatedFileReturnsDefault() throws IOException { + assertTrue(FILE.createNewFile()); + assertEquals(0, FILE.length()); + BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L); + try { + assertEquals(12345L, f.get()); + } finally { + f.close(); + } + } +}