HDFS-13607. [SBN read] Edit Tail Fast Path Part 1: Enhance JournalNode with an in-memory cache of recent edit transactions. Contributed by Erik Krogen.
commit 174c41c68f (parent a3521c53fe)
org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1083,6 +1083,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
       "dfs.journalnode.sync.interval";
   public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
+  public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
+      "dfs.journalnode.edit-cache-size.bytes";
+  public static final int DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT = 1024 * 1024;
 
   // Journal-node related configs for the client side.
   public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
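Since the capacity is an ordinary int setting, callers can also size it programmatically. A minimal sketch, not from this patch: the class name and the 50,000-transaction target are illustrative, and the ~200 bytes/transaction figure is the rule of thumb documented in the cache's Javadoc below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EditCacheSizing {
  public static void main(String[] args) {
    // Target roughly 50,000 cached transactions, using the ~200 bytes per
    // transaction rule of thumb (actual transaction sizes vary).
    int desiredTxns = 50000;
    int approxBytesPerTxn = 200;
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        desiredTxns * approxBytesPerTxn); // 10,000,000 bytes, about 10 MB
    System.out.println("edit cache capacity = " + conf.getInt(
        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 0));
  }
}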
org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java (new file)
@@ -0,0 +1,393 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.util.AutoCloseableLock;

/**
 * An in-memory cache of edits in their serialized form. This is used to serve
 * the {@link Journal#getJournaledEdits(long, int)} call, used by the
 * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
 * enabled.
 *
 * <p>When a batch of edits is received by the JournalNode, it is put into this
 * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
 * stored contiguously; if a batch of edits is stored that does not align with
 * the previously stored edits, the cache will be cleared before storing new
 * edits to avoid gaps. This decision is made because gaps are only handled
 * when in recovery mode, which the cache is not intended to be used for.
 *
 * <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
 * transaction ID of the batch to the data buffer. Upon retrieval, the
 * relevant data buffers are concatenated together and a header is added
 * to construct a fully-formed edit data stream.
 *
 * <p>The cache is of a limited size capacity determined by
 * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If the capacity
 * is exceeded after adding a new batch of edits, batches of edits are removed
 * until the total size is less than the capacity, starting from the ones
 * containing the oldest transactions. Transactions range in size, but a
 * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
 * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
 * to determine if the cache is too small; it will indicate both how many
 * cache misses occurred, and how many more transactions would have been
 * needed in the cache to serve the request.
 */
class JournaledEditsCache {

  /** The capacity, in bytes, of this cache. */
  private final int capacity;

  /**
   * Read/write lock pair wrapped in AutoCloseable; these refer to the same
   * underlying lock.
   */
  private final AutoCloseableLock readLock;
  private final AutoCloseableLock writeLock;

  // ** Start lock-protected fields **

  /**
   * Stores the actual data as a mapping of the StartTxnId of a batch of edits
   * to the serialized batch of edits. Stores only contiguous ranges; that is,
   * the last transaction ID in one batch is always one less than the first
   * transaction ID in the next batch. Though the map is protected by the lock,
   * individual data buffers are immutable and can be accessed without locking.
   */
  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
  /** Stores the layout version currently present in the cache. */
  private int layoutVersion = Integer.MAX_VALUE;
  /** Stores the serialized version of the header for the current version. */
  private ByteBuffer layoutHeader;

  /**
   * The lowest/highest transaction IDs present in the cache. -1 if there are
   * no transactions in the cache.
   */
  private long lowestTxnId;
  private long highestTxnId;
  /**
   * The lowest transaction ID that was ever present in the cache since last
   * being reset (i.e. since initialization or since reset due to being out of
   * sync with the Journal). Until the cache size goes above capacity, this is
   * equal to lowestTxnId.
   */
  private long initialTxnId;
  /** The current total size of all buffers in this cache. */
  private int totalSize;

  // ** End lock-protected fields **

  JournaledEditsCache(Configuration conf) {
    capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
    if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
      Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
          "maximum JVM memory is only %d bytes. It is recommended that you " +
          "decrease the cache size or increase the heap size.",
          capacity, Runtime.getRuntime().maxMemory()));
    }
    Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
        "of bytes: " + capacity);
    ReadWriteLock lock = new ReentrantReadWriteLock(true);
    readLock = new AutoCloseableLock(lock.readLock());
    writeLock = new AutoCloseableLock(lock.writeLock());
    initialize(-1);
  }

  /**
   * Fetch the data for edits starting at the specific transaction ID, fetching
   * up to {@code maxTxns} transactions. Populates a list of output buffers
   * which contains a serialized version of the edits, and returns the count of
   * edits contained within the serialized buffers. The serialized edits are
   * prefixed with a standard edit log header containing information about the
   * layout version. The transactions returned are guaranteed to have
   * contiguous transaction IDs.
   *
   * If {@code requestedStartTxn} is higher than the highest transaction which
   * has been added to this cache, a response with an empty buffer and a
   * transaction count of 0 will be returned. If {@code requestedStartTxn} is
   * lower than the lowest transaction currently contained in this cache, or no
   * transactions have yet been added to the cache, an exception will be
   * thrown.
   *
   * @param requestedStartTxn The ID of the first transaction to return. If any
   *                          transactions are returned, it is guaranteed that
   *                          the first one will have this ID.
   * @param maxTxns The maximum number of transactions to return.
   * @param outputBuffers A list to populate with output buffers. When
   *                      concatenated, these form a full response.
   * @return The number of transactions contained within the set of output
   *         buffers.
   * @throws IOException If transactions are requested which cannot be served
   *                     by this cache.
   */
  int retrieveEdits(long requestedStartTxn, int maxTxns,
      List<ByteBuffer> outputBuffers) throws IOException {
    int txnCount = 0;

    try (AutoCloseableLock l = readLock.acquire()) {
      if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) {
        throw getCacheMissException(requestedStartTxn);
      } else if (requestedStartTxn > highestTxnId) {
        return 0;
      }
      outputBuffers.add(layoutHeader);
      Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
          dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
              .entrySet().iterator();
      long prevTxn = requestedStartTxn;
      byte[] prevBuf = null;
      // Stop when maximum transactions reached...
      while ((txnCount < maxTxns) &&
          // ... or there are no more entries ...
          (incrBuffIter.hasNext() || prevBuf != null)) {
        long currTxn;
        byte[] currBuf;
        if (incrBuffIter.hasNext()) {
          Map.Entry<Long, byte[]> ent = incrBuffIter.next();
          currTxn = ent.getKey();
          currBuf = ent.getValue();
        } else {
          // This accounts for the trailing entry
          currTxn = highestTxnId + 1;
          currBuf = null;
        }
        if (prevBuf != null) { // True except for the first loop iteration
          outputBuffers.add(ByteBuffer.wrap(prevBuf));
          // if prevTxn < requestedStartTxn, the extra transactions will get
          // removed after the loop, so don't include them in the txn count
          txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
        }
        prevTxn = currTxn;
        prevBuf = currBuf;
      }
      // Release the lock before doing operations on the buffers (deserializing
      // to find transaction boundaries, and copying into an output buffer)
    }
    // Remove extra leading transactions in the first buffer
    ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
    firstBuf.position(
        findTransactionPosition(firstBuf.array(), requestedStartTxn));
    // Remove trailing transactions in the last buffer if necessary
    if (txnCount > maxTxns) {
      ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
      int limit =
          findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
      lastBuf.limit(limit);
      txnCount = maxTxns;
    }

    return txnCount;
  }

  /**
   * Store a batch of serialized edits into this cache. Removes old batches
   * as necessary to keep the total size of the cache below the capacity.
   * See the class Javadoc for more info.
   *
   * This attempts to always handle malformed inputs gracefully rather than
   * throwing an exception, to allow the rest of the Journal's operations
   * to proceed normally.
   *
   * @param inputData A buffer containing edits in serialized form
   * @param newStartTxn The txn ID of the first edit in {@code inputData}
   * @param newEndTxn The txn ID of the last edit in {@code inputData}
   * @param newLayoutVersion The version of the layout used to serialize
   *                         the edits
   */
  void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
      int newLayoutVersion) {
    if (newStartTxn < 0 || newEndTxn < newStartTxn) {
      Journal.LOG.error(String.format("Attempted to cache data of length %d " +
          "with newStartTxn %d and newEndTxn %d",
          inputData.length, newStartTxn, newEndTxn));
      return;
    }
    try (AutoCloseableLock l = writeLock.acquire()) {
      if (newLayoutVersion != layoutVersion) {
        try {
          updateLayoutVersion(newLayoutVersion, newStartTxn);
        } catch (IOException ioe) {
          Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
              "due to exception when updating to new layout version %d",
              newStartTxn, newEndTxn, newLayoutVersion), ioe);
          return;
        }
      }
      if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) {
        // Cache initialization step
        if (lowestTxnId >= 0) {
          // Cache is out of sync; clear to avoid storing noncontiguous regions
          Journal.LOG.error(String.format("Edits cache is out of sync; " +
              "looked for next txn id at %d but got start txn id for " +
              "cache put request at %d", highestTxnId + 1, newStartTxn));
        }
        initialize(newStartTxn);
      }

      while ((totalSize + inputData.length) > capacity && !dataMap.isEmpty()) {
        Map.Entry<Long, byte[]> lowest = dataMap.firstEntry();
        dataMap.remove(lowest.getKey());
        totalSize -= lowest.getValue().length;
      }
      if (inputData.length > capacity) {
        initialize(-1);
        Journal.LOG.warn(String.format("A single batch of edits was too " +
            "large to fit into the cache: startTxn = %d, endTxn = %d, " +
            "input length = %d. The capacity of the cache (%s) must be " +
            "increased for it to work properly (current capacity %d)",
            newStartTxn, newEndTxn, inputData.length,
            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
        return;
      }
      if (dataMap.isEmpty()) {
        lowestTxnId = newStartTxn;
      } else {
        lowestTxnId = dataMap.firstKey();
      }

      dataMap.put(newStartTxn, inputData);
      highestTxnId = newEndTxn;
      totalSize += inputData.length;
    }
  }

  /**
   * Skip through a given stream of edits until the given transaction ID is
   * found. Return the number of bytes that appear prior to the given
   * transaction.
   *
   * @param buf A buffer containing a stream of serialized edits
   * @param txnId The transaction ID to search for
   * @return The number of bytes appearing in {@code buf} <i>before</i>
   *         the start of the transaction with ID {@code txnId}.
   */
  private int findTransactionPosition(byte[] buf, long txnId)
      throws IOException {
    ByteArrayInputStream bais = new ByteArrayInputStream(buf);
    FSEditLogLoader.PositionTrackingInputStream tracker =
        new FSEditLogLoader.PositionTrackingInputStream(bais);
    FSEditLogOp.Reader reader = FSEditLogOp.Reader.create(
        new DataInputStream(tracker), tracker, layoutVersion);
    long previousPos = 0;
    while (reader.scanOp() < txnId) {
      previousPos = tracker.getPos();
    }
    // tracker is backed by a byte[]; position cannot go above an integer
    return (int) previousPos;
  }

  /**
   * Update the layout version of the cache. This clears out all existing
   * entries, and populates the new layout version and header for that version.
   *
   * @param newLayoutVersion The new layout version to be stored in the cache
   * @param newStartTxn The new lowest transaction in the cache
   */
  private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
      throws IOException {
    Journal.LOG.info("Updating edits cache to use layout version " +
        newLayoutVersion + "; previously was " + layoutVersion);
    initialize(newStartTxn);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    EditLogFileOutputStream.writeHeader(newLayoutVersion,
        new DataOutputStream(baos));
    layoutVersion = newLayoutVersion;
    layoutHeader = ByteBuffer.wrap(baos.toByteArray());
  }

  /**
   * Initialize the cache back to a clear state.
   *
   * @param newInitialTxnId The new lowest transaction ID stored in the cache.
   *                        -1 if the cache is to remain empty at this time.
   */
  private void initialize(long newInitialTxnId) {
    dataMap.clear();
    totalSize = 0;
    initialTxnId = newInitialTxnId;
    lowestTxnId = initialTxnId;
    highestTxnId = -1;
  }

  /**
   * Return the underlying data buffer used to store information about the
   * given transaction ID.
   *
   * @param txnId Transaction ID whose containing buffer should be fetched.
   * @return The data buffer for the transaction
   */
  @VisibleForTesting
  byte[] getRawDataForTests(long txnId) {
    try (AutoCloseableLock l = readLock.acquire()) {
      return dataMap.floorEntry(txnId).getValue();
    }
  }

  private CacheMissException getCacheMissException(long requestedTxnId) {
    if (lowestTxnId < 0) {
      return new CacheMissException(0, "Cache is empty; either it was never " +
          "written to or the last write overflowed the cache capacity.");
    } else if (requestedTxnId < initialTxnId) {
      return new CacheMissException(initialTxnId - requestedTxnId,
          "Cache started at txn ID %d but requested txns starting at %d.",
          initialTxnId, requestedTxnId);
    } else {
      return new CacheMissException(lowestTxnId - requestedTxnId,
          "Oldest txn ID available in the cache is %d, but requested txns " +
              "starting at %d. The cache size (%s) may need to be increased " +
              "to hold more transactions (currently %d bytes containing %d " +
              "transactions)", lowestTxnId, requestedTxnId,
          DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
          highestTxnId - lowestTxnId + 1);
    }
  }

  static class CacheMissException extends IOException {

    private static final long serialVersionUID = 0L;

    private final long cacheMissAmount;

    CacheMissException(long cacheMissAmount, String msgFormat,
        Object... msgArgs) {
      super(String.format(msgFormat, msgArgs));
      this.cacheMissAmount = cacheMissAmount;
    }

    long getCacheMissAmount() {
      return cacheMissAmount;
    }

  }

}
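For orientation, here is how the two halves of the API above fit together. This is a minimal usage sketch under stated assumptions, not code from the patch: JournaledEditsCacheExample is hypothetical (it must live in the same package because the cache is package-private), and serializedEdits is assumed to already hold transactions 1 through 100 serialized at the current layout version. In the real flow, the Journal makes both calls from its RPC handlers.

package org.apache.hadoop.hdfs.qjournal.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;

// Hypothetical illustration class, not part of this commit.
class JournaledEditsCacheExample {
  static void demo(byte[] serializedEdits) throws IOException {
    JournaledEditsCache cache = new JournaledEditsCache(new Configuration());

    // Write path: the JournalNode stores each batch as it journals it.
    // serializedEdits is assumed to contain txns 1..100 in serialized form.
    cache.storeEdits(serializedEdits, 1, 100,
        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);

    // Read path: a tailing reader asks for up to 50 txns starting at 10.
    List<ByteBuffer> buffers = new ArrayList<>();
    int count = cache.retrieveEdits(10, 50, buffers);
    // buffers.get(0) is the serialized layout header; the remaining buffers
    // concatenate into a well-formed edit stream of 'count' contiguous txns.

    // Requesting below the oldest cached txn throws CacheMissException,
    // whose getCacheMissAmount() reports how far short the cache fell.
  }
}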
hdfs-default.xml
@@ -4121,6 +4121,18 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.journalnode.edit-cache-size.bytes</name>
+  <value>1048576</value>
+  <description>
+    The size, in bytes, of the in-memory cache of edits to keep on the
+    JournalNode. This cache is used to serve edits for tailing via the
+    RPC-based mechanism, and is only enabled when dfs.ha.tail-edits.in-progress
+    is true. Transactions range in size but are around 200 bytes on average,
+    so the default of 1MB can store around 5000 transactions.
+  </description>
+</property>
+
 <property>
   <name>dfs.journalnode.kerberos.internal.spnego.principal</name>
   <value></value>
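As a worked example of the sizing guidance in that description: caching roughly one minute of edits at 1,000 txns/sec means about 60,000 transactions, or 60,000 x 200 B = 12,000,000 bytes. A hypothetical hdfs-site.xml override (illustrative, not shipped in this patch) would be:

<!-- Hypothetical override sized for ~60,000 transactions using the
     ~200 bytes/transaction rule of thumb from the description above. -->
<property>
  <name>dfs.journalnode.edit-cache-size.bytes</name>
  <value>12000000</value>
</property>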
org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java (new file)
@@ -0,0 +1,257 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.primitives.Bytes;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createGabageTxns;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Test the {@link JournaledEditsCache} used for caching edits in-memory on
 * the {@link Journal}.
 */
public class TestJournaledEditsCache {

  private static final int EDITS_CAPACITY = 100;

  private static final File TEST_DIR =
      PathUtils.getTestDir(TestJournaledEditsCache.class, false);
  private JournaledEditsCache cache;

  @Before
  public void setup() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        createTxnData(1, 1).length * EDITS_CAPACITY);
    cache = new JournaledEditsCache(conf);
    TEST_DIR.mkdirs();
  }

  @After
  public void cleanup() throws Exception {
    FileUtils.deleteQuietly(TEST_DIR);
  }

  @Test
  public void testCacheSingleSegment() throws Exception {
    storeEdits(1, 20);
    // Leading part of the segment
    assertTxnCountAndContents(1, 5, 5);
    // All of the segment
    assertTxnCountAndContents(1, 20, 20);
    // Past the segment
    assertTxnCountAndContents(1, 40, 20);
    // Trailing part of the segment
    assertTxnCountAndContents(10, 11, 20);
    // Trailing part of the segment, past the end
    assertTxnCountAndContents(10, 20, 20);
  }

  @Test
  public void testCacheBelowCapacityRequestOnBoundary() throws Exception {
    storeEdits(1, 5);
    storeEdits(6, 20);
    storeEdits(21, 30);

    // First segment only
    assertTxnCountAndContents(1, 3, 3);
    // Second segment only
    assertTxnCountAndContents(6, 10, 15);
    // First and second segment
    assertTxnCountAndContents(1, 7, 7);
    // All three segments
    assertTxnCountAndContents(1, 25, 25);
    // Second and third segment
    assertTxnCountAndContents(6, 20, 25);
    // Second and third segment; request past the end
    assertTxnCountAndContents(6, 50, 30);
    // Third segment only; request past the end
    assertTxnCountAndContents(21, 20, 30);
  }

  @Test
  public void testCacheBelowCapacityRequestOffBoundary() throws Exception {
    storeEdits(1, 5);
    storeEdits(6, 20);
    storeEdits(21, 30);

    // First segment only
    assertTxnCountAndContents(3, 1, 3);
    // First and second segment
    assertTxnCountAndContents(3, 6, 8);
    // Second and third segment
    assertTxnCountAndContents(15, 10, 24);
    // Second and third segment; request past the end
    assertTxnCountAndContents(15, 50, 30);
    // Start read past the end
    List<ByteBuffer> buffers = new ArrayList<>();
    assertEquals(0, cache.retrieveEdits(31, 10, buffers));
    assertTrue(buffers.isEmpty());
  }

  @Test
  public void testCacheAboveCapacity() throws Exception {
    int thirdCapacity = EDITS_CAPACITY / 3;
    storeEdits(1, thirdCapacity);
    storeEdits(thirdCapacity + 1, thirdCapacity * 2);
    storeEdits(thirdCapacity * 2 + 1, EDITS_CAPACITY);
    storeEdits(EDITS_CAPACITY + 1, thirdCapacity * 4);
    storeEdits(thirdCapacity * 4 + 1, thirdCapacity * 5);

    try {
      cache.retrieveEdits(1, 10, new ArrayList<>());
      fail();
    } catch (IOException ioe) {
      // expected
    }
    assertTxnCountAndContents(EDITS_CAPACITY + 1, EDITS_CAPACITY,
        thirdCapacity * 5);
  }

  @Test
  public void testCacheSingleAdditionAboveCapacity() throws Exception {
    LogCapturer logs = LogCapturer.captureLogs(Journal.LOG);
    storeEdits(1, EDITS_CAPACITY * 2);
    logs.stopCapturing();
    assertTrue(logs.getOutput().contains("batch of edits was too large"));
    try {
      cache.retrieveEdits(1, 1, new ArrayList<>());
      fail();
    } catch (IOException ioe) {
      // expected
    }
    storeEdits(EDITS_CAPACITY * 2 + 1, EDITS_CAPACITY * 2 + 5);
    assertTxnCountAndContents(EDITS_CAPACITY * 2 + 1, 5,
        EDITS_CAPACITY * 2 + 5);
  }

  @Test
  public void testCacheWithFutureLayoutVersion() throws Exception {
    byte[] firstHalf = createGabageTxns(1, 5);
    byte[] secondHalf = createGabageTxns(6, 5);
    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
    cache.storeEdits(Bytes.concat(firstHalf, secondHalf), 1, 10,
        futureVersion);
    List<ByteBuffer> buffers = new ArrayList<>();
    assertEquals(5, cache.retrieveEdits(6, 5, buffers));
    assertArrayEquals(getHeaderForLayoutVersion(futureVersion),
        buffers.get(0).array());
    byte[] retBytes = new byte[buffers.get(1).remaining()];
    System.arraycopy(buffers.get(1).array(), buffers.get(1).position(),
        retBytes, 0, buffers.get(1).remaining());
    assertArrayEquals(secondHalf, retBytes);
  }

  @Test
  public void testCacheWithMultipleLayoutVersions() throws Exception {
    int oldLayout = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1;
    cache.storeEdits(createTxnData(1, 5), 1, 5, oldLayout);
    storeEdits(6, 10);
    // Ensure the cache will only return edits from a single
    // layout version at a time
    try {
      cache.retrieveEdits(1, 50, new ArrayList<>());
      fail("Expected a cache miss");
    } catch (JournaledEditsCache.CacheMissException cme) {
      // expected
    }
    assertTxnCountAndContents(6, 50, 10);
  }

  @Test
  public void testCacheEditsWithGaps() throws Exception {
    storeEdits(1, 5);
    storeEdits(10, 15);

    try {
      cache.retrieveEdits(1, 20, new ArrayList<>());
      fail();
    } catch (JournaledEditsCache.CacheMissException cme) {
      assertEquals(9, cme.getCacheMissAmount());
    }
    assertTxnCountAndContents(10, 10, 15);
  }

  @Test(expected = JournaledEditsCache.CacheMissException.class)
  public void testReadUninitializedCache() throws Exception {
    cache.retrieveEdits(1, 10, new ArrayList<>());
  }

  @Test(expected = JournaledEditsCache.CacheMissException.class)
  public void testCacheMalformedInput() throws Exception {
    storeEdits(1, 1);
    cache.retrieveEdits(-1, 10, new ArrayList<>());
  }

  private void storeEdits(int startTxn, int endTxn) throws Exception {
    cache.storeEdits(createTxnData(startTxn, endTxn - startTxn + 1), startTxn,
        endTxn, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
  }

  private void assertTxnCountAndContents(int startTxn, int requestedMaxTxns,
      int expectedEndTxn) throws Exception {
    List<ByteBuffer> buffers = new ArrayList<>();
    int expectedTxnCount = expectedEndTxn - startTxn + 1;
    assertEquals(expectedTxnCount,
        cache.retrieveEdits(startTxn, requestedMaxTxns, buffers));

    byte[] expectedBytes = Bytes.concat(
        getHeaderForLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION),
        createTxnData(startTxn, expectedTxnCount));
    byte[] actualBytes =
        new byte[buffers.stream().mapToInt(ByteBuffer::remaining).sum()];
    int pos = 0;
    for (ByteBuffer buf : buffers) {
      System.arraycopy(buf.array(), buf.position(), actualBytes, pos,
          buf.remaining());
      pos += buf.remaining();
    }
    assertArrayEquals(expectedBytes, actualBytes);
  }

  private static byte[] getHeaderForLayoutVersion(int version)
      throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    EditLogFileOutputStream.writeHeader(version, new DataOutputStream(baos));
    return baos.toByteArray();
  }

}