HDFS-10519. Add a configuration option to enable in-progress edit log tailing. Contributed by Jiayi Zhou.
commit 098ec2b11f
parent b43de80031
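In short: the manifest returned by the JournalNodes now carries a committedTxnId, a new onlyDurableTxns flag is threaded through the selectInputStreams(...) call chain so readers never go past the durable (committed) transaction, and the standby's EditLogTailer reads in-progress segments when the new dfs.ha.tail-edits.in-progress key (default false) is set. A minimal sketch of turning the option on, mirroring what the new test at the bottom of this diff does — the wrapper class here is hypothetical; only the key and its default come from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnableInProgressTailing {
  // Build a Configuration with in-progress edit log tailing switched on.
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Key added by this commit: "dfs.ha.tail-edits.in-progress" (default false)
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    return conf; // hand to the standby NameNode, or a MiniQJMHACluster in tests
  }
}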
@@ -536,9 +536,15 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
     }
   }
 
+  public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk)
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
       throws IOException {
     List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
         inProgressOk);
@@ -728,6 +728,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
   public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
   public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
+  public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
+      "dfs.ha.tail-edits.in-progress";
+  public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
@@ -289,7 +289,8 @@ public static RemoteEditLog convert(RemoteEditLogProto l) {
   public static RemoteEditLogManifestProto convert(
       RemoteEditLogManifest manifest) {
     RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto
-        .newBuilder();
+        .newBuilder()
+        .setCommittedTxnId(manifest.getCommittedTxnId());
     for (RemoteEditLog log : manifest.getLogs()) {
       builder.addLogs(convert(log));
     }
@@ -303,7 +304,8 @@ public static RemoteEditLogManifest convert(
     for (RemoteEditLogProto l : manifest.getLogsList()) {
       logs.add(convert(l));
     }
-    return new RemoteEditLogManifest(logs);
+    return new RemoteEditLogManifest(logs,
+        manifest.getCommittedTxnId());
   }
 
   public static CheckpointCommandProto convert(CheckpointCommand cmd) {
@@ -402,8 +402,11 @@ public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
         layoutVersion);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId,
-        outputBufferCapacity, writeTxnsTimeoutMs);
+    boolean updateCommittedTxId = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+    return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
+        writeTxnsTimeoutMs, updateCommittedTxId);
   }
 
   @Override
@@ -462,9 +465,15 @@ public void close() throws IOException {
     loggers.close();
   }
 
-  @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
+
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId, inProgressOk);
@@ -481,13 +490,22 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
     for (Map.Entry<AsyncLogger, RemoteEditLogManifest> e : resps.entrySet()) {
       AsyncLogger logger = e.getKey();
       RemoteEditLogManifest manifest = e.getValue();
+      long committedTxnId = manifest.getCommittedTxnId();
+
       for (RemoteEditLog remoteLog : manifest.getLogs()) {
         URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
+
+        long endTxId = remoteLog.getEndTxId();
+
+        // If it's bounded by durable Txns, endTxId cannot be larger
+        // than committedTxnId. This ensures consistency.
+        if (onlyDurableTxns && inProgressOk) {
+          endTxId = Math.min(endTxId, committedTxnId);
+        }
+
         EditLogInputStream elis = EditLogFileInputStream.fromUrl(
             connectionFactory, url, remoteLog.getStartTxId(),
-            remoteLog.getEndTxId(), remoteLog.isInProgress());
+            endTxId, remoteLog.isInProgress());
         allStreams.add(elis);
       }
     }
@@ -33,15 +33,17 @@ class QuorumOutputStream extends EditLogOutputStream {
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
   private final int writeTimeoutMs;
+  private final boolean updateCommittedTxId;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
       long txId, int outputBufferCapacity,
-      int writeTimeoutMs) throws IOException {
+      int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
     this.writeTimeoutMs = writeTimeoutMs;
+    this.updateCommittedTxId = updateCommittedTxId;
   }
 
   @Override
@@ -110,6 +112,15 @@ protected void flushAndSync(boolean durable) throws IOException {
       // RPCs will thus let the loggers know of the most recent transaction, even
       // if a logger has fallen behind.
       loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
+
+      // If we don't have this dummy send, committed TxId might be one-batch
+      // stale on the Journal Nodes
+      if (updateCommittedTxId) {
+        QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
+            segmentTxId, firstTxToFlush,
+            0, new byte[0]);
+        loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
+      }
     }
   }
 
@@ -253,7 +253,7 @@ synchronized public long getLastWriterEpoch() throws IOException {
     return lastWriterEpoch.get();
   }
 
-  synchronized long getCommittedTxnIdForTests() throws IOException {
+  synchronized long getCommittedTxnId() throws IOException {
     return committedTxnId.get();
   }
 
@@ -357,6 +357,12 @@ synchronized void journal(RequestInfo reqInfo,
     checkFormatted();
     checkWriteRequest(reqInfo);
 
+    // If numTxns is 0, it's actually a fake send which aims at updating
+    // committedTxId only. So we can return early.
+    if (numTxns == 0) {
+      return;
+    }
+
     checkSync(curSegment != null,
         "Can't write, no segment open");
 
@@ -673,12 +679,12 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
         }
       }
       if (log != null && log.isInProgress()) {
-        logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(),
-            true));
+        logs.add(new RemoteEditLog(log.getStartTxId(),
+            getHighestWrittenTxId(), true));
       }
     }
 
-    return new RemoteEditLogManifest(logs);
+    return new RemoteEditLogManifest(logs, getCommittedTxnId());
   }
 
   /**
@@ -80,7 +80,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
   }
@@ -317,7 +317,7 @@ synchronized void openForWrite(int layoutVersion) throws IOException {
     // Safety check: we should never start a segment if there are
     // newer txids readable.
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, segmentTxId, true);
+    journalSet.selectInputStreams(streams, segmentTxId, true, false);
     if (!streams.isEmpty()) {
       String error = String.format("Cannot start writing at txid %s " +
           "when there is a stream available for read: %s",
@@ -1575,15 +1575,23 @@ public synchronized void discardSegments(long markerTxid)
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException {
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId,
+        inProgressOk, onlyDurableTxns);
   }
 
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
+    return selectInputStreams(fromTxId, toAtLeastTxId, null, true, false);
   }
 
+  public Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+      boolean inProgressOK) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId,
+        recovery, inProgressOK, false);
+  }
   /**
    * Select a list of input streams.
    *
@@ -1591,16 +1599,18 @@ public Collection<EditLogInputStream> selectInputStreams(
    * @param toAtLeastTxId the selected streams must contain this transaction
    * @param recovery recovery context
    * @param inProgressOk set to true if in-progress streams are OK
+   * @param onlyDurableTxns set to true if streams are bounded
+   *                        by the durable TxId
    */
-  public Collection<EditLogInputStream> selectInputStreams(
-      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
-      boolean inProgressOk) throws IOException {
+  public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
+      boolean onlyDurableTxns) throws IOException {
 
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
     synchronized(journalSetLock) {
       Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
           "selectInputStreams() on closed FSEditLog");
-      selectInputStreams(streams, fromTxId, inProgressOk);
+      selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
     }
 
     try {
@@ -333,10 +333,17 @@ private static List<EditLogFile> matchEditLogs(File[] filesInStorage,
     return ret;
   }
 
+  synchronized public void selectInputStreams(
+      Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) throws IOException {
+    selectInputStreams(streams, fromTxnId, inProgressOk, false);
+  }
+
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk) throws IOException {
+      boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException {
     List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     if (LOG.isDebugEnabled()) {
       LOG.debug(this + ": selecting input streams starting at " + fromTxId +
@@ -261,10 +261,13 @@ public boolean isOpen() {
    * may not be sorted-- this is up to the caller.
    * @param fromTxId The transaction ID to start looking for streams at
    * @param inProgressOk Should we consider unfinalized streams?
+   * @param onlyDurableTxns Set to true if streams are bounded by the durable
+   *                        TxId. A durable TxId is the committed txid in QJM
+   *                        or the largest txid written into file in FJM
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) {
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
             EDIT_LOG_INPUT_STREAM_COMPARATOR);
@@ -274,7 +277,8 @@ public void selectInputStreams(Collection<EditLogInputStream> streams,
         continue;
       }
       try {
-        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+        jas.getManager().selectInputStreams(allStreams, fromTxId,
+            inProgressOk, onlyDurableTxns);
       } catch (IOException ioe) {
         LOG.warn("Unable to determine input streams from " + jas.getManager() +
             ". Skipping.", ioe);
@@ -681,7 +685,8 @@ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
       // And then start looking from after that point
       curStartTxId = bestLog.getEndTxId() + 1;
     }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs,
+        curStartTxId - 1);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated manifest for logs since " + fromTxId + ":"
@@ -42,10 +42,14 @@ interface LogsPurgeable {
    *
    * @param fromTxId the first transaction id we want to read
    * @param inProgressOk whether or not in-progress streams should be returned
+   * @param onlyDurableTxns whether or not streams should be bounded by durable
+   *                        TxId. A durable TxId is the committed txid in QJM
+   *                        or the largest txid written into file in FJM
    * @throws IOException if the underlying storage has an error or is otherwise
    *                     inaccessible
    */
   void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) throws IOException;
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
+      throws IOException;
 
 }
@@ -134,7 +134,7 @@ void purgeOldStorage(NameNodeFile nnf) throws IOException {
     long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
 
     ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
-    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
     Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
       @Override
       public int compare(EditLogInputStream a, EditLogInputStream b) {
@@ -904,7 +904,7 @@ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) {
+      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) {
     Iterator<StorageDirectory> iter = storage.dirIterator();
     while (iter.hasNext()) {
       StorageDirectory dir = iter.next();
@@ -117,6 +117,11 @@ public class EditLogTailer {
    */
   private int maxRetries;
+
+  /**
+   * Whether the tailer should tail the in-progress edit log segments.
+   */
+  private final boolean inProgressOk;
 
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
@@ -164,6 +169,10 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
       maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT;
     }
 
+    inProgressOk = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
+
     nnCount = nns.size();
     // setup the iterator to endlessly loop the nns
     this.nnLookup = Iterators.cycle(nns);
@@ -236,7 +245,8 @@ void doTailEdits() throws IOException, InterruptedException {
       }
       Collection<EditLogInputStream> streams;
       try {
-        streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
+        streams = editLog.selectInputStreams(lastTxnId + 1, 0,
+            null, inProgressOk, true);
       } catch (IOException ioe) {
         // This is acceptable. If we try to tail edits in the middle of an edits
         // log roll, i.e. the last one has been finalized but the new inprogress
@@ -30,11 +30,14 @@ public class RemoteEditLogManifest {
 
   private List<RemoteEditLog> logs;
+
+  private long committedTxnId = -1;
 
   public RemoteEditLogManifest() {
   }
 
-  public RemoteEditLogManifest(List<RemoteEditLog> logs) {
+  public RemoteEditLogManifest(List<RemoteEditLog> logs, long committedTxnId) {
     this.logs = logs;
+    this.committedTxnId = committedTxnId;
     checkState();
   }
 
@@ -56,7 +59,6 @@ private void checkState() {
             + this);
         }
       }
-
       prev = log;
     }
   }
@@ -65,10 +67,13 @@ public List<RemoteEditLog> getLogs() {
     return Collections.unmodifiableList(logs);
   }
 
+  public long getCommittedTxnId() {
+    return committedTxnId;
+  }
+
   @Override
   public String toString() {
-    return "[" + Joiner.on(", ").join(logs) + "]";
+    return "[" + Joiner.on(", ").join(logs) + "]" + " CommittedTxId: "
+        + committedTxnId;
   }
 }
@@ -88,6 +88,7 @@ message RemoteEditLogProto {
  */
 message RemoteEditLogManifestProto {
   repeated RemoteEditLogProto logs = 1;
+  required uint64 committedTxnId = 2;
 }
 
 /**
@@ -2789,6 +2789,16 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.tail-edits.in-progress</name>
+  <value>false</value>
+  <description>
+    Whether to enable the standby namenode to tail in-progress edit logs.
+    Clients might want to turn this on when they want the Standby NN to have
+    more up-to-date data.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name>
   <value>5000</value>
@@ -329,7 +329,7 @@ public void testConvertRemoteEditLogManifest() {
     List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
     logs.add(new RemoteEditLog(1, 10));
     logs.add(new RemoteEditLog(11, 20));
-    RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifest m = new RemoteEditLogManifest(logs, 20);
     RemoteEditLogManifestProto mProto = PBHelper.convert(m);
     RemoteEditLogManifest m1 = PBHelper.convert(mProto);
 
@@ -158,12 +158,12 @@ public void testMaintainCommittedTxId() throws Exception {
     // Send txids 1-3, with a request indicating only 0 committed
     journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
         QJMTestUtil.createTxnData(1, 3));
-    assertEquals(0, journal.getCommittedTxnIdForTests());
+    assertEquals(0, journal.getCommittedTxnId());
 
     // Send 4-6, with request indicating that through 3 is committed.
     journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
         QJMTestUtil.createTxnData(4, 6));
-    assertEquals(3, journal.getCommittedTxnIdForTests());
+    assertEquals(3, journal.getCommittedTxnId());
   }
 
   @Test (timeout = 10000)
@@ -1034,9 +1034,9 @@ public void testEditLogManifestMocks() throws IOException {
         "[1,100]|[101,200]|[201,]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
 
     // Another simple case, different directories have different
@@ -1046,8 +1046,8 @@ public void testEditLogManifestMocks() throws IOException {
         "[1,100]|[201,300]|[301,400]"); // nothing starting at 101
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
-        log.getEditLogManifest(1).toString());
+    assertEquals("[[1,100], [101,200], [201,300], [301,400]]" +
+        " CommittedTxId: 400", log.getEditLogManifest(1).toString());
 
     // Case where one directory has an earlier finalized log, followed
     // by a gap. The returned manifest should start after the gap.
|
|||||||
"[301,400]|[401,500]");
|
"[301,400]|[401,500]");
|
||||||
log = getFSEditLog(storage);
|
log = getFSEditLog(storage);
|
||||||
log.initJournalsForWrite();
|
log.initJournalsForWrite();
|
||||||
assertEquals("[[301,400], [401,500]]",
|
assertEquals("[[301,400], [401,500]] CommittedTxId: 500",
|
||||||
log.getEditLogManifest(1).toString());
|
log.getEditLogManifest(1).toString());
|
||||||
|
|
||||||
// Case where different directories have different length logs
|
// Case where different directories have different length logs
|
||||||
@@ -1066,9 +1066,9 @@ public void testEditLogManifestMocks() throws IOException {
         "[1,50]|[101,200]"); // short log at 1
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
        log.getEditLogManifest(101).toString());
 
     // Case where the first storage has an inprogress while
@@ -1079,9 +1079,9 @@ public void testEditLogManifestMocks() throws IOException {
         "[1,100]|[101,200]");
     log = getFSEditLog(storage);
     log.initJournalsForWrite();
-    assertEquals("[[1,100], [101,200]]",
+    assertEquals("[[1,100], [101,200]] CommittedTxId: 200",
         log.getEditLogManifest(1).toString());
-    assertEquals("[[101,200]]",
+    assertEquals("[[101,200]] CommittedTxId: 200",
         log.getEditLogManifest(101).toString());
   }
 
@@ -398,7 +398,7 @@ public void testMatchEditLogInvalidDirThrowsIOException() throws IOException {
     FileJournalManager.matchEditLogs(badDir);
   }
 
-  private static EditLogInputStream getJournalInputStream(JournalManager jm,
+  private static EditLogInputStream getJournalInputStream(FileJournalManager jm,
       long txId, boolean inProgressOk) throws IOException {
     final PriorityQueue<EditLogInputStream> allStreams =
         new PriorityQueue<EditLogInputStream>(64,
@@ -173,7 +173,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk, boolean onlyDurableTxns) {
   }
 
   @Override
@@ -368,11 +368,11 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
       public Void answer(InvocationOnMock invocation) throws Throwable {
         Object[] args = invocation.getArguments();
         journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
-            (Long)args[1], (Boolean)args[2]);
+            (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
         return null;
       }
     }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
-        Mockito.anyLong(), Mockito.anyBoolean());
+        Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
     return mockLog;
   }
 }
@@ -329,7 +329,8 @@ private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {
     FSEditLog spyEditLog = NameNodeAdapter.spyOnEditLog(nn1);
     LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
     doAnswer(answer).when(spyEditLog).selectInputStreams(
-        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
+        anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean(),
+        anyBoolean());
     return answer;
   }
 
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Test cases for in progress tailing edit logs by
+ * the standby node.
+ */
+public class TestStandbyInProgressTail {
+  private static final Log LOG =
+      LogFactory.getLog(TestStandbyInProgressTail.class);
+  private Configuration conf;
+  private MiniQJMHACluster qjmhaCluster;
+  private MiniDFSCluster cluster;
+  private NameNode nn0;
+  private NameNode nn1;
+
+  @Before
+  public void startUp() throws IOException {
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    // Get NameNodes from the cluster for future manual control
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDefault() throws Exception {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+    conf = new Configuration();
+    // Set period of tail edits to a large value (20 mins) for test purposes
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    HAUtil.setAllowStandbyReads(conf, true);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+
+    try {
+      // During HA startup, both nodes should be in
+      // standby and we shouldn't have any edits files
+      // in any edits directory!
+      List<URI> allDirs = Lists.newArrayList();
+      allDirs.addAll(cluster.getNameDirs(0));
+      allDirs.addAll(cluster.getNameDirs(1));
+      assertNoEditFiles(allDirs);
+
+      // Set the first NN to active, make sure it creates edits
+      // in its own dirs and the shared dir. The standby
+      // should still have no edits!
+      cluster.transitionToActive(0);
+
+      assertEditFiles(cluster.getNameDirs(0),
+          NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+          FsPermission.createImmutable((short) 0755), true);
+
+      cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
+
+      // StandbyNameNode should not finish tailing in-progress logs
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+
+      // Restarting the standby should not finalize any edits files
+      // in the shared directory when it starts up!
+      cluster.restartNameNode(1);
+
+      assertEditFiles(cluster.getNameDirs(0),
+          NNStorage.getInProgressEditsFileName(1));
+      assertNoEditFiles(cluster.getNameDirs(1));
+
+      // Additionally it should not have applied any in-progress logs
+      // at start-up -- otherwise, it would have read half-way into
+      // the current log segment, and on the next roll, it would have to
+      // either replay starting in the middle of the segment (not allowed)
+      // or double-replay the edits (incorrect).
+      assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+
+      cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+          FsPermission.createImmutable((short) 0755), true);
+
+      // If we restart NN0, it'll come back as standby, and we can
+      // transition NN1 to active and make sure it reads edits correctly.
+      cluster.restartNameNode(0);
+      cluster.transitionToActive(1);
+
+      // NN1 should have both the edits that came before its restart,
+      // and the edits that came after its restart.
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test", true));
+      assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          "/test2", true));
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testSetup() throws Exception {
+    // During HA startup, both nodes should be in
+    // standby and we shouldn't have any edits files
+    // in any edits directory!
+    List<URI> allDirs = Lists.newArrayList();
+    allDirs.addAll(cluster.getNameDirs(0));
+    allDirs.addAll(cluster.getNameDirs(1));
+    assertNoEditFiles(allDirs);
+
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // After waiting for 5 seconds, StandbyNameNode should finish tailing
+    // in-progress logs
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+
+    // Restarting the standby should not finalize any edits files
+    // in the shared directory when it starts up!
+    cluster.restartNameNode(1);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    // Because we're using the in-progress tailer, this should not be null
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+
+    // If we restart NN0, it'll come back as standby, and we can
+    // transition NN1 to active and make sure it reads edits correctly.
+    cluster.restartNameNode(0);
+    cluster.transitionToActive(1);
+
+    // NN1 should have both the edits that came before its restart,
+    // and the edits that came after its restart.
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+        "/test2", true));
+  }
+
+  @Test
+  public void testHalfStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the in-progress edit
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+
+    // Create a new edit and finalize it
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    // StandbyNameNode shouldn't tail the edit since we do not call the method
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    // Create a new in-progress edit and let SBNN do the tail
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testInitStartInProgressTail() throws Exception {
+    // Set the first NN to active, make sure it creates edits
+    // in its own dirs and the shared dir. The standby
+    // should still have no edits!
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn0.getRpcServer().rollEditLog();
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  @Test
+  public void testNewStartInProgressTail() throws Exception {
+    cluster.transitionToActive(0);
+
+    assertEditFiles(cluster.getNameDirs(0),
+        NNStorage.getInProgressEditsFileName(1));
+    assertNoEditFiles(cluster.getNameDirs(1));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test",
+        FsPermission.createImmutable((short) 0755), true);
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+    nn0.getRpcServer().rollEditLog();
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+
+    cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
+        FsPermission.createImmutable((short) 0755), true);
+    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test2", true));
+    assertNotNull(NameNodeAdapter.getFileInfo(nn1, "/test3", true));
+  }
+
+  /**
+   * Check that no edits files are present in the given storage dirs.
+   */
+  private static void assertNoEditFiles(Iterable<URI> dirs) throws IOException {
+    assertEditFiles(dirs);
+  }
+
+  /**
+   * Check that the given list of edits files are present in the given storage
+   * dirs.
+   */
+  private static void assertEditFiles(Iterable<URI> dirs, String... files)
+      throws IOException {
+    for (URI u : dirs) {
+      File editDirRoot = new File(u.getPath());
+      File editDir = new File(editDirRoot, "current");
+      GenericTestUtils.assertExists(editDir);
+      if (files.length == 0) {
+        LOG.info("Checking no edit files exist in " + editDir);
+      } else {
+        LOG.info("Checking for following edit files in " + editDir
+            + ": " + Joiner.on(",").join(files));
+      }
+
+      GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
+    }
+  }
+}