From 69e5f90e9febf37d2cdd69c485729c448ac3cabc Mon Sep 17 00:00:00 2001
From: Colin McCabe
Date: Mon, 21 Oct 2013 19:42:35 +0000
Subject: [PATCH] HDFS-5385. Caching RPCs are AtMostOnce, but do not persist
 client ID and call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1534345 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop-hdfs/CHANGES-HDFS-4949.txt         |   4 +
 .../hdfs/server/namenode/FSEditLogOp.java     |  50 +++-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java   |  14 +
 .../namenode/TestNamenodeRetryCache.java      |   4 +-
 .../namenode/ha/TestRetryCacheWithHA.java     | 246 +++++++++++++++++-
 .../src/test/resources/editsStored           | Bin 4499 -> 4609 bytes
 .../src/test/resources/editsStored.xml        |  10 +
 7 files changed, 319 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
index 5da36d0270f..03e9438a26e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
@@ -111,3 +111,7 @@ HDFS-4949 (Unreleased)
     HDFS-5203. Concurrent clients that add a cache directive on the same path
     may prematurely uncache from each other. (Chris Nauroth via Colin Patrick
     McCabe)
+
+    HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
+    call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)
+
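Note: the bug being fixed is mechanical. The five caching RPCs are annotated
@AtMostOnce, which means the NameNode must recognize a client's retry by its
RPC client ID and call ID, but the corresponding edit-log ops never persisted
those two fields. The FSEditLogOp changes below thread a read/write of the IDs
through each op's binary and XML serialization. A minimal, self-contained
sketch of that serialization pattern (class and helper names here are
illustrative, not the actual Hadoop APIs):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative only: payload first, then the retry-cache key (16-byte
// client UUID + int call ID), mirroring the pattern of the writeRpcIds /
// readRpcIds calls added in the diff below.
class ToyCacheOp {
  String pool;           // op payload (here, just a pool name)
  byte[] rpcClientId;    // 16-byte client UUID
  int rpcCallId;         // per-client call sequence number

  void writeFields(DataOutputStream out) throws IOException {
    out.writeUTF(pool);
    out.write(rpcClientId);
    out.writeInt(rpcCallId);
  }

  void readFields(DataInputStream in) throws IOException {
    pool = in.readUTF();
    rpcClientId = new byte[16];
    in.readFully(rpcClientId);
    rpcCallId = in.readInt();
  }

  public static void main(String[] args) throws IOException {
    ToyCacheOp op = new ToyCacheOp();
    op.pool = "pool1";
    op.rpcClientId = new byte[16];  // a real op would carry the client's UUID
    op.rpcCallId = 75;

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    op.writeFields(new DataOutputStream(buf));

    ToyCacheOp copy = new ToyCacheOp();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(copy.pool + " callId=" + copy.rpcCallId);
  }
}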
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index e7123390ac9..302c1615bca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -2861,6 +2861,10 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#addPathBasedCacheDirective}
+   */
   static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
     String path;
     short replication;
@@ -2895,6 +2899,7 @@
       this.path = FSImageSerialization.readString(in);
       this.replication = FSImageSerialization.readShort(in);
       this.pool = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
@@ -2902,6 +2907,7 @@
       FSImageSerialization.writeString(path, out);
       FSImageSerialization.writeShort(replication, out);
       FSImageSerialization.writeString(pool, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
@@ -2910,6 +2916,7 @@
       XMLUtils.addSaxString(contentHandler, "REPLICATION",
           Short.toString(replication));
       XMLUtils.addSaxString(contentHandler, "POOL", pool);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
@@ -2917,6 +2924,7 @@
       path = st.getValue("PATH");
       replication = Short.parseShort(st.getValue("REPLICATION"));
       pool = st.getValue("POOL");
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -2925,11 +2933,17 @@
       builder.append("AddPathBasedCacheDirective [");
       builder.append("path=" + path + ",");
       builder.append("replication=" + replication + ",");
-      builder.append("pool=" + pool + "]");
+      builder.append("pool=" + pool);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#removePathBasedCacheDescriptor}
+   */
   static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
     long id;
 
@@ -2950,32 +2964,39 @@
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       this.id = FSImageSerialization.readLong(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeLong(id, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.id = Long.parseLong(st.getValue("ID"));
+      readRpcIdsFromXml(st);
     }
 
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("RemovePathBasedCacheDescriptor [");
-      builder.append("id=" + Long.toString(id) + "]");
+      builder.append("id=" + Long.toString(id));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
   static class AddCachePoolOp extends FSEditLogOp {
     CachePoolInfo info;
 
@@ -2995,21 +3016,25 @@
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       info .writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
-      info .writeXmlTo(contentHandler);
+      info.writeXmlTo(contentHandler);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.info = CachePoolInfo.readXmlFrom(st);
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -3020,11 +3045,14 @@
       builder.append("ownerName=" + info.getOwnerName() + ",");
       builder.append("groupName=" + info.getGroupName() + ",");
       builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
-      builder.append("weight=" + Integer.toString(info.getWeight()) + "]");
+      builder.append("weight=" + Integer.toString(info.getWeight()));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
   static class ModifyCachePoolOp extends FSEditLogOp {
     CachePoolInfo info;
 
@@ -3044,21 +3072,25 @@
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       cachePoolInfoToXml(contentHandler, info);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.info = cachePoolInfoFromXml(st);
+      readRpcIdsFromXml(st);
     }
 
     @Override
@@ -3082,11 +3114,13 @@
         fields.add("weight=" + info.getWeight());
       }
       builder.append(Joiner.on(",").join(fields));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
     }
   }
 
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
   static class RemoveCachePoolOp extends FSEditLogOp {
     String poolName;
 
@@ -3106,28 +3140,34 @@
     @Override
     void readFields(DataInputStream in, int logVersion) throws IOException {
       poolName = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
     }
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
       FSImageSerialization.writeString(poolName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
     @Override
     protected void toXml(ContentHandler contentHandler) throws SAXException {
       XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
     @Override
     void fromXml(Stanza st) throws InvalidXmlException {
       this.poolName = st.getValue("POOLNAME");
+      readRpcIdsFromXml(st);
    }
 
     @Override
     public String toString() {
       StringBuilder builder = new StringBuilder();
       builder.append("RemoveCachePoolOp [");
-      builder.append("poolName=" + poolName + "]");
+      builder.append("poolName=" + poolName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
      return builder.toString();
     }
   }
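Note: persisting the IDs matters on failover and restart. When a standby
NameNode (or a restarted one) replays these ops, it can re-prime its retry
cache, so a client retrying an @AtMostOnce call after failover gets the
original answer instead of a second execution, which for these ops would fail
or duplicate state. A hedged sketch of that idea, not Hadoop's actual
RetryCache:

import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy at-most-once server cache keyed by (clientId, callId).
class ToyRetryCache {
  private final Map<String, Object> done = new HashMap<>();

  private static String key(byte[] clientId, int callId) {
    return Base64.getEncoder().encodeToString(clientId) + "#" + callId;
  }

  // First attempt executes and caches; a retry returns the cached result.
  synchronized Object invoke(byte[] clientId, int callId,
      Supplier<Object> op) {
    String k = key(clientId, callId);
    if (done.containsKey(k)) {
      return done.get(k);  // retry after failover: do not re-execute
    }
    Object result = op.get();
    done.put(k, result);
    return result;
  }

  // Called while replaying a persisted edit-log op on a standby/restarted
  // node; only possible because the op now carries the two IDs.
  synchronized void primeFromEditLog(byte[] clientId, int callId,
      Object result) {
    done.put(key(clientId, callId), result);
  }
}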
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d568b2cf9c3..8471eb721d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -993,6 +993,20 @@ public class DFSTestUtil {
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
+    // OP_ADD_CACHE_POOL 35
+    filesystem.addCachePool(new CachePoolInfo("pool1"));
+    // OP_MODIFY_CACHE_POOL 36
+    filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
+    // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
+    PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
+        new PathBasedCacheDirective.Builder().
+            setPath(new Path("/path")).
+            setPool("pool1").
+            build());
+    // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
+    filesystem.removePathBasedCacheDescriptor(pbcd);
+    // OP_REMOVE_CACHE_POOL 37
+    filesystem.removeCachePool("pool1");
   }
 
   public static void abortStream(DFSOutputStream out) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index ddb7c0fa692..576c3eaf113 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -413,7 +413,7 @@ public class TestNamenodeRetryCache {
     LightWeightCache<CacheEntry, CacheEntry> cacheSet =
         (LightWeightCache<CacheEntry, CacheEntry>) namesystem
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
 
     Map<CacheEntry, CacheEntry> oldEntries =
         new HashMap<CacheEntry, CacheEntry>();
@@ -432,7 +432,7 @@
     assertTrue(namesystem.hasRetryCache());
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 82deab59386..44f4e64ab61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,12 +54,15 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -147,7 +151,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn0 = cluster.getNamesystem(0);
     LightWeightCache<CacheEntry, CacheEntry> cacheSet =
         (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
 
     Map<CacheEntry, CacheEntry> oldEntries =
         new HashMap<CacheEntry, CacheEntry>();
@@ -168,7 +172,7 @@
     FSNamesystem fsn1 = cluster.getNamesystem(1);
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
         .getRetryCache().getCacheSet();
-    assertEquals(14, cacheSet.size());
+    assertEquals(19, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
@@ -733,6 +737,208 @@
     }
   }
 
+  /** addPathBasedCacheDirective */
+  class AddPathBasedCacheDirectiveOp extends AtMostOnceOp {
+    private String pool;
+    private String path;
+    private PathBasedCacheDescriptor descriptor;
+
+    AddPathBasedCacheDirectiveOp(DFSClient client, String pool, String path) {
+      super("addPathBasedCacheDirective", client);
+      this.pool = pool;
+      this.path = path;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      descriptor = client.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path(path)).
+              setPool(pool).
+              build());
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<PathBasedCacheDescriptor> iter =
+            dfs.listPathBasedCacheDescriptors(pool, new Path(path));
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return descriptor;
+    }
+  }
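Note: each inner op class here follows the same four-step contract defined by
the test's AtMostOnceOp base class, which is not shown in this hunk. A
simplified restatement for orientation (names illustrative, not the actual
test code):

// Sketch of the harness contract the classes below implement.
abstract class SketchAtMostOnceOp {
  final String name;
  SketchAtMostOnceOp(String name) { this.name = name; }
  abstract void prepare() throws Exception;  // create preconditions (a pool)
  abstract void invoke() throws Exception;   // the @AtMostOnce RPC under test
  abstract boolean checkNamenodeBeforeReturn() throws Exception; // poll effect
  abstract Object getResult();               // value a retried call must match
}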
+  /** removePathBasedCacheDescriptor */
+  class RemovePathBasedCacheDescriptorOp extends AtMostOnceOp {
+    private String pool;
+    private String path;
+    private PathBasedCacheDescriptor descriptor;
+
+    RemovePathBasedCacheDescriptorOp(DFSClient client, String pool,
+        String path) {
+      super("removePathBasedCacheDescriptor", client);
+      this.pool = pool;
+      this.path = path;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      dfs.addCachePool(new CachePoolInfo(pool));
+      descriptor = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path(path)).
+              setPool(pool).
+              build());
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removePathBasedCacheDescriptor(descriptor.getEntryId());
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<PathBasedCacheDescriptor> iter =
+            dfs.listPathBasedCacheDescriptors(pool, new Path(path));
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** addCachePool */
+  class AddCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    AddCachePoolOp(DFSClient client, String pool) {
+      super("addCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** modifyCachePool */
+  class ModifyCachePoolOp extends AtMostOnceOp {
+    String pool;
+
+    ModifyCachePoolOp(DFSClient client, String pool) {
+      super("modifyCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool).setWeight(10));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (iter.hasNext() && iter.next().getWeight() == 99) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
+  /** removeCachePool */
+  class RemoveCachePoolOp extends AtMostOnceOp {
+    private String pool;
+
+    RemoveCachePoolOp(DFSClient client, String pool) {
+      super("removeCachePool", client);
+      this.pool = pool;
+    }
+
+    @Override
+    void prepare() throws Exception {
+      client.addCachePool(new CachePoolInfo(pool));
+    }
+
+    @Override
+    void invoke() throws Exception {
+      client.removeCachePool(pool);
+    }
+
+    @Override
+    boolean checkNamenodeBeforeReturn() throws Exception {
+      for (int i = 0; i < CHECKTIMES; i++) {
+        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        if (!iter.hasNext()) {
+          return true;
+        }
+        Thread.sleep(1000);
+      }
+      return false;
+    }
+
+    @Override
+    Object getResult() {
+      return null;
+    }
+  }
+
   @Test (timeout=60000)
   public void testCreateSnapshot() throws Exception {
     final DFSClient client = genClientWithDummyHandler();
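Note: every checkNamenodeBeforeReturn() above uses the same bounded-poll
idiom: probe the NameNode up to CHECKTIMES times with a one-second sleep, and
report whether the expected state appeared. Factored out as a generic helper
it looks roughly like this (illustrative; the tests inline the loop, and the
CHECKTIMES value here is an assumption):

import java.util.concurrent.Callable;

class PollSketch {
  // Assumption: mirrors the tests' CHECKTIMES constant.
  static final int CHECKTIMES = 5;

  static boolean poll(Callable<Boolean> condition) throws Exception {
    for (int i = 0; i < CHECKTIMES; i++) {
      if (condition.call()) {
        return true;
      }
      Thread.sleep(1000);  // same pacing as the tests
    }
    return false;
  }

  public static void main(String[] args) throws Exception {
    long readyAt = System.currentTimeMillis() + 2500;
    System.out.println(poll(() -> System.currentTimeMillis() >= readyAt));
  }
}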
@@ -810,6 +1016,42 @@
     testClientRetryWithFailover(op);
   }
 
+  @Test (timeout=60000)
+  public void testAddPathBasedCacheDirective() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client, "pool", "/path");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemovePathBasedCacheDescriptor() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemovePathBasedCacheDescriptorOp(client, "pool",
+        "/path");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testAddCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new AddCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testModifyCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new ModifyCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
+  @Test (timeout=60000)
+  public void testRemoveCachePool() throws Exception {
+    DFSClient client = genClientWithDummyHandler();
+    AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
+    testClientRetryWithFailover(op);
+  }
+
   /**
    * When NN failover happens, if the client did not receive the response and
    * send a retry request to the other NN, the same response should be recieved
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index ffd6601a9b115ce81f4ded2ff9981baa83ec0482..17b95dabbbe144ca559fcde5982348778595d66f 100644
GIT binary patch
delta 91
zcmbQN+^Djlf^TvFt0=2?r}K6F$u)f1V4hF^+c^HotNCO=JTYI>>LYiQ7$Cp_N;85L
a`lbE4p+5Nmp9q-eKYf-@>E<7NvzY+G{2GV=

delta 103
zcmZovnXJ5_f{&$xr>SA`BtGpxS((@86&WDFo`FR_DY1xwk%6-yKR>4+v8bfd|4QZ|
eC8)dulxCduG(k`m%5g+ed^i2Wz0GO-vzY*1=Nnr9

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
index 160995daad0..f013c25a72c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
@@ -822,6 +822,8 @@
       <MODE>493</MODE>
       <WEIGHT>100</WEIGHT>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>75</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -833,6 +835,8 @@
       <GROUPNAME>party</GROUPNAME>
       <MODE>448</MODE>
       <WEIGHT>1989</WEIGHT>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>76</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -842,6 +846,8 @@
       <PATH>/bar</PATH>
       <REPLICATION>1</REPLICATION>
       <POOL>poolparty</POOL>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>77</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -849,6 +855,8 @@
       <TXID>64</TXID>
       <ID>1</ID>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>78</RPC_CALLID>
     </DATA>
   </RECORD>
@@ -856,6 +864,8 @@
       <TXID>65</TXID>
       <POOLNAME>poolparty</POOLNAME>
+      <RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
+      <RPC_CALLID>79</RPC_CALLID>
     </DATA>
   </RECORD>
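Note: in the regenerated editsStored.xml above, every cache-op record gains
the same client UUID plus a call ID that increases by one per call (75 through
79), matching the order the operations are issued in DFSTestUtil. The
surrounding XML tag names were reconstructed from the serializers in this
patch. A quick way to eyeball the new fields in such a file (a hedged sketch;
the file path is illustrative):

import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

class DumpRpcIds {
  public static void main(String[] args) throws Exception {
    Document doc = DocumentBuilderFactory.newInstance()
        .newDocumentBuilder().parse("editsStored.xml");  // path assumed
    NodeList callIds = doc.getElementsByTagName("RPC_CALLID");
    for (int i = 0; i < callIds.getLength(); i++) {
      System.out.println("RPC_CALLID=" + callIds.item(i).getTextContent());
    }
  }
}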