HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1534345 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2013-10-21 19:42:35 +00:00
parent f9c08d02eb
commit 69e5f90e9f
7 changed files with 319 additions and 9 deletions
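A note on why the fix matters: @AtMostOnce RPCs rely on the NameNode retry cache, which is keyed by the caller's 16-byte client ID and integer call ID, to recognize retransmitted requests. Because the caching RPCs did not persist those IDs to the edit log, a NameNode replaying the log (after a restart, or a standby taking over) could not rebuild the corresponding cache entries, so a client retry after failover could be applied a second time. A rough, hypothetical sketch of the idea follows; it is not the actual RetryCache or FSEditLogLoader code, and the names are made up for illustration.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: replaying an @AtMostOnce op seeds the cache with the
// persisted (clientId, callId), so a client retry after failover is not
// executed twice.
class RetryCacheSketch {
  private final Set<String> seen = new HashSet<>();

  private static String key(byte[] clientId, int callId) {
    return Arrays.toString(clientId) + ":" + callId;
  }

  // Edit log replay path: apply the op once, then remember its RPC ids.
  void applyFromEditLog(byte[] clientId, int callId, Runnable applyOp) {
    applyOp.run();
    seen.add(key(clientId, callId));
  }

  // RPC path: a retried call whose ids are already cached is acknowledged
  // without running the operation a second time.
  boolean handleRpc(byte[] clientId, int callId, Runnable op) {
    if (!seen.add(key(clientId, callId))) {
      return false;  // duplicate retry; the earlier result stands
    }
    op.run();
    return true;     // first execution
  }
}

The diffs below thread exactly this information (rpcClientId, rpcCallId) through the five caching edit log ops and their XML form, and the retry-cache tests raise the expected entry count from 14 to 19 to account for them.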

View File

@@ -111,3 +111,7 @@ HDFS-4949 (Unreleased)
HDFS-5203. Concurrent clients that add a cache directive on the same path
may prematurely uncache from each other. (Chris Nauroth via Colin Patrick
McCabe)
HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)

View File

@@ -2861,6 +2861,10 @@ public abstract class FSEditLogOp {
}
}
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#addPathBasedCacheDirective}
*/
static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
String path;
short replication;
@@ -2895,6 +2899,7 @@ public abstract class FSEditLogOp {
this.path = FSImageSerialization.readString(in);
this.replication = FSImageSerialization.readShort(in);
this.pool = FSImageSerialization.readString(in);
readRpcIds(in, logVersion);
}
@Override
@@ -2902,6 +2907,7 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
FSImageSerialization.writeString(pool, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -2910,6 +2916,7 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(replication));
XMLUtils.addSaxString(contentHandler, "POOL", pool);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
@@ -2917,6 +2924,7 @@ public abstract class FSEditLogOp {
path = st.getValue("PATH");
replication = Short.parseShort(st.getValue("REPLICATION"));
pool = st.getValue("POOL");
readRpcIdsFromXml(st);
}
@Override
@@ -2925,11 +2933,17 @@ public abstract class FSEditLogOp {
builder.append("AddPathBasedCacheDirective [");
builder.append("path=" + path + ",");
builder.append("replication=" + replication + ",");
builder.append("pool=" + pool + "]");
builder.append("pool=" + pool);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#removePathBasedCacheDescriptor}
*/
static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
long id;
@@ -2950,32 +2964,39 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.id = FSImageSerialization.readLong(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(id, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.id = Long.parseLong(st.getValue("ID"));
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("RemovePathBasedCacheDescriptor [");
builder.append("id=" + Long.toString(id) + "]");
builder.append("id=" + Long.toString(id));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
static class AddCachePoolOp extends FSEditLogOp {
CachePoolInfo info;
@@ -2995,21 +3016,25 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
info = CachePoolInfo.readFrom(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
info .writeTo(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
info .writeXmlTo(contentHandler);
info.writeXmlTo(contentHandler);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.info = CachePoolInfo.readXmlFrom(st);
readRpcIdsFromXml(st);
}
@Override
@@ -3020,11 +3045,14 @@ public abstract class FSEditLogOp {
builder.append("ownerName=" + info.getOwnerName() + ",");
builder.append("groupName=" + info.getGroupName() + ",");
builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
builder.append("weight=" + Integer.toString(info.getWeight()) + "]");
builder.append("weight=" + Integer.toString(info.getWeight()));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
static class ModifyCachePoolOp extends FSEditLogOp {
CachePoolInfo info;
@@ -3044,21 +3072,25 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
info = CachePoolInfo.readFrom(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
info.writeTo(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
cachePoolInfoToXml(contentHandler, info);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.info = cachePoolInfoFromXml(st);
readRpcIdsFromXml(st);
}
@Override
@@ -3082,11 +3114,13 @@ public abstract class FSEditLogOp {
fields.add("weight=" + info.getWeight());
}
builder.append(Joiner.on(",").join(fields));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
static class RemoveCachePoolOp extends FSEditLogOp {
String poolName;
@@ -3106,28 +3140,34 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
poolName = FSImageSerialization.readString(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(poolName, out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.poolName = st.getValue("POOLNAME");
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("RemoveCachePoolOp [");
builder.append("poolName=" + poolName + "]");
builder.append("poolName=" + poolName);
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
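The ops above all follow the same layout: the op's own fields are written first, then writeRpcIds appends the RPC identifiers (with a matching readRpcIds on the read side, and RPC_CLIENTID/RPC_CALLID elements in the XML form). As a minimal stand-alone sketch of that layout, using hypothetical names rather than the real FSImageSerialization helpers:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the serialization layout: payload first, then the
// originating RPC client ID and call ID.
class CacheOpRecord {
  static final int CLIENT_ID_LEN = 16;  // UUID-sized client identifier

  String pool;          // op payload, e.g. a cache pool name
  byte[] rpcClientId;   // 16-byte client UUID
  int rpcCallId;        // per-client call sequence number

  void write(DataOutputStream out) throws IOException {
    out.writeUTF(pool);
    out.write(rpcClientId);   // fixed length, so no length prefix is needed
    out.writeInt(rpcCallId);
  }

  void read(DataInputStream in) throws IOException {
    pool = in.readUTF();
    rpcClientId = new byte[CLIENT_ID_LEN];
    in.readFully(rpcClientId);
    rpcCallId = in.readInt();
  }
}

The real readRpcIds also takes logVersion, presumably so that edit logs written before this feature, which lack the extra fields, still parse.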

View File

@@ -993,6 +993,20 @@ public class DFSTestUtil {
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
// OP_ADD_CACHE_POOL 35
filesystem.addCachePool(new CachePoolInfo("pool1"));
// OP_MODIFY_CACHE_POOL 36
filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path("/path")).
setPool("pool1").
build());
// OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
filesystem.removePathBasedCacheDescriptor(pbcd);
// OP_REMOVE_CACHE_POOL 37
filesystem.removeCachePool("pool1");
}
public static void abortStream(DFSOutputStream out) throws IOException {

View File

@@ -413,7 +413,7 @@ public class TestNamenodeRetryCache {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
assertEquals(19, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -432,7 +432,7 @@ public class TestNamenodeRetryCache {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
assertEquals(19, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();

View File

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,12 +54,15 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -147,7 +151,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
assertEquals(19, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -168,7 +172,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(14, cacheSet.size());
assertEquals(19, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@@ -733,6 +737,208 @@ public class TestRetryCacheWithHA {
}
}
/** addPathBasedCacheDirective */
class AddPathBasedCacheDirectiveOp extends AtMostOnceOp {
private String pool;
private String path;
private PathBasedCacheDescriptor descriptor;
AddPathBasedCacheDirectiveOp(DFSClient client, String pool, String path) {
super("addPathBasedCacheDirective", client);
this.pool = pool;
this.path = path;
}
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(pool));
}
@Override
void invoke() throws Exception {
descriptor = client.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path(path)).
setPool(pool).
build());
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(pool, new Path(path));
if (iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return descriptor;
}
}
/** removePathBasedCacheDescriptor */
class RemovePathBasedCacheDescriptorOp extends AtMostOnceOp {
private String pool;
private String path;
private PathBasedCacheDescriptor descriptor;
RemovePathBasedCacheDescriptorOp(DFSClient client, String pool,
String path) {
super("removePathBasedCacheDescriptor", client);
this.pool = pool;
this.path = path;
}
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(pool));
descriptor = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path(path)).
setPool(pool).
build());
}
@Override
void invoke() throws Exception {
client.removePathBasedCacheDescriptor(descriptor.getEntryId());
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(pool, new Path(path));
if (!iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** addCachePool */
class AddCachePoolOp extends AtMostOnceOp {
private String pool;
AddCachePoolOp(DFSClient client, String pool) {
super("addCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
}
@Override
void invoke() throws Exception {
client.addCachePool(new CachePoolInfo(pool));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
if (iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** modifyCachePool */
class ModifyCachePoolOp extends AtMostOnceOp {
String pool;
ModifyCachePoolOp(DFSClient client, String pool) {
super("modifyCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
client.addCachePool(new CachePoolInfo(pool).setWeight(10));
}
@Override
void invoke() throws Exception {
client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
if (iter.hasNext() && iter.next().getWeight() == 99) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** removeCachePool */
class RemoveCachePoolOp extends AtMostOnceOp {
private String pool;
RemoveCachePoolOp(DFSClient client, String pool) {
super("removeCachePool", client);
this.pool = pool;
}
@Override
void prepare() throws Exception {
client.addCachePool(new CachePoolInfo(pool));
}
@Override
void invoke() throws Exception {
client.removeCachePool(pool);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
if (!iter.hasNext()) {
return true;
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
@Test (timeout=60000)
public void testCreateSnapshot() throws Exception {
final DFSClient client = genClientWithDummyHandler();
@@ -810,6 +1016,42 @@ public class TestRetryCacheWithHA {
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testAddPathBasedCacheDirective() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client, "pool", "/path");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemovePathBasedCacheDescriptor() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemovePathBasedCacheDescriptorOp(client, "pool",
"/path");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testAddCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testModifyCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ModifyCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemoveCachePool() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemoveCachePoolOp(client, "pool");
testClientRetryWithFailover(op);
}
/**
* When NN failover happens, if the client did not receive the response and
* send a retry request to the other NN, the same response should be received

View File

@@ -822,6 +822,8 @@
<MODE>493</MODE>
</PERMISSION_STATUS>
<WEIGHT>100</WEIGHT>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -833,6 +835,8 @@
<GROUPNAME>party</GROUPNAME>
<MODE>448</MODE>
<WEIGHT>1989</WEIGHT>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -842,6 +846,8 @@
<PATH>/bar</PATH>
<REPLICATION>1</REPLICATION>
<POOL>poolparty</POOL>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>77</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -849,6 +855,8 @@
<DATA>
<TXID>64</TXID>
<ID>1</ID>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>78</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -856,6 +864,8 @@
<DATA>
<TXID>65</TXID>
<POOLNAME>poolparty</POOLNAME>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>79</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>