From c81ac2ff0220b180cd6cbbf18221290c3783bfd5 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Wed, 9 May 2018 15:40:07 -0700 Subject: [PATCH] HDFS-13607. [SBN read] Edit Tail Fast Path Part 1: Enhance JournalNode with an in-memory cache of recent edit transactions. Contributed by Erik Krogen. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../qjournal/server/JournaledEditsCache.java | 393 ++++++++++++++++++ .../src/main/resources/hdfs-default.xml | 12 + .../server/TestJournaledEditsCache.java | 257 ++++++++++++ 4 files changed, 665 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f6ce0d53692..3bdff94b860 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1086,6 +1086,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY = "dfs.journalnode.sync.interval"; public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L; + public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY = + "dfs.journalnode.edit-cache-size.bytes"; + public static final int DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT = 1024 * 1024; // Journal-node related configs for the client side. public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java new file mode 100644 index 00000000000..11510695052 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.util.AutoCloseableLock; + + +/** + * An in-memory cache of edits in their serialized form. This is used to serve + * the {@link Journal#getJournaledEdits(long, int)} call, used by the + * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is + * enabled. + * + *

When a batch of edits is received by the JournalNode, it is put into this + * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be + * stored contiguously; if a batch of edits is stored that does not align with + * the previously stored edits, the cache will be cleared before storing new + * edits to avoid gaps. This decision is made because gaps are only handled + * when in recovery mode, which the cache is not intended to be used for. + * + *

Batches of edits are stored in a {@link TreeMap} mapping the starting + * transaction ID of the batch to the data buffer. Upon retrieval, the + * relevant data buffers are concatenated together and a header is added + * to construct a fully-formed edit data stream. + * + *

The cache is of a limited size capacity determined by + * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity + * is exceeded after adding a new batch of edits, batches of edits are removed + * until the total size is less than the capacity, starting from the ones + * containing the oldest transactions. Transactions range in size, but a + * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring + * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended + * to determine if the cache is too small; it will indicate both how many + * cache misses occurred, and how many more transactions would have been + * needed in the cache to serve the request. + */ +class JournaledEditsCache { + + /** The capacity, in bytes, of this cache. */ + private final int capacity; + + /** + * Read/write lock pair wrapped in AutoCloseable; these refer to the same + * underlying lock. + */ + private final AutoCloseableLock readLock; + private final AutoCloseableLock writeLock; + + // ** Start lock-protected fields ** + + /** + * Stores the actual data as a mapping of the StartTxnId of a batch of edits + * to the serialized batch of edits. Stores only contiguous ranges; that is, + * the last transaction ID in one batch is always one less than the first + * transaction ID in the next batch. Though the map is protected by the lock, + * individual data buffers are immutable and can be accessed without locking. + */ + private final NavigableMap dataMap = new TreeMap<>(); + /** Stores the layout version currently present in the cache. */ + private int layoutVersion = Integer.MAX_VALUE; + /** Stores the serialized version of the header for the current version. */ + private ByteBuffer layoutHeader; + + /** + * The lowest/highest transaction IDs present in the cache. -1 if there are no + * transactions in the cache. + */ + private long lowestTxnId; + private long highestTxnId; + /** + * The lowest transaction ID that was ever present in the cache since last + * being reset (i.e. since initialization or since reset due to being out of + * sync with the Journal). Until the cache size goes above capacity, this is + * equal to lowestTxnId. + */ + private long initialTxnId; + /** The current total size of all buffers in this cache. */ + private int totalSize; + + // ** End lock-protected fields ** + + JournaledEditsCache(Configuration conf) { + capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, + DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT); + if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) { + Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " + + "maximum JVM memory is only %d bytes. It is recommended that you " + + "decrease the cache size or increase the heap size.", + capacity, Runtime.getRuntime().maxMemory())); + } + Journal.LOG.info("Enabling the journaled edits cache with a capacity " + + "of bytes: " + capacity); + ReadWriteLock lock = new ReentrantReadWriteLock(true); + readLock = new AutoCloseableLock(lock.readLock()); + writeLock = new AutoCloseableLock(lock.writeLock()); + initialize(-1); + } + + /** + * Fetch the data for edits starting at the specific transaction ID, fetching + * up to {@code maxTxns} transactions. Populates a list of output buffers + * which contains a serialized version of the edits, and returns the count of + * edits contained within the serialized buffers. The serialized edits are + * prefixed with a standard edit log header containing information about the + * layout version. The transactions returned are guaranteed to have contiguous + * transaction IDs. + * + * If {@code requestedStartTxn} is higher than the highest transaction which + * has been added to this cache, a response with an empty buffer and a + * transaction count of 0 will be returned. If {@code requestedStartTxn} is + * lower than the lowest transaction currently contained in this cache, or no + * transactions have yet been added to the cache, an exception will be thrown. + * @param requestedStartTxn The ID of the first transaction to return. If any + * transactions are returned, it is guaranteed that + * the first one will have this ID. + * @param maxTxns The maximum number of transactions to return. + * @param outputBuffers A list to populate with output buffers. When + * concatenated, these form a full response. + * @return The number of transactions contained within the set of output + * buffers. + * @throws IOException If transactions are requested which cannot be served + * by this cache. + */ + int retrieveEdits(long requestedStartTxn, int maxTxns, + List outputBuffers) throws IOException { + int txnCount = 0; + + try (AutoCloseableLock l = readLock.acquire()) { + if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) { + throw getCacheMissException(requestedStartTxn); + } else if (requestedStartTxn > highestTxnId) { + return 0; + } + outputBuffers.add(layoutHeader); + Iterator> incrBuffIter = + dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true) + .entrySet().iterator(); + long prevTxn = requestedStartTxn; + byte[] prevBuf = null; + // Stop when maximum transactions reached... + while ((txnCount < maxTxns) && + // ... or there are no more entries ... + (incrBuffIter.hasNext() || prevBuf != null)) { + long currTxn; + byte[] currBuf; + if (incrBuffIter.hasNext()) { + Map.Entry ent = incrBuffIter.next(); + currTxn = ent.getKey(); + currBuf = ent.getValue(); + } else { + // This accounts for the trailing entry + currTxn = highestTxnId + 1; + currBuf = null; + } + if (prevBuf != null) { // True except for the first loop iteration + outputBuffers.add(ByteBuffer.wrap(prevBuf)); + // if prevTxn < requestedStartTxn, the extra transactions will get + // removed after the loop, so don't include them in the txn count + txnCount += currTxn - Math.max(requestedStartTxn, prevTxn); + } + prevTxn = currTxn; + prevBuf = currBuf; + } + // Release the lock before doing operations on the buffers (deserializing + // to find transaction boundaries, and copying into an output buffer) + } + // Remove extra leading transactions in the first buffer + ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header + firstBuf.position( + findTransactionPosition(firstBuf.array(), requestedStartTxn)); + // Remove trailing transactions in the last buffer if necessary + if (txnCount > maxTxns) { + ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1); + int limit = + findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns); + lastBuf.limit(limit); + txnCount = maxTxns; + } + + return txnCount; + } + + /** + * Store a batch of serialized edits into this cache. Removes old batches + * as necessary to keep the total size of the cache below the capacity. + * See the class Javadoc for more info. + * + * This attempts to always handle malformed inputs gracefully rather than + * throwing an exception, to allow the rest of the Journal's operations + * to proceed normally. + * @param inputData A buffer containing edits in serialized form + * @param newStartTxn The txn ID of the first edit in {@code inputData} + * @param newEndTxn The txn ID of the last edit in {@code inputData} + * @param newLayoutVersion The version of the layout used to serialize + * the edits + */ + void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn, + int newLayoutVersion) { + if (newStartTxn < 0 || newEndTxn < newStartTxn) { + Journal.LOG.error(String.format("Attempted to cache data of length %d " + + "with newStartTxn %d and newEndTxn %d", + inputData.length, newStartTxn, newEndTxn)); + return; + } + try (AutoCloseableLock l = writeLock.acquire()) { + if (newLayoutVersion != layoutVersion) { + try { + updateLayoutVersion(newLayoutVersion, newStartTxn); + } catch (IOException ioe) { + Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " + + "due to exception when updating to new layout version %d", + newStartTxn, newEndTxn, newLayoutVersion), ioe); + return; + } + } + if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) { + // Cache initialization step + if (lowestTxnId >= 0) { + // Cache is out of sync; clear to avoid storing noncontiguous regions + Journal.LOG.error(String.format("Edits cache is out of sync; " + + "looked for next txn id at %d but got start txn id for " + + "cache put request at %d", highestTxnId + 1, newStartTxn)); + } + initialize(newStartTxn); + } + + while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) { + Map.Entry lowest = dataMap.firstEntry(); + dataMap.remove(lowest.getKey()); + totalSize -= lowest.getValue().length; + } + if (inputData.length > capacity) { + initialize(-1); + Journal.LOG.warn(String.format("A single batch of edits was too " + + "large to fit into the cache: startTxn = %d, endTxn = %d, " + + "input length = %d. The capacity of the cache (%s) must be " + + "increased for it to work properly (current capacity %d)", + newStartTxn, newEndTxn, inputData.length, + DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity)); + return; + } + if (dataMap.isEmpty()) { + lowestTxnId = newStartTxn; + } else { + lowestTxnId = dataMap.firstKey(); + } + + dataMap.put(newStartTxn, inputData); + highestTxnId = newEndTxn; + totalSize += inputData.length; + } + } + + /** + * Skip through a given stream of edits until the given transaction ID is + * found. Return the number of bytes that appear prior to the given + * transaction. + * @param buf A buffer containing a stream of serialized edits + * @param txnId The transaction ID to search for + * @return The number of bytes appearing in {@code buf} before + * the start of the transaction with ID {@code txnId}. + */ + private int findTransactionPosition(byte[] buf, long txnId) + throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + FSEditLogLoader.PositionTrackingInputStream tracker = + new FSEditLogLoader.PositionTrackingInputStream(bais); + FSEditLogOp.Reader reader = FSEditLogOp.Reader.create( + new DataInputStream(tracker), tracker, layoutVersion); + long previousPos = 0; + while (reader.scanOp() < txnId) { + previousPos = tracker.getPos(); + } + // tracker is backed by a byte[]; position cannot go above an integer + return (int) previousPos; + } + + /** + * Update the layout version of the cache. This clears out all existing + * entries, and populates the new layout version and header for that version. + * @param newLayoutVersion The new layout version to be stored in the cache + * @param newStartTxn The new lowest transaction in the cache + */ + private void updateLayoutVersion(int newLayoutVersion, long newStartTxn) + throws IOException { + Journal.LOG.info("Updating edits cache to use layout version " + + newLayoutVersion + "; previously was " + layoutVersion); + initialize(newStartTxn); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + EditLogFileOutputStream.writeHeader(newLayoutVersion, + new DataOutputStream(baos)); + layoutVersion = newLayoutVersion; + layoutHeader = ByteBuffer.wrap(baos.toByteArray()); + } + + /** + * Initialize the cache back to a clear state. + * @param newInitialTxnId The new lowest transaction ID stored in the cache. + * -1 if the cache is to remain empty at this time. + */ + private void initialize(long newInitialTxnId) { + dataMap.clear(); + totalSize = 0; + initialTxnId = newInitialTxnId; + lowestTxnId = initialTxnId; + highestTxnId = -1; + } + + /** + * Return the underlying data buffer used to store information about the + * given transaction ID. + * @param txnId Transaction ID whose containing buffer should be fetched. + * @return The data buffer for the transaction + */ + @VisibleForTesting + byte[] getRawDataForTests(long txnId) { + try (AutoCloseableLock l = readLock.acquire()) { + return dataMap.floorEntry(txnId).getValue(); + } + } + + private CacheMissException getCacheMissException(long requestedTxnId) { + if (lowestTxnId < 0) { + return new CacheMissException(0, "Cache is empty; either it was never " + + "written to or the last write overflowed the cache capacity."); + } else if (requestedTxnId < initialTxnId) { + return new CacheMissException(initialTxnId - requestedTxnId, + "Cache started at txn ID %d but requested txns starting at %d.", + initialTxnId, requestedTxnId); + } else { + return new CacheMissException(lowestTxnId - requestedTxnId, + "Oldest txn ID available in the cache is %d, but requested txns " + + "starting at %d. The cache size (%s) may need to be increased " + + "to hold more transactions (currently %d bytes containing %d " + + "transactions)", lowestTxnId, requestedTxnId, + DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity, + highestTxnId - lowestTxnId + 1); + } + } + + static class CacheMissException extends IOException { + + private static final long serialVersionUID = 0L; + + private final long cacheMissAmount; + + CacheMissException(long cacheMissAmount, String msgFormat, + Object... msgArgs) { + super(String.format(msgFormat, msgArgs)); + this.cacheMissAmount = cacheMissAmount; + } + + long getCacheMissAmount() { + return cacheMissAmount; + } + + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2c054396f9d..6b52b0bf59e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4167,6 +4167,18 @@ + + dfs.journalnode.edit-cache-size.bytes + 1048576 + + The size, in bytes, of the in-memory cache of edits to keep on the + JournalNode. This cache is used to serve edits for tailing via the RPC-based + mechanism, and is only enabled when dfs.ha.tail-edits.in-progress is true. + Transactions range in size but are around 200 bytes on average, so the + default of 1MB can store around 5000 transactions. + + + dfs.journalnode.kerberos.internal.spnego.principal diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java new file mode 100644 index 00000000000..9e15d60a5a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import com.google.common.primitives.Bytes; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.apache.hadoop.test.PathUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createGabageTxns; +import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +/** + * Test the {@link JournaledEditsCache} used for caching edits in-memory on the + * {@link Journal}. + */ +public class TestJournaledEditsCache { + + private static final int EDITS_CAPACITY = 100; + + private static final File TEST_DIR = + PathUtils.getTestDir(TestJournaledEditsCache.class, false); + private JournaledEditsCache cache; + + @Before + public void setup() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, + createTxnData(1, 1).length * EDITS_CAPACITY); + cache = new JournaledEditsCache(conf); + TEST_DIR.mkdirs(); + } + + @After + public void cleanup() throws Exception { + FileUtils.deleteQuietly(TEST_DIR); + } + + @Test + public void testCacheSingleSegment() throws Exception { + storeEdits(1, 20); + // Leading part of the segment + assertTxnCountAndContents(1, 5, 5); + // All of the segment + assertTxnCountAndContents(1, 20, 20); + // Past the segment + assertTxnCountAndContents(1, 40, 20); + // Trailing part of the segment + assertTxnCountAndContents(10, 11, 20); + // Trailing part of the segment, past the end + assertTxnCountAndContents(10, 20, 20); + } + + @Test + public void testCacheBelowCapacityRequestOnBoundary() throws Exception { + storeEdits(1, 5); + storeEdits(6, 20); + storeEdits(21, 30); + + // First segment only + assertTxnCountAndContents(1, 3, 3); + // Second segment only + assertTxnCountAndContents(6, 10, 15); + // First and second segment + assertTxnCountAndContents(1, 7, 7); + // All three segments + assertTxnCountAndContents(1, 25, 25); + // Second and third segment + assertTxnCountAndContents(6, 20, 25); + // Second and third segment; request past the end + assertTxnCountAndContents(6, 50, 30); + // Third segment only; request past the end + assertTxnCountAndContents(21, 20, 30); + } + + @Test + public void testCacheBelowCapacityRequestOffBoundary() throws Exception { + storeEdits(1, 5); + storeEdits(6, 20); + storeEdits(21, 30); + + // First segment only + assertTxnCountAndContents(3, 1, 3); + // First and second segment + assertTxnCountAndContents(3, 6, 8); + // Second and third segment + assertTxnCountAndContents(15, 10, 24); + // Second and third segment; request past the end + assertTxnCountAndContents(15, 50, 30); + // Start read past the end + List buffers = new ArrayList<>(); + assertEquals(0, cache.retrieveEdits(31, 10, buffers)); + assertTrue(buffers.isEmpty()); + } + + @Test + public void testCacheAboveCapacity() throws Exception { + int thirdCapacity = EDITS_CAPACITY / 3; + storeEdits(1, thirdCapacity); + storeEdits(thirdCapacity + 1, thirdCapacity * 2); + storeEdits(thirdCapacity * 2 + 1, EDITS_CAPACITY); + storeEdits(EDITS_CAPACITY + 1, thirdCapacity * 4); + storeEdits(thirdCapacity * 4 + 1, thirdCapacity * 5); + + try { + cache.retrieveEdits(1, 10, new ArrayList<>()); + fail(); + } catch (IOException ioe) { + // expected + } + assertTxnCountAndContents(EDITS_CAPACITY + 1, EDITS_CAPACITY, + thirdCapacity * 5); + } + + @Test + public void testCacheSingleAdditionAboveCapacity() throws Exception { + LogCapturer logs = LogCapturer.captureLogs(Journal.LOG); + storeEdits(1, EDITS_CAPACITY * 2); + logs.stopCapturing(); + assertTrue(logs.getOutput().contains("batch of edits was too large")); + try { + cache.retrieveEdits(1, 1, new ArrayList<>()); + fail(); + } catch (IOException ioe) { + // expected + } + storeEdits(EDITS_CAPACITY * 2 + 1, EDITS_CAPACITY * 2 + 5); + assertTxnCountAndContents(EDITS_CAPACITY * 2 + 1, 5, + EDITS_CAPACITY * 2 + 5); + } + + @Test + public void testCacheWithFutureLayoutVersion() throws Exception { + byte[] firstHalf = createGabageTxns(1, 5); + byte[] secondHalf = createGabageTxns(6, 5); + int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1; + cache.storeEdits(Bytes.concat(firstHalf, secondHalf), 1, 10, + futureVersion); + List buffers = new ArrayList<>(); + assertEquals(5, cache.retrieveEdits(6, 5, buffers)); + assertArrayEquals(getHeaderForLayoutVersion(futureVersion), + buffers.get(0).array()); + byte[] retBytes = new byte[buffers.get(1).remaining()]; + System.arraycopy(buffers.get(1).array(), buffers.get(1).position(), + retBytes, 0, buffers.get(1).remaining()); + assertArrayEquals(secondHalf, retBytes); + } + + @Test + public void testCacheWithMultipleLayoutVersions() throws Exception { + int oldLayout = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1; + cache.storeEdits(createTxnData(1, 5), 1, 5, oldLayout); + storeEdits(6, 10); + // Ensure the cache will only return edits from a single + // layout version at a time + try { + cache.retrieveEdits(1, 50, new ArrayList<>()); + fail("Expected a cache miss"); + } catch (JournaledEditsCache.CacheMissException cme) { + // expected + } + assertTxnCountAndContents(6, 50, 10); + } + + @Test + public void testCacheEditsWithGaps() throws Exception { + storeEdits(1, 5); + storeEdits(10, 15); + + try { + cache.retrieveEdits(1, 20, new ArrayList<>()); + fail(); + } catch (JournaledEditsCache.CacheMissException cme) { + assertEquals(9, cme.getCacheMissAmount()); + } + assertTxnCountAndContents(10, 10, 15); + } + + @Test(expected = JournaledEditsCache.CacheMissException.class) + public void testReadUninitializedCache() throws Exception { + cache.retrieveEdits(1, 10, new ArrayList<>()); + } + + @Test(expected = JournaledEditsCache.CacheMissException.class) + public void testCacheMalformedInput() throws Exception { + storeEdits(1, 1); + cache.retrieveEdits(-1, 10, new ArrayList<>()); + } + + private void storeEdits(int startTxn, int endTxn) throws Exception { + cache.storeEdits(createTxnData(startTxn, endTxn - startTxn + 1), startTxn, + endTxn, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); + } + + private void assertTxnCountAndContents(int startTxn, int requestedMaxTxns, + int expectedEndTxn) throws Exception { + List buffers = new ArrayList<>(); + int expectedTxnCount = expectedEndTxn - startTxn + 1; + assertEquals(expectedTxnCount, + cache.retrieveEdits(startTxn, requestedMaxTxns, buffers)); + + byte[] expectedBytes = Bytes.concat( + getHeaderForLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION), + createTxnData(startTxn, expectedTxnCount)); + byte[] actualBytes = + new byte[buffers.stream().mapToInt(ByteBuffer::remaining).sum()]; + int pos = 0; + for (ByteBuffer buf : buffers) { + System.arraycopy(buf.array(), buf.position(), actualBytes, pos, + buf.remaining()); + pos += buf.remaining(); + } + assertArrayEquals(expectedBytes, actualBytes); + } + + private static byte[] getHeaderForLayoutVersion(int version) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + EditLogFileOutputStream.writeHeader(version, new DataOutputStream(baos)); + return baos.toByteArray(); + } + +}