HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.
parent c40ddf9b5b
commit abb8614dcf
@@ -321,6 +321,11 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
 | `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
 | `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
 | `LastJournalTimestamp` | The timestamp of last successfully written transaction |
+| `TxnsServedViaRpc` | Number of transactions served via the RPC mechanism |
+| `BytesServedViaRpc` | Number of bytes served via the RPC mechanism |
+| `RpcRequestCacheMissAmountNumMisses` | Number of RPC requests which could not be served due to lack of data in the cache |
+| `RpcRequestCacheMissAmountAvgTxns` | The average number of transactions by which a request missed the cache; for example if transaction ID 10 is requested and the cache's oldest transaction is ID 15, value 5 will be added to this average |
+| `RpcEmptyResponses` | Number of RPC requests with zero edits returned |
 
 datanode
 --------
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -139,7 +140,28 @@ public interface QJournalProtocol {
       long sinceTxId,
       boolean inProgressOk)
       throws IOException;
 
+  /**
+   * Fetch edit logs present in the Journal's in-memory cache of edits
+   * ({@link org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache}).
+   * To enable this cache, in-progress edit log tailing must be enabled via the
+   * {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} configuration key.
+   *
+   * @param jid The ID of the journal from which to fetch edits.
+   * @param nameServiceId The ID of the namespace for which to fetch edits.
+   * @param sinceTxId Fetch edits starting at this transaction ID
+   * @param maxTxns Request at most this many transactions to be returned
+   * @throws IOException If there was an issue encountered while fetching edits
+   *     from the cache, including a cache miss (cache does not contain the
+   *     requested edits). The caller should then attempt to fetch the edits via
+   *     the streaming mechanism (starting with
+   *     {@link #getEditLogManifest(String, String, long, boolean)}).
+   * @return Response containing serialized edits to be loaded
+   * @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
+   */
+  GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
+
   /**
    * Begin the recovery process for a given segment. See the HDFS-3077
    * design document for details.
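The javadoc above defines the fallback contract: any IOException from this method (including a cache miss) means the caller should revert to the streaming mechanism. A minimal caller-side sketch of that contract, not part of this commit; the proxy and ID arguments are assumed inputs:

```java
import java.io.IOException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

class JournaledEditsFetchSketch {
  /** Returns serialized edits, or null if the caller must fall back to streaming. */
  static byte[] fetchViaRpcOrNull(QJournalProtocol proxy, String jid,
      String nsId, long sinceTxId, int maxTxns) {
    try {
      GetJournaledEditsResponseProto resp =
          proxy.getJournaledEdits(jid, nsId, sinceTxId, maxTxns);
      // txnCount == 0 is a valid empty response: no new edits yet.
      return resp.getTxnCount() > 0 ? resp.getEditLog().toByteArray() : null;
    } catch (IOException e) {
      // Cache disabled or cache miss: fall back to getEditLogManifest()
      // and the streaming mechanism, as the javadoc instructs.
      return null;
    }
  }
}
```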
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatReq
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -235,6 +237,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(
+      RpcController controller, GetJournaledEditsRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getJournaledEdits(request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null,
+          request.getSinceTxId(), request.getMaxTxns());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeL
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -281,6 +283,24 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    try {
+      GetJournaledEditsRequestProto.Builder req =
+          GetJournaledEditsRequestProto.newBuilder()
+              .setJid(convertJournalId(jid))
+              .setSinceTxId(sinceTxId)
+              .setMaxTxns(maxTxns);
+      if (nameServiceId != null) {
+        req.setNameServiceId(nameServiceId);
+      }
+      return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import com.google.protobuf.ByteString;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
@@ -24,9 +25,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -36,10 +39,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -85,6 +90,7 @@ public class Journal implements Closeable {
   // Current writing state
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+  private int curSegmentLayoutVersion = 0;
   private long nextTxId = HdfsServerConstants.INVALID_TXID;
   private long highestWrittenTxId = 0;
 
@@ -133,6 +139,8 @@ public class Journal implements Closeable {
 
   private final FileJournalManager fjm;
 
+  private final JournaledEditsCache cache;
+
   private final JournalMetrics metrics;
 
   private long lastJournalTimestamp = 0;
@@ -156,6 +164,13 @@ public class Journal implements Closeable {
     refreshCachedData();
 
     this.fjm = storage.getJournalManager();
 
+    if (conf.getBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT)) {
+      this.cache = new JournaledEditsCache(conf);
+    } else {
+      this.cache = null;
+    }
+
     this.metrics = JournalMetrics.create(this);
 
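As the constructor change above shows, the cache is only constructed when in-progress edit tailing is enabled; otherwise `cache` stays null and the RPC path is rejected. A minimal sketch of the required configuration (not part of this commit):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class EnableEditsCacheSketch {
  static Configuration withEditsCacheEnabled() {
    Configuration conf = new Configuration();
    // The key defaults to false, in which case Journal.cache is null and
    // getJournaledEdits() throws IOException.
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    return conf;
  }
}
```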
@@ -361,6 +376,7 @@ public class Journal implements Closeable {
     curSegment.abort();
     curSegment = null;
     curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+    curSegmentLayoutVersion = 0;
   }
 
   /**
@@ -406,6 +422,9 @@ public class Journal implements Closeable {
       LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
           " ; journal id: " + journalId);
     }
+    if (cache != null) {
+      cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
+    }
 
     // If the edit has already been marked as committed, we know
     // it has been fsynced on a quorum of other nodes, and we are
@@ -593,6 +612,7 @@ public class Journal implements Closeable {
 
     curSegment = fjm.startLogSegment(txid, layoutVersion);
     curSegmentTxId = txid;
+    curSegmentLayoutVersion = layoutVersion;
     nextTxId = txid;
   }
 
@@ -612,6 +632,7 @@ public class Journal implements Closeable {
       curSegment.close();
       curSegment = null;
       curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+      curSegmentLayoutVersion = 0;
     }
 
     checkSync(nextTxId == endTxId + 1,
@@ -712,6 +733,44 @@ public class Journal implements Closeable {
     return new RemoteEditLogManifest(logs, getCommittedTxnId());
   }
 
+  /**
+   * @see QJournalProtocol#getJournaledEdits(String, String, long, int)
+   */
+  public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
+      int maxTxns) throws IOException {
+    if (cache == null) {
+      throw new IOException("The journal edits cache is not enabled, which " +
+          "is a requirement to fetch journaled edits via RPC. Please enable " +
+          "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
+    }
+    if (sinceTxId > getHighestWrittenTxId()) {
+      // Requested edits that don't exist yet; short-circuit the cache here
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    }
+    try {
+      List<ByteBuffer> buffers = new ArrayList<>();
+      int txnCount = cache.retrieveEdits(sinceTxId, maxTxns, buffers);
+      int totalSize = 0;
+      for (ByteBuffer buf : buffers) {
+        totalSize += buf.remaining();
+      }
+      metrics.txnsServedViaRpc.incr(txnCount);
+      metrics.bytesServedViaRpc.incr(totalSize);
+      ByteString.Output output = ByteString.newOutput(totalSize);
+      for (ByteBuffer buf : buffers) {
+        output.write(buf.array(), buf.position(), buf.remaining());
+      }
+      return GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(txnCount)
+          .setEditLog(output.toByteString())
+          .build();
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
+      throw cme;
+    }
+  }
+
   /**
    * @return the current state of the given segment, or null if the
    * segment does not exist.
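The serving path above flattens the cache's list of array-backed ByteBuffers into a single ByteString that is sized up front. An isolated sketch of just that step (it assumes heap buffers, which is what the cache supplies here):

```java
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.List;

class FlattenBuffersSketch {
  static ByteString flatten(List<ByteBuffer> buffers) {
    int totalSize = 0;
    for (ByteBuffer buf : buffers) {
      totalSize += buf.remaining(); // bytes between position and limit
    }
    // Pre-sizing the output avoids intermediate copies while concatenating.
    ByteString.Output output = ByteString.newOutput(totalSize);
    for (ByteBuffer buf : buffers) {
      // array() requires an array-backed buffer; copy starts at position()
      output.write(buf.array(), buf.position(), buf.remaining());
    }
    return output.toByteString();
  }
}
```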
@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+
 
 /**
  * The server-side metrics for a journal from the JournalNode's
@@ -42,7 +44,23 @@ class JournalMetrics {
 
   @Metric("Number of bytes written since startup")
   MutableCounterLong bytesWritten;
 
+  @Metric("Number of txns served via RPC")
+  MutableCounterLong txnsServedViaRpc;
+
+  @Metric("Number of bytes served via RPC")
+  MutableCounterLong bytesServedViaRpc;
+
+  @Metric
+  MutableStat rpcRequestCacheMissAmount = new MutableStat(
+      "RpcRequestCacheMissAmount", "Number of RPC requests unable to be " +
+      "served due to lack of availability in cache, and how many " +
+      "transactions away the request was from being in the cache.",
+      "Misses", "Txns");
+
+  @Metric("Number of RPC requests with zero edits returned")
+  MutableCounterLong rpcEmptyResponses;
+
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
 
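The MutableStat above is what produces the `RpcRequestCacheMissAmountNumMisses` and `RpcRequestCacheMissAmountAvgTxns` entries in the Metrics.md hunk at the top: a stat named `RpcRequestCacheMissAmount` with sample name `Misses` and value name `Txns` is exported as a `Num<sample>` count and an `Avg<value>` average. A small sketch of the same pattern, illustrative only:

```java
import org.apache.hadoop.metrics2.lib.MutableStat;

class CacheMissStatSketch {
  public static void main(String[] args) {
    MutableStat miss = new MutableStat("RpcRequestCacheMissAmount",
        "Distance in txns by which RPC requests missed the cache",
        "Misses", "Txns");
    // e.g. txn 10 requested while the oldest cached txn is 15: distance 5,
    // so NumMisses increments by 1 and AvgTxns folds in the value 5.
    miss.add(5);
  }
}
```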
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -233,6 +234,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
         .build();
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    return jn.getOrCreateJournal(jid, nameServiceId)
+        .getJournaledEdits(sinceTxId, maxTxns);
+  }
+
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
@@ -284,6 +284,21 @@ message GetEditLogManifestResponseProto {
   // required NamespaceInfoProto nsInfo = 2;
 }
 
+/**
+ * getJournaledEdits()
+ */
+message GetJournaledEditsRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;
+  required uint32 maxTxns = 3;
+  optional string nameServiceId = 4;
+}
+
+message GetJournaledEditsResponseProto {
+  required uint32 txnCount = 1;
+  optional bytes editLog = 2;
+}
+
 /**
  * prepareRecovery()
 */
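A consumer-side sketch of the wire contract defined by these messages, not part of this commit: `txnCount` gives the number of transactions, and `editLog` carries them serialized behind a standard edit-log header (as the TestJournal hunk below verifies):

```java
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;

class DecodeJournaledEditsSketch {
  /** Returns the serialized edits, or null for an empty response. */
  static byte[] editsOrNull(GetJournaledEditsResponseProto resp) {
    if (resp.getTxnCount() == 0 || !resp.hasEditLog()) {
      return null; // nothing new to apply
    }
    // Bytes begin with an edit log header (layout version etc.) followed
    // by txnCount serialized transactions.
    return resp.getEditLog().toByteArray();
  }
}
```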
@@ -363,6 +378,9 @@ service QJournalProtocolService {
   rpc getEditLogManifest(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
 
+  rpc getJournaledEdits(GetJournaledEditsRequestProto)
+      returns (GetJournaledEditsResponseProto);
+
   rpc prepareRecovery(PrepareRecoveryRequestProto)
       returns (PrepareRecoveryResponseProto);
 
@@ -17,19 +17,25 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -38,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
@@ -71,6 +78,8 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     conf = new Configuration();
+    // Enable fetching edits via RPC
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
         mockErrorReporter);
     journal.format(FAKE_NSINFO);
@@ -434,4 +443,41 @@ public class TestJournal {
     }
   }
 
+  @Test
+  public void testReadFromCache() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
+    journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5));
+    journal.journal(makeRI(4), 1, 11, 5, QJMTestUtil.createTxnData(11, 5));
+    assertJournaledEditsTxnCountAndContents(1, 7, 7,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    assertJournaledEditsTxnCountAndContents(1, 30, 15,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    journal.finalizeLogSegment(makeRI(5), 1, 15);
+    int newLayoutVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+    journal.startLogSegment(makeRI(6), 16, newLayoutVersion);
+    journal.journal(makeRI(7), 16, 16, 5, QJMTestUtil.createTxnData(16, 5));
+
+    assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
+  }
+
+  private void assertJournaledEditsTxnCountAndContents(int startTxn,
+      int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
+      throws Exception {
+    GetJournaledEditsResponseProto result =
+        journal.getJournaledEdits(startTxn, requestedMaxTxns);
+    int expectedTxnCount = expectedEndTxn - startTxn + 1;
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(layoutVersion,
+        new DataOutputStream(headerBytes));
+    assertEquals(expectedTxnCount, result.getTxnCount());
+    assertArrayEquals(
+        Bytes.concat(
+            headerBytes.toByteArray(),
+            QJMTestUtil.createTxnData(startTxn, expectedTxnCount)),
+        result.getEditLog().toByteArray());
+  }
 }