HDFS-7413. Some unit tests should use NameNodeProtocols instead of FSNameSystem. Contributed by Haohui Mai.
parent a9a0cc3679
commit acf1e033b0
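The patch swaps the handle these tests use to drive the NameNode: instead of calling into the internal FSNamesystem class, they go through the NamenodeProtocols RPC interface that MiniDFSCluster exposes (getNameNodeRpc(), or getNameNode().getRpcServer()). A minimal sketch of the pattern, assuming a running MiniDFSCluster inside a JUnit test; the class and method names in the sketch are illustrative and not part of the patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.Test;

// Hypothetical sketch; the patched tests (TestEditLogRace, TestMetaSave,
// TestNamenodeRetryCache) follow the same pattern in the hunks below.
public class NamenodeProtocolsSketch {
  @Test
  public void driveNameNodeThroughRpcInterface() throws IOException {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      cluster.waitActive();
      // Before: tests reached into NameNode internals.
      //   FSNamesystem namesystem = cluster.getNamesystem();
      // After: tests talk to the client-facing RPC interface instead.
      NamenodeProtocols nnRpc = cluster.getNameNodeRpc();

      FsPermission perm = new FsPermission((short) 0777);
      nnRpc.mkdirs("/example-dir", perm, true);   // same calls the patched tests make
      nnRpc.delete("/example-dir", true);
      nnRpc.metaSave("metasave.out.txt");         // dumps NameNode data structures to a file
    } finally {
      cluster.shutdown();
    }
  }
}

Where a test still needs internal state (the FSImage in TestEditLogRace, the retry cache in testRetryCacheRebuild), it keeps a separate cluster.getNamesystem() handle, as the hunks below show.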
CHANGES.txt
@@ -378,6 +378,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7409. Allow dead nodes to finish decommissioning if all files are
     fully replicated. (wang)
 
+    HDFS-7413. Some unit tests should use NameNodeProtocols instead of
+    FSNameSystem. (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES
TestEditLogRace.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Test;
@@ -99,15 +100,15 @@ public class TestEditLogRace {
   // an object that does a bunch of transactions
   //
   static class Transactions implements Runnable {
-    final FSNamesystem namesystem;
+    final NamenodeProtocols nn;
     short replication = 3;
     long blockSize = 64;
     volatile boolean stopped = false;
     volatile Thread thr;
     final AtomicReference<Throwable> caught;
 
-    Transactions(FSNamesystem ns, AtomicReference<Throwable> caught) {
-      namesystem = ns;
+    Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
+      nn = ns;
       this.caught = caught;
     }
 
@@ -115,14 +116,14 @@ public class TestEditLogRace {
     @Override
     public void run() {
       thr = Thread.currentThread();
-      PermissionStatus p = namesystem.createFsOwnerPermissions(
-          new FsPermission((short)0777));
+      FsPermission p = new FsPermission((short)0777);
+
       int i = 0;
       while (!stopped) {
         try {
           String dirname = "/thr-" + thr.getId() + "-dir-" + i;
-          namesystem.mkdirs(dirname, p, true);
-          namesystem.delete(dirname, true);
+          nn.mkdirs(dirname, p, true);
+          nn.delete(dirname, true);
         } catch (SafeModeException sme) {
           // This is OK - the tests will bring NN in and out of safemode
         } catch (Throwable e) {
@@ -143,7 +144,7 @@
     }
   }
 
-  private void startTransactionWorkers(FSNamesystem namesystem,
+  private void startTransactionWorkers(NamenodeProtocols namesystem,
                                        AtomicReference<Throwable> caughtErr) {
     // Create threads and make them run transactions concurrently.
     for (int i = 0; i < NUM_THREADS; i++) {
@@ -163,7 +164,7 @@
       Thread thr = worker.getThread();
       try {
         if (thr != null) thr.join();
-      } catch (InterruptedException ie) {}
+      } catch (InterruptedException ignored) {}
     }
   }
 
@@ -183,11 +184,11 @@
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
-    final FSNamesystem namesystem = cluster.getNamesystem();
-    FSImage fsimage = namesystem.getFSImage();
+    final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
+    FSImage fsimage = cluster.getNamesystem().getFSImage();
     StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
 
-    startTransactionWorkers(namesystem, caughtErr);
+    startTransactionWorkers(nn, caughtErr);
 
     long previousLogTxId = 1;
 
@@ -197,12 +198,13 @@
      } catch (InterruptedException e) {}
 
      LOG.info("Starting roll " + i + ".");
-     CheckpointSignature sig = namesystem.rollEditLog();
+     CheckpointSignature sig = nn.rollEditLog();
 
      long nextLog = sig.curSegmentTxId;
      String logFileName = NNStorage.getFinalizedEditsFileName(
          previousLogTxId, nextLog - 1);
-     previousLogTxId += verifyEditLogs(namesystem, fsimage, logFileName, previousLogTxId);
+     previousLogTxId += verifyEditLogs(cluster.getNamesystem(), fsimage,
+         logFileName, previousLogTxId);
 
      assertEquals(previousLogTxId, nextLog);
 
@@ -264,16 +266,17 @@
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
     final FSNamesystem namesystem = cluster.getNamesystem();
+    final NamenodeProtocols nn = cluster.getNameNodeRpc();
 
     FSImage fsimage = namesystem.getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    startTransactionWorkers(namesystem, caughtErr);
+    startTransactionWorkers(nn, caughtErr);
 
     for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
       try {
         Thread.sleep(20);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException ignored) {}
 
 
       LOG.info("Save " + i + ": entering safe mode");
@@ -433,7 +436,7 @@
           NNStorage.getInProgressEditsFileName(4),
           4));
     } finally {
-      LOG.info("Closing namesystem");
+      LOG.info("Closing nn");
       if(namesystem != null) namesystem.close();
     }
   }
@@ -527,7 +530,7 @@
           NNStorage.getInProgressEditsFileName(4),
           4));
     } finally {
-      LOG.info("Closing namesystem");
+      LOG.info("Closing nn");
       if(namesystem != null) namesystem.close();
     }
   }
TestMetaSave.java
@@ -30,6 +30,7 @@ import java.io.InputStreamReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -49,7 +50,7 @@ public class TestMetaSave {
   static final int blockSize = 8192;
   private static MiniDFSCluster cluster = null;
   private static FileSystem fileSys = null;
-  private static FSNamesystem namesystem = null;
+  private static NamenodeProtocols nnRpc = null;
 
   @BeforeClass
   public static void setUp() throws IOException {
@@ -64,7 +65,7 @@
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
-    namesystem = cluster.getNamesystem();
+    nnRpc = cluster.getNameNodeRpc();
   }
 
   /**
@@ -81,9 +82,9 @@
     cluster.stopDataNode(1);
     // wait for namenode to discover that a datanode is dead
     Thread.sleep(15000);
-    namesystem.setReplication("/filestatus0", (short) 4);
+    nnRpc.setReplication("/filestatus0", (short) 4);
 
-    namesystem.metaSave("metasave.out.txt");
+    nnRpc.metaSave("metasave.out.txt");
 
     // Verification
     FileInputStream fstream = new FileInputStream(getLogFile(
@@ -100,7 +101,7 @@
       assertTrue(line.equals("Live Datanodes: 1"));
       line = reader.readLine();
       assertTrue(line.equals("Dead Datanodes: 1"));
-      line = reader.readLine();
+      reader.readLine();
       line = reader.readLine();
       assertTrue(line.matches("^/filestatus[01]:.*"));
     } finally {
@@ -124,11 +125,11 @@
     cluster.stopDataNode(1);
     // wait for namenode to discover that a datanode is dead
     Thread.sleep(15000);
-    namesystem.setReplication("/filestatus0", (short) 4);
-    namesystem.delete("/filestatus0", true);
-    namesystem.delete("/filestatus1", true);
+    nnRpc.setReplication("/filestatus0", (short) 4);
+    nnRpc.delete("/filestatus0", true);
+    nnRpc.delete("/filestatus1", true);
 
-    namesystem.metaSave("metasaveAfterDelete.out.txt");
+    nnRpc.metaSave("metasaveAfterDelete.out.txt");
 
     // Verification
     BufferedReader reader = null;
@@ -160,8 +161,8 @@
   @Test
   public void testMetaSaveOverwrite() throws Exception {
     // metaSave twice.
-    namesystem.metaSave("metaSaveOverwrite.out.txt");
-    namesystem.metaSave("metaSaveOverwrite.out.txt");
+    nnRpc.metaSave("metaSaveOverwrite.out.txt");
+    nnRpc.metaSave("metaSaveOverwrite.out.txt");
 
     // Read output file.
     FileInputStream fis = null;
TestNamenodeRetryCache.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -47,6 +46,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RetryCache.CacheEntry;
@@ -77,9 +78,8 @@ import org.junit.Test;
 public class TestNamenodeRetryCache {
   private static final byte[] CLIENT_ID = ClientId.getClientId();
   private static MiniDFSCluster cluster;
-  private static FSNamesystem namesystem;
-  private static final PermissionStatus perm = new PermissionStatus(
-      "TestNamenodeRetryCache", null, FsPermission.getDefault());
+  private static NamenodeProtocols nnRpc;
+  private static final FsPermission perm = FsPermission.getDefault();
   private static DistributedFileSystem filesystem;
   private static int callId = 100;
   private static Configuration conf;
@@ -94,7 +94,7 @@ public class TestNamenodeRetryCache {
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
     cluster = new MiniDFSCluster.Builder(conf).build();
     cluster.waitActive();
-    namesystem = cluster.getNamesystem();
+    nnRpc = cluster.getNameNode().getRpcServer();
     filesystem = cluster.getFileSystem();
   }
 
@@ -108,10 +108,6 @@
     cluster.shutdown();
   }
 
-  public static void incrementCallId() {
-    callId++;
-  }
-
   /** Set the current Server RPC call */
   public static void newCall() {
     Server.Call call = new Server.Call(++callId, 1, null, null,
@@ -142,15 +138,15 @@
     // Two retried concat calls succeed
     concatSetup(file1, file2);
     newCall();
-    namesystem.concat(file1, new String[]{file2});
-    namesystem.concat(file1, new String[]{file2});
-    namesystem.concat(file1, new String[]{file2});
+    nnRpc.concat(file1, new String[]{file2});
+    nnRpc.concat(file1, new String[]{file2});
+    nnRpc.concat(file1, new String[]{file2});
 
     // A non-retried concat request fails
     newCall();
     try {
       // Second non-retry call should fail with an exception
-      namesystem.concat(file1, new String[]{file2});
+      nnRpc.concat(file1, new String[]{file2});
       Assert.fail("testConcat - expected exception is not thrown");
     } catch (IOException e) {
       // Expected
@@ -165,15 +161,15 @@
     String dir = "/testNamenodeRetryCache/testDelete";
     // Two retried calls to create a non existent file
     newCall();
-    namesystem.mkdirs(dir, perm, true);
+    nnRpc.mkdirs(dir, perm, true);
     newCall();
-    Assert.assertTrue(namesystem.delete(dir, false));
-    Assert.assertTrue(namesystem.delete(dir, false));
-    Assert.assertTrue(namesystem.delete(dir, false));
+    Assert.assertTrue(nnRpc.delete(dir, false));
+    Assert.assertTrue(nnRpc.delete(dir, false));
+    Assert.assertTrue(nnRpc.delete(dir, false));
 
     // non-retried call fails and gets false as return
     newCall();
-    Assert.assertFalse(namesystem.delete(dir, false));
+    Assert.assertFalse(nnRpc.delete(dir, false));
   }
 
   /**
@@ -185,15 +181,15 @@
 
     // Two retried symlink calls succeed
     newCall();
-    namesystem.createSymlink(target, "/a/b", perm, true);
-    namesystem.createSymlink(target, "/a/b", perm, true);
-    namesystem.createSymlink(target, "/a/b", perm, true);
+    nnRpc.createSymlink(target, "/a/b", perm, true);
+    nnRpc.createSymlink(target, "/a/b", perm, true);
+    nnRpc.createSymlink(target, "/a/b", perm, true);
 
     // non-retried call fails
     newCall();
     try {
       // Second non-retry call should fail with an exception
-      namesystem.createSymlink(target, "/a/b", perm, true);
+      nnRpc.createSymlink(target, "/a/b", perm, true);
       Assert.fail("testCreateSymlink - expected exception is not thrown");
     } catch (IOException e) {
       // Expected
@@ -208,21 +204,16 @@
     String src = "/testNamenodeRetryCache/testCreate/file";
     // Two retried calls succeed
     newCall();
-    HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
-        "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1,
-        BlockSize, null);
-    Assert.assertEquals(status, namesystem.startFile(src, perm,
-        "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
-        true, (short) 1, BlockSize, null));
-    Assert.assertEquals(status, namesystem.startFile(src, perm,
-        "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
-        true, (short) 1, BlockSize, null));
+    HdfsFileStatus status = nnRpc.create(src, perm, "holder",
+        new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
+        (short) 1, BlockSize, null);
+    Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null));
+    Assert.assertEquals(status, nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null));
 
     // A non-retried call fails
     newCall();
     try {
-      namesystem.startFile(src, perm, "holder", "clientmachine",
-          EnumSet.of(CreateFlag.CREATE), true, (short) 1, BlockSize, null);
+      nnRpc.create(src, perm, "holder", new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true, (short) 1, BlockSize, null);
       Assert.fail("testCreate - expected exception is not thrown");
     } catch (IOException e) {
       // expected
@@ -241,14 +232,14 @@
 
     // Retried append requests succeed
     newCall();
-    LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine");
-    Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
-    Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
+    LocatedBlock b = nnRpc.append(src, "holder");
+    Assert.assertEquals(b, nnRpc.append(src, "holder"));
+    Assert.assertEquals(b, nnRpc.append(src, "holder"));
 
     // non-retried call fails
     newCall();
     try {
-      namesystem.appendFile(src, "holder", "clientMachine");
+      nnRpc.append(src, "holder");
       Assert.fail("testAppend - expected exception is not thrown");
     } catch (Exception e) {
       // Expected
@@ -264,17 +255,17 @@
     String src = "/testNamenodeRetryCache/testRename1/src";
     String target = "/testNamenodeRetryCache/testRename1/target";
     resetCall();
-    namesystem.mkdirs(src, perm, true);
+    nnRpc.mkdirs(src, perm, true);
 
     // Retried renames succeed
     newCall();
-    Assert.assertTrue(namesystem.renameTo(src, target));
-    Assert.assertTrue(namesystem.renameTo(src, target));
-    Assert.assertTrue(namesystem.renameTo(src, target));
+    Assert.assertTrue(nnRpc.rename(src, target));
+    Assert.assertTrue(nnRpc.rename(src, target));
+    Assert.assertTrue(nnRpc.rename(src, target));
 
     // A non-retried request fails
     newCall();
-    Assert.assertFalse(namesystem.renameTo(src, target));
+    Assert.assertFalse(nnRpc.rename(src, target));
   }
 
   /**
@@ -285,18 +276,18 @@
     String src = "/testNamenodeRetryCache/testRename2/src";
     String target = "/testNamenodeRetryCache/testRename2/target";
     resetCall();
-    namesystem.mkdirs(src, perm, true);
+    nnRpc.mkdirs(src, perm, true);
 
     // Retried renames succeed
     newCall();
-    namesystem.renameTo(src, target, Rename.NONE);
-    namesystem.renameTo(src, target, Rename.NONE);
-    namesystem.renameTo(src, target, Rename.NONE);
+    nnRpc.rename2(src, target, Rename.NONE);
+    nnRpc.rename2(src, target, Rename.NONE);
+    nnRpc.rename2(src, target, Rename.NONE);
 
     // A non-retried request fails
     newCall();
     try {
-      namesystem.renameTo(src, target, Rename.NONE);
+      nnRpc.rename2(src, target, Rename.NONE);
       Assert.fail("testRename 2 expected exception is not thrown");
     } catch (IOException e) {
       // expected
@@ -310,11 +301,12 @@
   @Test(timeout = 60000)
   public void testUpdatePipelineWithFailOver() throws Exception {
     cluster.shutdown();
-    namesystem = null;
+    nnRpc = null;
     filesystem = null;
     cluster = new MiniDFSCluster.Builder(conf).nnTopology(
         MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build();
-    FSNamesystem ns0 = cluster.getNamesystem(0);
+    cluster.waitActive();
+    NamenodeProtocols ns0 = cluster.getNameNodeRpc(0);
     ExtendedBlock oldBlock = new ExtendedBlock();
     ExtendedBlock newBlock = new ExtendedBlock();
     DatanodeID[] newNodes = new DatanodeID[2];
@@ -345,20 +337,20 @@
   public void testSnapshotMethods() throws Exception {
     String dir = "/testNamenodeRetryCache/testCreateSnapshot/src";
     resetCall();
-    namesystem.mkdirs(dir, perm, true);
-    namesystem.allowSnapshot(dir);
+    nnRpc.mkdirs(dir, perm, true);
+    nnRpc.allowSnapshot(dir);
 
     // Test retry of create snapshot
     newCall();
-    String name = namesystem.createSnapshot(dir, "snap1");
-    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
-    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
-    Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+    String name = nnRpc.createSnapshot(dir, "snap1");
+    Assert.assertEquals(name, nnRpc.createSnapshot(dir, "snap1"));
+    Assert.assertEquals(name, nnRpc.createSnapshot(dir, "snap1"));
+    Assert.assertEquals(name, nnRpc.createSnapshot(dir, "snap1"));
 
     // Non retried calls should fail
     newCall();
     try {
-      namesystem.createSnapshot(dir, "snap1");
+      nnRpc.createSnapshot(dir, "snap1");
       Assert.fail("testSnapshotMethods expected exception is not thrown");
     } catch (IOException e) {
       // exptected
@@ -366,14 +358,14 @@
 
     // Test retry of rename snapshot
     newCall();
-    namesystem.renameSnapshot(dir, "snap1", "snap2");
-    namesystem.renameSnapshot(dir, "snap1", "snap2");
-    namesystem.renameSnapshot(dir, "snap1", "snap2");
+    nnRpc.renameSnapshot(dir, "snap1", "snap2");
+    nnRpc.renameSnapshot(dir, "snap1", "snap2");
+    nnRpc.renameSnapshot(dir, "snap1", "snap2");
 
     // Non retried calls should fail
     newCall();
     try {
-      namesystem.renameSnapshot(dir, "snap1", "snap2");
+      nnRpc.renameSnapshot(dir, "snap1", "snap2");
       Assert.fail("testSnapshotMethods expected exception is not thrown");
     } catch (IOException e) {
       // expected
@@ -381,14 +373,14 @@
 
     // Test retry of delete snapshot
     newCall();
-    namesystem.deleteSnapshot(dir, "snap2");
-    namesystem.deleteSnapshot(dir, "snap2");
-    namesystem.deleteSnapshot(dir, "snap2");
+    nnRpc.deleteSnapshot(dir, "snap2");
+    nnRpc.deleteSnapshot(dir, "snap2");
+    nnRpc.deleteSnapshot(dir, "snap2");
 
     // Non retried calls should fail
     newCall();
     try {
-      namesystem.deleteSnapshot(dir, "snap2");
+      nnRpc.deleteSnapshot(dir, "snap2");
       Assert.fail("testSnapshotMethods expected exception is not thrown");
     } catch (IOException e) {
       // expected
@@ -413,6 +405,7 @@
   @Test
   public void testRetryCacheRebuild() throws Exception {
     DFSTestUtil.runOperations(cluster, filesystem, conf, BlockSize, 0);
+    FSNamesystem namesystem = cluster.getNamesystem();
 
     LightWeightCache<CacheEntry, CacheEntry> cacheSet =
         (LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
@@ -429,8 +422,8 @@
     // restart NameNode
     cluster.restartNameNode();
     cluster.waitActive();
-    namesystem = cluster.getNamesystem();
 
+    namesystem = cluster.getNamesystem();
     // check retry cache
     assertTrue(namesystem.hasRetryCache());
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem