From abb8614dcff1465e2c5bc7a672d6c07102c98e22 Mon Sep 17 00:00:00 2001
From: Erik Krogen
Date: Wed, 23 May 2018 12:42:13 -0700
Subject: [PATCH] HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add
 ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.
---
 .../src/site/markdown/Metrics.md              |  5 ++
 .../qjournal/protocol/QJournalProtocol.java   | 24 +++++++-
 ...JournalProtocolServerSideTranslatorPB.java | 14 +++++
 .../QJournalProtocolTranslatorPB.java         | 20 +++++++
 .../hadoop/hdfs/qjournal/server/Journal.java  | 59 +++++++++++++++++++
 .../hdfs/qjournal/server/JournalMetrics.java  | 20 ++++++-
 .../qjournal/server/JournalNodeRpcServer.java |  8 +++
 .../src/main/proto/QJournalProtocol.proto     | 18 ++++++
 .../hdfs/qjournal/server/TestJournal.java     | 46 +++++++++++++++
 9 files changed, 212 insertions(+), 2 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index afd1e1eef35..07d3497c632 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -321,6 +321,11 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
 | `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
 | `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
 | `LastJournalTimestamp` | The timestamp of last successfully written transaction |
+| `TxnsServedViaRpc` | Number of transactions served via the RPC mechanism |
+| `BytesServedViaRpc` | Number of bytes served via the RPC mechanism |
+| `RpcRequestCacheMissAmountNumMisses` | Number of RPC requests which could not be served due to lack of data in the cache |
+| `RpcRequestCacheMissAmountAvgTxns` | The average number of transactions by which a request missed the cache; for example, if transaction ID 10 is requested and the cache's oldest transaction is ID 15, a value of 5 will be added to this average |
+| `RpcEmptyResponses` | Number of RPC requests with zero edits returned |
 
 datanode
 --------
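A note on the two cache-miss metrics above: they are two views of a single stat. This is a sketch assuming the standard semantics of Hadoop's MutableStat (a stat named S with sample name "Misses" and value name "Txns" emits S + "NumMisses" and S + "AvgTxns"); the variable names here are illustrative:

    // Each cache miss records one sample whose value is the distance, in
    // transactions, between the request and the oldest cached transaction.
    long requestedTxId = 10;     // the example from the table above
    long oldestCachedTxId = 15;
    // Bumps RpcRequestCacheMissAmountNumMisses by 1 and folds a value of 5
    // into RpcRequestCacheMissAmountAvgTxns.
    rpcRequestCacheMissAmount.add(oldestCachedTxId - requestedTxId);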
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index 5558bd54721..c0027966d5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -139,7 +140,28 @@ public interface QJournalProtocol {
       long sinceTxId, boolean inProgressOk) throws IOException;
-  
+
+  /**
+   * Fetch edit logs present in the Journal's in-memory cache of edits
+   * ({@link org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache}).
+   * To enable this cache, in-progress edit log tailing must be enabled via the
+   * {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} configuration key.
+   *
+   * @param jid The ID of the journal from which to fetch edits.
+   * @param nameServiceId The ID of the namespace for which to fetch edits.
+   * @param sinceTxId Fetch edits starting at this transaction ID.
+   * @param maxTxns Request at most this many transactions to be returned.
+   * @throws IOException If there was an issue encountered while fetching edits
+   *         from the cache, including a cache miss (the cache does not contain
+   *         the requested edits). The caller should then attempt to fetch the
+   *         edits via the streaming mechanism (starting with
+   *         {@link #getEditLogManifest(String, String, long, boolean)}).
+   * @return Response containing serialized edits to be loaded.
+   * @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
+   */
+  GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
+
   /**
    * Begin the recovery process for a given segment. See the HDFS-3077
    * design document for details.
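For orientation, a sketch of how a tailing client might drive this new interface. Only getJournaledEdits() and GetJournaledEditsResponseProto come from this patch; the enclosing method and the tailViaStreaming() fallback are hypothetical stand-ins for the consumer that arrives later in the HDFS-13608 series:

    private byte[] fetchEditsViaRpc(QJournalProtocol proxy, String jid,
        String nameServiceId, long lastSeenTxId, int maxTxns)
        throws IOException {
      GetJournaledEditsResponseProto resp;
      try {
        resp = proxy.getJournaledEdits(jid, nameServiceId,
            lastSeenTxId + 1, maxTxns);
      } catch (IOException e) {
        // Cache miss or cache disabled: per the javadoc above, fall back
        // to the streaming mechanism via getEditLogManifest().
        return tailViaStreaming(lastSeenTxId + 1); // hypothetical fallback
      }
      if (resp.getTxnCount() == 0) {
        return new byte[0]; // fully caught up; nothing new to apply
      }
      // Serialized segment header plus transactions, ready for a reader.
      return resp.getEditLog().toByteArray();
    }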
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index 865d2969220..00b1be36bf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatReq
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -235,6 +237,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(
+      RpcController controller, GetJournaledEditsRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getJournaledEdits(request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null,
+          request.getSinceTxId(), request.getMaxTxns());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
 
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index d7cd7b55811..536de00197a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeL
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -281,6 +283,24 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    try {
+      GetJournaledEditsRequestProto.Builder req =
+          GetJournaledEditsRequestProto.newBuilder()
+              .setJid(convertJournalId(jid))
+              .setSinceTxId(sinceTxId)
+              .setMaxTxns(maxTxns);
+      if (nameServiceId != null) {
+        req.setNameServiceId(nameServiceId);
+      }
+      return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 6d3a081fa72..90796a815be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import com.google.protobuf.ByteString;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
@@ -24,9 +25,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -36,10 +39,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -85,6 +90,7 @@ public class Journal implements Closeable {
   // Current writing state
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+  private int curSegmentLayoutVersion = 0;
   private long nextTxId = HdfsServerConstants.INVALID_TXID;
   private long highestWrittenTxId = 0;
 
@@ -133,6 +139,8 @@ public class Journal implements Closeable {
 
   private final FileJournalManager fjm;
 
+  private final JournaledEditsCache cache;
+
   private final JournalMetrics metrics;
 
   private long lastJournalTimestamp = 0;
@@ -156,6 +164,13 @@ public class Journal implements Closeable {
     refreshCachedData();
 
     this.fjm = storage.getJournalManager();
+
+    if (conf.getBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT)) {
+      this.cache = new JournaledEditsCache(conf);
+    } else {
+      this.cache = null;
+    }
 
     this.metrics = JournalMetrics.create(this);
 
@@ -361,6 +376,7 @@ public class Journal implements Closeable {
     curSegment.abort();
     curSegment = null;
     curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+    curSegmentLayoutVersion = 0;
   }
 
   /**
@@ -406,6 +422,9 @@ public class Journal implements Closeable {
       LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
           " ; journal id: " + journalId);
     }
+    if (cache != null) {
+      cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
+    }
 
     // If the edit has already been marked as committed, we know
     // it has been fsynced on a quorum of other nodes, and we are
@@ -593,6 +612,7 @@ public class Journal implements Closeable {
 
     curSegment = fjm.startLogSegment(txid, layoutVersion);
     curSegmentTxId = txid;
+    curSegmentLayoutVersion = layoutVersion;
     nextTxId = txid;
   }
 
@@ -612,6 +632,7 @@ public class Journal implements Closeable {
       curSegment.close();
       curSegment = null;
       curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+      curSegmentLayoutVersion = 0;
     }
 
     checkSync(nextTxId == endTxId + 1,
@@ -712,6 +733,44 @@ public class Journal implements Closeable {
     return new RemoteEditLogManifest(logs, getCommittedTxnId());
   }
 
+  /**
+   * @see QJournalProtocol#getJournaledEdits(String, String, long, int)
+   */
+  public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
+      int maxTxns) throws IOException {
+    if (cache == null) {
+      throw new IOException("The journal edits cache is not enabled, which " +
+          "is a requirement to fetch journaled edits via RPC. Please enable " +
+          "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
+    }
+    if (sinceTxId > getHighestWrittenTxId()) {
+      // Requested edits that don't exist yet; short-circuit the cache here
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    }
+    try {
+      List<ByteBuffer> buffers = new ArrayList<>();
+      int txnCount = cache.retrieveEdits(sinceTxId, maxTxns, buffers);
+      int totalSize = 0;
+      for (ByteBuffer buf : buffers) {
+        totalSize += buf.remaining();
+      }
+      metrics.txnsServedViaRpc.incr(txnCount);
+      metrics.bytesServedViaRpc.incr(totalSize);
+      ByteString.Output output = ByteString.newOutput(totalSize);
+      for (ByteBuffer buf : buffers) {
+        output.write(buf.array(), buf.position(), buf.remaining());
+      }
+      return GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(txnCount)
+          .setEditLog(output.toByteString())
+          .build();
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
+      throw cme;
+    }
+  }
+
   /**
    * @return the current state of the given segment, or null if the
    * segment does not exist.
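To make the editLog payload concrete: the cache hands back a serialized segment header followed by the raw transactions, so the bytes assembled above can be consumed by the same code paths that read on-disk segments. Here is a sketch of building the expected bytes for a request starting at txn 1 that returns 7 txns, mirroring the assertion in testReadFromCache below (assumes an enclosing method that throws Exception; QJMTestUtil.createTxnData is the existing test helper):

    ByteArrayOutputStream expected = new ByteArrayOutputStream();
    // Segment header first, written exactly as EditLogFileOutputStream does
    EditLogFileOutputStream.writeHeader(
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
        new DataOutputStream(expected));
    // Then the serialized transactions themselves
    expected.write(QJMTestUtil.createTxnData(1, 7));
    // expected.toByteArray() should equal resp.getEditLog().toByteArray()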
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index fcfd9016cd1..7d271f36653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+
 
 /**
  * The server-side metrics for a journal from the JournalNode's
@@ -42,7 +44,23 @@ class JournalMetrics {
 
   @Metric("Number of bytes written since startup")
   MutableCounterLong bytesWritten;
-  
+
+  @Metric("Number of txns served via RPC")
+  MutableCounterLong txnsServedViaRpc;
+
+  @Metric("Number of bytes served via RPC")
+  MutableCounterLong bytesServedViaRpc;
+
+  @Metric
+  MutableStat rpcRequestCacheMissAmount = new MutableStat(
+      "RpcRequestCacheMissAmount", "Number of RPC requests that could not " +
+      "be served due to lack of data in the cache, and how many " +
+      "transactions away the request was from being in the cache.",
+      "Misses", "Txns");
+
+  @Metric("Number of RPC requests with zero edits returned")
+  MutableCounterLong rpcEmptyResponses;
+
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index b1a3c9665d7..39d75ad0d96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -233,6 +234,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
         .build();
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    return jn.getOrCreateJournal(jid, nameServiceId)
+        .getJournaledEdits(sinceTxId, maxTxns);
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index a37c7236a65..26f44960979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -284,6 +284,21 @@ message GetEditLogManifestResponseProto {
   // required NamespaceInfoProto nsInfo = 2;
 }
 
+/**
+ * getJournaledEdits()
+ */
+message GetJournaledEditsRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;
+  required uint32 maxTxns = 3;
+  optional string nameServiceId = 4;
+}
+
+message GetJournaledEditsResponseProto {
+  required uint32 txnCount = 1;
+  optional bytes editLog = 2;
+}
+
 /**
  * prepareRecovery()
  */
@@ -363,6 +378,9 @@ service QJournalProtocolService {
   rpc getEditLogManifest(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
 
+  rpc getJournaledEdits(GetJournaledEditsRequestProto)
+      returns (GetJournaledEditsResponseProto);
+
   rpc prepareRecovery(PrepareRecoveryRequestProto)
       returns (PrepareRecoveryResponseProto);
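One wire-format note: editLog is optional precisely so the caught-up case can be answered without a payload, which is why txnCount is the field consumers should gate on. A minimal sketch, mirroring the empty response Journal builds when sinceTxId is beyond the highest written txn:

    // An empty response carries txnCount == 0 and omits editLog entirely,
    // so check txnCount (or hasEditLog()) before calling getEditLog().
    GetJournaledEditsResponseProto empty =
        GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
    assert !empty.hasEditLog();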
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index b71d69445c7..ac75db96103 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -17,19 +17,25 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -38,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
@@ -71,6 +78,8 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     conf = new Configuration();
+    // Enable fetching edits via RPC
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
         mockErrorReporter);
     journal.format(FAKE_NSINFO);
@@ -434,4 +443,41 @@ public class TestJournal {
     }
   }
 
+  @Test
+  public void testReadFromCache() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
+    journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5));
+    journal.journal(makeRI(4), 1, 11, 5, QJMTestUtil.createTxnData(11, 5));
+    assertJournaledEditsTxnCountAndContents(1, 7, 7,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    assertJournaledEditsTxnCountAndContents(1, 30, 15,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    journal.finalizeLogSegment(makeRI(5), 1, 15);
+    int newLayoutVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+    journal.startLogSegment(makeRI(6), 16, newLayoutVersion);
+    journal.journal(makeRI(7), 16, 16, 5, QJMTestUtil.createTxnData(16, 5));
+
+    assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
+  }
+
+  private void assertJournaledEditsTxnCountAndContents(int startTxn,
+      int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
+      throws Exception {
+    GetJournaledEditsResponseProto result =
+        journal.getJournaledEdits(startTxn, requestedMaxTxns);
+    int expectedTxnCount = expectedEndTxn - startTxn + 1;
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(layoutVersion,
+        new DataOutputStream(headerBytes));
+    assertEquals(expectedTxnCount, result.getTxnCount());
+    assertArrayEquals(
+        Bytes.concat(
+            headerBytes.toByteArray(),
+            QJMTestUtil.createTxnData(startTxn, expectedTxnCount)),
+        result.getEditLog().toByteArray());
+  }
 }