HDFS-2227. getRemoteEditLogManifest should pull its information from FileJournalManager during checkpoint process. Contributed by Ivan Kelly and Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1155977 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27b66e2bba
commit
eb6e44b1ba
|
@ -650,6 +650,10 @@ Trunk (unreleased changes)
|
|||
HDFS-2230. ivy to resolve/retrieve latest common-tests jar published by
|
||||
hadoop common maven build. (gkesavan)
|
||||
|
||||
HDFS-2227. getRemoteEditLogManifest should pull its information from
|
||||
FileJournalManager during checkpoint process (Ivan Kelly and Todd Lipcon
|
||||
via todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||
|
|
|
@ -91,7 +91,6 @@ public class BackupImage extends FSImage {
|
|||
super(conf);
|
||||
storage.setDisablePreUpgradableLayoutCheck(true);
|
||||
bnState = BNState.DROP_UNTIL_NEXT_ROLL;
|
||||
editLog.initJournals();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -207,7 +207,7 @@ class Checkpointer extends Daemon {
|
|||
long lastApplied = bnImage.getLastAppliedTxId();
|
||||
LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
|
||||
RemoteEditLogManifest manifest =
|
||||
getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
|
||||
getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
|
||||
|
||||
if (!manifest.getLogs().isEmpty()) {
|
||||
RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -27,27 +29,26 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimaps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
|
||||
|
||||
/**
|
||||
|
@ -133,15 +134,6 @@ public class FSEditLog {
|
|||
this.storage = storage;
|
||||
metrics = NameNode.getNameNodeMetrics();
|
||||
lastPrintTime = now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the list of edit journals
|
||||
*/
|
||||
synchronized void initJournals() {
|
||||
assert journals.isEmpty();
|
||||
Preconditions.checkState(state == State.UNINITIALIZED,
|
||||
"Bad state: %s", state);
|
||||
|
||||
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
||||
journals.add(new JournalAndStream(new FileJournalManager(sd)));
|
||||
|
@ -159,8 +151,7 @@ public class FSEditLog {
|
|||
* log segment.
|
||||
*/
|
||||
synchronized void open() throws IOException {
|
||||
Preconditions.checkState(state == State.UNINITIALIZED);
|
||||
initJournals();
|
||||
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS);
|
||||
|
||||
startLogSegment(getLastWrittenTxId() + 1, true);
|
||||
assert state == State.IN_SEGMENT : "Bad state: " + state;
|
||||
|
@ -740,18 +731,64 @@ public class FSEditLog {
|
|||
/**
|
||||
* Return a manifest of what finalized edit logs are available
|
||||
*/
|
||||
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
|
||||
throws IOException {
|
||||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
|
||||
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
|
||||
inspector.inspectDirectory(sd);
|
||||
public synchronized RemoteEditLogManifest getEditLogManifest(
|
||||
long fromTxId) throws IOException {
|
||||
// Collect RemoteEditLogs available from each FileJournalManager
|
||||
List<RemoteEditLog> allLogs = Lists.newArrayList();
|
||||
for (JournalAndStream j : journals) {
|
||||
if (j.getManager() instanceof FileJournalManager) {
|
||||
FileJournalManager fjm = (FileJournalManager)j.getManager();
|
||||
try {
|
||||
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Cannot list edit logs in " + fjm, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return inspector.getEditLogManifest(sinceTxId);
|
||||
// Group logs by their starting txid
|
||||
ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
|
||||
Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
|
||||
long curStartTxId = fromTxId;
|
||||
|
||||
List<RemoteEditLog> logs = Lists.newArrayList();
|
||||
while (true) {
|
||||
ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
|
||||
if (logGroup.isEmpty()) {
|
||||
// we have a gap in logs - for example because we recovered some old
|
||||
// storage directory with ancient logs. Clear out any logs we've
|
||||
// accumulated so far, and then skip to the next segment of logs
|
||||
// after the gap.
|
||||
SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
|
||||
startTxIds = startTxIds.tailSet(curStartTxId);
|
||||
if (startTxIds.isEmpty()) {
|
||||
break;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found gap in logs at " + curStartTxId + ": " +
|
||||
"not returning previous logs in manifest.");
|
||||
}
|
||||
logs.clear();
|
||||
curStartTxId = startTxIds.first();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Find the one that extends the farthest forward
|
||||
RemoteEditLog bestLog = Collections.max(logGroup);
|
||||
logs.add(bestLog);
|
||||
// And then start looking from after that point
|
||||
curStartTxId = bestLog.getEndTxId() + 1;
|
||||
}
|
||||
RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Generated manifest for logs since " + fromTxId + ":"
|
||||
+ ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Finalizes the current edit log and opens a new log segment.
|
||||
* @return the transaction id of the BEGIN_LOG_SEGMENT transaction
|
||||
|
@ -1064,7 +1101,8 @@ public class FSEditLog {
|
|||
stream = null;
|
||||
}
|
||||
|
||||
private void abort() {
|
||||
@VisibleForTesting
|
||||
void abort() {
|
||||
if (stream == null) return;
|
||||
try {
|
||||
stream.abort();
|
||||
|
|
|
@ -268,22 +268,6 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
|
|||
return needToSave;
|
||||
}
|
||||
|
||||
|
||||
RemoteEditLogManifest getEditLogManifest(long sinceTxId) {
|
||||
List<RemoteEditLog> logs = Lists.newArrayList();
|
||||
for (LogGroup g : logGroups.values()) {
|
||||
if (!g.hasFinalized) continue;
|
||||
|
||||
EditLogFile fel = g.getBestNonCorruptLog();
|
||||
if (fel.getLastTxId() < sinceTxId) continue;
|
||||
|
||||
logs.add(new RemoteEditLog(fel.getFirstTxId(),
|
||||
fel.getLastTxId()));
|
||||
}
|
||||
|
||||
return new RemoteEditLogManifest(logs);
|
||||
}
|
||||
|
||||
/**
|
||||
* A group of logs that all start at the same txid.
|
||||
*
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -125,6 +126,33 @@ class FileJournalManager implements JournalManager {
|
|||
File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId);
|
||||
return new EditLogFileInputStream(f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find all editlog segments starting at or above the given txid.
|
||||
* @param fromTxId the txnid which to start looking
|
||||
* @return a list of remote edit logs
|
||||
* @throws IOException if edit logs cannot be listed.
|
||||
*/
|
||||
List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
|
||||
File currentDir = sd.getCurrentDir();
|
||||
List<EditLogFile> allLogFiles = matchEditLogs(
|
||||
FileUtil.listFiles(currentDir));
|
||||
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
|
||||
allLogFiles.size());
|
||||
|
||||
for (EditLogFile elf : allLogFiles) {
|
||||
if (elf.isCorrupt() || elf.isInProgress()) continue;
|
||||
if (elf.getFirstTxId() >= firstTxId) {
|
||||
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
|
||||
} else if ((firstTxId > elf.getFirstTxId()) &&
|
||||
(firstTxId <= elf.getLastTxId())) {
|
||||
throw new IOException("Asked for firstTxId " + firstTxId
|
||||
+ " which is in the middle of file " + elf.file);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
|
||||
List<EditLogFile> ret = Lists.newArrayList();
|
||||
|
|
|
@ -69,6 +69,8 @@ class TransferFsImage implements FSConstants {
|
|||
|
||||
static void downloadEditsToStorage(String fsName, RemoteEditLog log,
|
||||
NNStorage dstStorage) throws IOException {
|
||||
assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
|
||||
"bad log: " + log;
|
||||
String fileid = GetImageServlet.getParamStringForLog(
|
||||
log, dstStorage);
|
||||
String fileName = NNStorage.getFinalizedEditsFileName(
|
||||
|
|
|
@ -20,11 +20,15 @@ package org.apache.hadoop.hdfs.server.protocol;
|
|||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
public class RemoteEditLog implements Writable {
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ComparisonChain;
|
||||
|
||||
public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
|
||||
private long startTxId = FSConstants.INVALID_TXID;
|
||||
private long endTxId = FSConstants.INVALID_TXID;
|
||||
|
||||
|
@ -60,5 +64,34 @@ public class RemoteEditLog implements Writable {
|
|||
startTxId = in.readLong();
|
||||
endTxId = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(RemoteEditLog log) {
|
||||
return ComparisonChain.start()
|
||||
.compare(startTxId, log.startTxId)
|
||||
.compare(endTxId, log.endTxId)
|
||||
.result();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof RemoteEditLog)) return false;
|
||||
return this.compareTo((RemoteEditLog)o) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (int) (startTxId * endTxId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Guava <code>Function</code> which applies {@link #getStartTxId()}
|
||||
*/
|
||||
public static final Function<RemoteEditLog, Long> GET_START_TXID =
|
||||
new Function<RemoteEditLog, Long>() {
|
||||
@Override
|
||||
public Long apply(RemoteEditLog log) {
|
||||
return log.getStartTxId();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -24,14 +24,18 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
|
||||
|
@ -108,6 +112,40 @@ public abstract class FSImageTestUtil {
|
|||
return sd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a mock storage directory that returns some set of file contents.
|
||||
* @param type type of storage dir
|
||||
* @param previousExists should we mock that the previous/ dir exists?
|
||||
* @param fileNames the names of files contained in current/
|
||||
*/
|
||||
static StorageDirectory mockStorageDirectory(
|
||||
StorageDirType type,
|
||||
boolean previousExists,
|
||||
String... fileNames) {
|
||||
StorageDirectory sd = mock(StorageDirectory.class);
|
||||
|
||||
doReturn(type).when(sd).getStorageDirType();
|
||||
|
||||
// Version file should always exist
|
||||
doReturn(mockFile(true)).when(sd).getVersionFile();
|
||||
|
||||
// Previous dir optionally exists
|
||||
doReturn(mockFile(previousExists))
|
||||
.when(sd).getPreviousDir();
|
||||
|
||||
// Return a mock 'current' directory which has the given paths
|
||||
File[] files = new File[fileNames.length];
|
||||
for (int i = 0; i < fileNames.length; i++) {
|
||||
files[i] = new File(fileNames[i]);
|
||||
}
|
||||
|
||||
File mockDir = Mockito.spy(new File("/dir/current"));
|
||||
doReturn(files).when(mockDir).listFiles();
|
||||
doReturn(mockDir).when(sd).getCurrentDir();
|
||||
|
||||
return sd;
|
||||
}
|
||||
|
||||
static File mockFile(boolean exists) {
|
||||
File mockFile = mock(File.class);
|
||||
doReturn(exists).when(mockFile).exists();
|
||||
|
@ -361,5 +399,16 @@ public abstract class FSImageTestUtil {
|
|||
assertNotNull(image);
|
||||
}
|
||||
|
||||
|
||||
public static void logStorageContents(Log LOG, NNStorage storage) {
|
||||
LOG.info("current storages and corresponding sizes:");
|
||||
for (StorageDirectory sd : storage.dirIterable(null)) {
|
||||
File curDir = sd.getCurrentDir();
|
||||
LOG.info("In directory " + curDir);
|
||||
File[] files = curDir.listFiles();
|
||||
Arrays.sort(files);
|
||||
for (File f : files) {
|
||||
LOG.info(" file " + f.getAbsolutePath() + "; len = " + f.length());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1450,7 +1450,7 @@ public class TestCheckpoint extends TestCase {
|
|||
|
||||
// Make a finalized log on the server side.
|
||||
nn.rollEditLog();
|
||||
RemoteEditLogManifest manifest = nn.getEditLogManifest(0);
|
||||
RemoteEditLogManifest manifest = nn.getEditLogManifest(1);
|
||||
RemoteEditLog log = manifest.getLogs().get(0);
|
||||
|
||||
NNStorage dstImage = Mockito.mock(NNStorage.class);
|
||||
|
|
|
@ -22,9 +22,15 @@ import java.io.*;
|
|||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,6 +55,10 @@ import org.apache.log4j.Level;
|
|||
import org.aspectj.util.FileUtil;
|
||||
|
||||
import org.mockito.Mockito;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.*;
|
||||
|
||||
|
@ -745,4 +755,103 @@ public class TestEditLog extends TestCase {
|
|||
log.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the getEditLogManifest function using mock storage for a number
|
||||
* of different situations.
|
||||
*/
|
||||
@Test
|
||||
public void testEditLogManifestMocks() throws IOException {
|
||||
NNStorage storage;
|
||||
FSEditLog log;
|
||||
// Simple case - different directories have the same
|
||||
// set of logs, with an in-progress one at end
|
||||
storage = mockStorageWithEdits(
|
||||
"[1,100]|[101,200]|[201,]",
|
||||
"[1,100]|[101,200]|[201,]");
|
||||
log = new FSEditLog(storage);
|
||||
assertEquals("[[1,100], [101,200]]",
|
||||
log.getEditLogManifest(1).toString());
|
||||
assertEquals("[[101,200]]",
|
||||
log.getEditLogManifest(101).toString());
|
||||
|
||||
// Another simple case, different directories have different
|
||||
// sets of files
|
||||
storage = mockStorageWithEdits(
|
||||
"[1,100]|[101,200]",
|
||||
"[1,100]|[201,300]|[301,400]"); // nothing starting at 101
|
||||
log = new FSEditLog(storage);
|
||||
assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
|
||||
log.getEditLogManifest(1).toString());
|
||||
|
||||
// Case where one directory has an earlier finalized log, followed
|
||||
// by a gap. The returned manifest should start after the gap.
|
||||
storage = mockStorageWithEdits(
|
||||
"[1,100]|[301,400]", // gap from 101 to 300
|
||||
"[301,400]|[401,500]");
|
||||
log = new FSEditLog(storage);
|
||||
assertEquals("[[301,400], [401,500]]",
|
||||
log.getEditLogManifest(1).toString());
|
||||
|
||||
// Case where different directories have different length logs
|
||||
// starting at the same txid - should pick the longer one
|
||||
storage = mockStorageWithEdits(
|
||||
"[1,100]|[101,150]", // short log at 101
|
||||
"[1,50]|[101,200]"); // short log at 1
|
||||
log = new FSEditLog(storage);
|
||||
assertEquals("[[1,100], [101,200]]",
|
||||
log.getEditLogManifest(1).toString());
|
||||
assertEquals("[[101,200]]",
|
||||
log.getEditLogManifest(101).toString());
|
||||
|
||||
// Case where the first storage has an inprogress while
|
||||
// the second has finalised that file (i.e. the first failed
|
||||
// recently)
|
||||
storage = mockStorageWithEdits(
|
||||
"[1,100]|[101,]",
|
||||
"[1,100]|[101,200]");
|
||||
log = new FSEditLog(storage);
|
||||
assertEquals("[[1,100], [101,200]]",
|
||||
log.getEditLogManifest(1).toString());
|
||||
assertEquals("[[101,200]]",
|
||||
log.getEditLogManifest(101).toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mock NNStorage object with several directories, each directory
|
||||
* holding edit logs according to a specification. Each directory
|
||||
* is specified by a pipe-separated string. For example:
|
||||
* <code>[1,100]|[101,200]</code> specifies a directory which
|
||||
* includes two finalized segments, one from 1-100, and one from 101-200.
|
||||
* The syntax <code>[1,]</code> specifies an in-progress log starting at
|
||||
* txid 1.
|
||||
*/
|
||||
private NNStorage mockStorageWithEdits(String... editsDirSpecs) {
|
||||
List<StorageDirectory> sds = Lists.newArrayList();
|
||||
for (String dirSpec : editsDirSpecs) {
|
||||
List<String> files = Lists.newArrayList();
|
||||
String[] logSpecs = dirSpec.split("\\|");
|
||||
for (String logSpec : logSpecs) {
|
||||
Matcher m = Pattern.compile("\\[(\\d+),(\\d+)?\\]").matcher(logSpec);
|
||||
assertTrue("bad spec: " + logSpec, m.matches());
|
||||
if (m.group(2) == null) {
|
||||
files.add(NNStorage.getInProgressEditsFileName(
|
||||
Long.valueOf(m.group(1))));
|
||||
} else {
|
||||
files.add(NNStorage.getFinalizedEditsFileName(
|
||||
Long.valueOf(m.group(1)),
|
||||
Long.valueOf(m.group(2))));
|
||||
}
|
||||
}
|
||||
sds.add(FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.EDITS, false,
|
||||
files.toArray(new String[0])));
|
||||
}
|
||||
|
||||
NNStorage storage = Mockito.mock(NNStorage.class);
|
||||
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
|
||||
return storage;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import static org.junit.Assert.*;
|
||||
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -29,7 +28,6 @@ import java.util.List;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
|
||||
|
@ -56,7 +54,7 @@ public class TestFSImageStorageInspector {
|
|||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
|
||||
StorageDirectory mockDir = mockDirectory(
|
||||
StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE_AND_EDITS,
|
||||
false,
|
||||
"/foo/current/" + getImageFileName(123),
|
||||
|
@ -95,7 +93,7 @@ public class TestFSImageStorageInspector {
|
|||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
|
||||
StorageDirectory mockDir = mockDirectory(
|
||||
StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE_AND_EDITS,
|
||||
false,
|
||||
"/foo/current/" + getImageFileName(123),
|
||||
|
@ -123,7 +121,7 @@ public class TestFSImageStorageInspector {
|
|||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
|
||||
StorageDirectory mockDir = mockDirectory(
|
||||
StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE_AND_EDITS,
|
||||
false,
|
||||
"/foo/current/" + getImageFileName(123),
|
||||
|
@ -196,7 +194,7 @@ public class TestFSImageStorageInspector {
|
|||
inspector.inspectDirectory(
|
||||
mockDirectoryWithEditLogs("/foo3/current/"
|
||||
+ getInProgressEditsFileName(123)));
|
||||
inspector.inspectDirectory(mockDirectory(
|
||||
inspector.inspectDirectory(FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE,
|
||||
false,
|
||||
"/foo4/current/" + getImageFileName(122)));
|
||||
|
@ -327,15 +325,15 @@ public class TestFSImageStorageInspector {
|
|||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
|
||||
StorageDirectory mockImageDir = mockDirectory(
|
||||
StorageDirectory mockImageDir = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE,
|
||||
false,
|
||||
"/foo/current/" + getImageFileName(123));
|
||||
StorageDirectory mockImageDir2 = mockDirectory(
|
||||
StorageDirectory mockImageDir2 = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.IMAGE,
|
||||
false,
|
||||
"/foo2/current/" + getImageFileName(456));
|
||||
StorageDirectory mockEditsDir = mockDirectory(
|
||||
StorageDirectory mockEditsDir = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.EDITS,
|
||||
false,
|
||||
"/foo3/current/" + getFinalizedEditsFileName(123, 456),
|
||||
|
@ -364,43 +362,8 @@ public class TestFSImageStorageInspector {
|
|||
assertArrayEquals(new File[] {
|
||||
new File("/foo3/current/" + getInProgressEditsFileName(457))
|
||||
}, plan.getEditsFiles().toArray(new File[0]));
|
||||
|
||||
// Check log manifest
|
||||
assertEquals("[[123,456]]", inspector.getEditLogManifest(123).toString());
|
||||
assertEquals("[[123,456]]", inspector.getEditLogManifest(456).toString());
|
||||
assertEquals("[]", inspector.getEditLogManifest(457).toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogManifest() throws IOException {
|
||||
FSImageTransactionalStorageInspector inspector =
|
||||
new FSImageTransactionalStorageInspector();
|
||||
inspector.inspectDirectory(
|
||||
mockDirectoryWithEditLogs("/foo1/current/"
|
||||
+ getFinalizedEditsFileName(1,1),
|
||||
"/foo1/current/"
|
||||
+ getFinalizedEditsFileName(2,200)));
|
||||
inspector.inspectDirectory(
|
||||
mockDirectoryWithEditLogs("/foo2/current/"
|
||||
+ getInProgressEditsFileName(1),
|
||||
"/foo2/current/"
|
||||
+ getFinalizedEditsFileName(201, 400)));
|
||||
inspector.inspectDirectory(
|
||||
mockDirectoryWithEditLogs("/foo3/current/"
|
||||
+ getFinalizedEditsFileName(1, 1),
|
||||
"/foo3/current/"
|
||||
+ getFinalizedEditsFileName(2,200)));
|
||||
|
||||
assertEquals("[[1,1], [2,200], [201,400]]",
|
||||
inspector.getEditLogManifest(1).toString());
|
||||
assertEquals("[[2,200], [201,400]]",
|
||||
inspector.getEditLogManifest(2).toString());
|
||||
assertEquals("[[2,200], [201,400]]",
|
||||
inspector.getEditLogManifest(10).toString());
|
||||
assertEquals("[[201,400]]",
|
||||
inspector.getEditLogManifest(201).toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test case where an in-progress log is in an earlier name directory
|
||||
* than a finalized log. Previously, getEditLogManifest wouldn't
|
||||
|
@ -426,46 +389,9 @@ public class TestFSImageStorageInspector {
|
|||
+ getFinalizedEditsFileName(2626,2627),
|
||||
"/foo2/current/"
|
||||
+ getFinalizedEditsFileName(2628,2629)));
|
||||
|
||||
assertEquals("[[2622,2623], [2624,2625], [2626,2627], [2628,2629]]",
|
||||
inspector.getEditLogManifest(2621).toString());
|
||||
}
|
||||
|
||||
private StorageDirectory mockDirectoryWithEditLogs(String... fileNames) {
|
||||
return mockDirectory(NameNodeDirType.EDITS, false, fileNames);
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a mock storage directory that returns some set of file contents.
|
||||
* @param type type of storage dir
|
||||
* @param previousExists should we mock that the previous/ dir exists?
|
||||
* @param fileNames the names of files contained in current/
|
||||
*/
|
||||
static StorageDirectory mockDirectory(
|
||||
StorageDirType type,
|
||||
boolean previousExists,
|
||||
String... fileNames) {
|
||||
StorageDirectory sd = mock(StorageDirectory.class);
|
||||
|
||||
doReturn(type).when(sd).getStorageDirType();
|
||||
|
||||
// Version file should always exist
|
||||
doReturn(FSImageTestUtil.mockFile(true)).when(sd).getVersionFile();
|
||||
|
||||
// Previous dir optionally exists
|
||||
doReturn(FSImageTestUtil.mockFile(previousExists))
|
||||
.when(sd).getPreviousDir();
|
||||
|
||||
// Return a mock 'current' directory which has the given paths
|
||||
File[] files = new File[fileNames.length];
|
||||
for (int i = 0; i < fileNames.length; i++) {
|
||||
files[i] = new File(fileNames[i]);
|
||||
}
|
||||
|
||||
File mockDir = Mockito.spy(new File("/dir/current"));
|
||||
doReturn(files).when(mockDir).listFiles();
|
||||
doReturn(mockDir).when(sd).getCurrentDir();
|
||||
|
||||
return sd;
|
||||
static StorageDirectory mockDirectoryWithEditLogs(String... fileNames) {
|
||||
return FSImageTestUtil.mockStorageDirectory(NameNodeDirType.EDITS, false, fileNames);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
public class TestFileJournalManager {
|
||||
|
||||
@Test
|
||||
public void testGetRemoteEditLog() throws IOException {
|
||||
StorageDirectory sd = FSImageTestUtil.mockStorageDirectory(
|
||||
NameNodeDirType.EDITS, false,
|
||||
NNStorage.getFinalizedEditsFileName(1, 100),
|
||||
NNStorage.getFinalizedEditsFileName(101, 200),
|
||||
NNStorage.getInProgressEditsFileName(201),
|
||||
NNStorage.getFinalizedEditsFileName(1001, 1100));
|
||||
|
||||
FileJournalManager fjm = new FileJournalManager(sd);
|
||||
assertEquals("[1,100],[101,200],[1001,1100]", getLogsAsString(fjm, 1));
|
||||
assertEquals("[101,200],[1001,1100]", getLogsAsString(fjm, 101));
|
||||
assertEquals("[1001,1100]", getLogsAsString(fjm, 201));
|
||||
try {
|
||||
assertEquals("[]", getLogsAsString(fjm, 150));
|
||||
fail("Did not throw when asking for a txn in the middle of a log");
|
||||
} catch (IOException ioe) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"150 which is in the middle", ioe);
|
||||
}
|
||||
assertEquals("Asking for a newer log than exists should return empty list",
|
||||
"", getLogsAsString(fjm, 9999));
|
||||
}
|
||||
|
||||
private static String getLogsAsString(
|
||||
FileJournalManager fjm, long firstTxId) throws IOException {
|
||||
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));
|
||||
}
|
||||
|
||||
}
|
|
@ -216,7 +216,7 @@ public class TestNNStorageRetentionManager {
|
|||
}
|
||||
|
||||
StorageDirectory mockStorageDir() {
|
||||
return TestFSImageStorageInspector.mockDirectory(
|
||||
return FSImageTestUtil.mockStorageDirectory(
|
||||
type, false,
|
||||
files.toArray(new String[0]));
|
||||
}
|
||||
|
|
|
@ -140,17 +140,9 @@ public class TestStorageRestore {
|
|||
/**
|
||||
* test
|
||||
*/
|
||||
public void printStorages(FSImage fs) {
|
||||
LOG.info("current storages and corresponding sizes:");
|
||||
for(Iterator<StorageDirectory> it = fs.getStorage().dirIterator(); it.hasNext(); ) {
|
||||
StorageDirectory sd = it.next();
|
||||
|
||||
File curDir = sd.getCurrentDir();
|
||||
for (File f : curDir.listFiles()) {
|
||||
LOG.info(" file " + f.getAbsolutePath() + "; len = " + f.length());
|
||||
}
|
||||
}
|
||||
}
|
||||
private void printStorages(FSImage image) {
|
||||
FSImageTestUtil.logStorageContents(LOG, image.getStorage());
|
||||
}
|
||||
|
||||
/**
|
||||
* test
|
||||
|
|
Loading…
Reference in New Issue