HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1558598 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2014-01-15 23:11:01 +00:00
parent 7274b5ff93
commit ca5d73d1ab
4 changed files with 116 additions and 102 deletions

View File

@ -478,6 +478,9 @@ Trunk (Unreleased)
HDFS-5726. Fix compilation error in AbstractINodeDiff for JDK7. (jing9) HDFS-5726. Fix compilation error in AbstractINodeDiff for JDK7. (jing9)
HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager
(Haohui Mai via brandonli)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -59,6 +59,7 @@ public class DelegationTokenSecretManager
.getLog(DelegationTokenSecretManager.class); .getLog(DelegationTokenSecretManager.class);
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
private final SerializerCompat serializerCompat = new SerializerCompat();
public DelegationTokenSecretManager(long delegationKeyUpdateInterval, public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
@ -157,17 +158,14 @@ public class DelegationTokenSecretManager
* @param in input stream to read fsimage * @param in input stream to read fsimage
* @throws IOException * @throws IOException
*/ */
public synchronized void loadSecretManagerState(DataInput in) public synchronized void loadSecretManagerStateCompat(DataInput in)
throws IOException { throws IOException {
if (running) { if (running) {
// a safety check // a safety check
throw new IOException( throw new IOException(
"Can't load state from image in a running SecretManager."); "Can't load state from image in a running SecretManager.");
} }
currentId = in.readInt(); serializerCompat.load(in);
loadAllKeys(in);
delegationTokenSequenceNumber = in.readInt();
loadCurrentTokens(in);
} }
/** /**
@ -177,12 +175,9 @@ public class DelegationTokenSecretManager
* @param sdPath String storage directory path * @param sdPath String storage directory path
* @throws IOException * @throws IOException
*/ */
public synchronized void saveSecretManagerState(DataOutputStream out, public synchronized void saveSecretManagerStateCompat(DataOutputStream out,
String sdPath) throws IOException { String sdPath) throws IOException {
out.writeInt(currentId); serializerCompat.save(out, sdPath);
saveAllKeys(out, sdPath);
out.writeInt(delegationTokenSequenceNumber);
saveCurrentTokens(out, sdPath);
} }
/** /**
@ -282,91 +277,6 @@ public class DelegationTokenSecretManager
return allKeys.size(); return allKeys.size();
} }
/**
* Private helper methods to save delegation keys and tokens in fsimage
*/
private synchronized void saveCurrentTokens(DataOutputStream out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_TOKENS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(currentTokens.size());
Iterator<DelegationTokenIdentifier> iter = currentTokens.keySet()
.iterator();
while (iter.hasNext()) {
DelegationTokenIdentifier id = iter.next();
id.write(out);
DelegationTokenInformation info = currentTokens.get(id);
out.writeLong(info.getRenewDate());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/*
* Save the current state of allKeys
*/
private synchronized void saveAllKeys(DataOutputStream out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_KEYS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(allKeys.size());
Iterator<Integer> iter = allKeys.keySet().iterator();
while (iter.hasNext()) {
Integer key = iter.next();
allKeys.get(key).write(out);
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Private helper methods to load Delegation tokens from fsimage
*/
private synchronized void loadCurrentTokens(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_TOKENS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfTokens = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfTokens);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfTokens; i++) {
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
id.readFields(in);
long expiryTime = in.readLong();
addPersistedDelegationToken(id, expiryTime);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Private helper method to load delegation keys from fsimage.
* @param in
* @throws IOException
*/
private synchronized void loadAllKeys(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_KEYS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfKeys = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfKeys);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfKeys; i++) {
DelegationKey value = new DelegationKey();
value.readFields(in);
addKey(value);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/** /**
* Call namesystem to update editlogs for new master key. * Call namesystem to update editlogs for new master key.
*/ */
@ -420,4 +330,105 @@ public class DelegationTokenSecretManager
c.addToken(new Text(ugi.getShortUserName()), token); c.addToken(new Text(ugi.getShortUserName()), token);
return c; return c;
} }
private final class SerializerCompat {
private void load(DataInput in) throws IOException {
currentId = in.readInt();
loadAllKeys(in);
delegationTokenSequenceNumber = in.readInt();
loadCurrentTokens(in);
}
private void save(DataOutputStream out, String sdPath) throws IOException {
out.writeInt(currentId);
saveAllKeys(out, sdPath);
out.writeInt(delegationTokenSequenceNumber);
saveCurrentTokens(out, sdPath);
}
/**
* Private helper methods to save delegation keys and tokens in fsimage
*/
private synchronized void saveCurrentTokens(DataOutputStream out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_TOKENS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(currentTokens.size());
Iterator<DelegationTokenIdentifier> iter = currentTokens.keySet()
.iterator();
while (iter.hasNext()) {
DelegationTokenIdentifier id = iter.next();
id.write(out);
DelegationTokenInformation info = currentTokens.get(id);
out.writeLong(info.getRenewDate());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/*
* Save the current state of allKeys
*/
private synchronized void saveAllKeys(DataOutputStream out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_KEYS, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, currentTokens.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(allKeys.size());
Iterator<Integer> iter = allKeys.keySet().iterator();
while (iter.hasNext()) {
Integer key = iter.next();
allKeys.get(key).write(out);
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
}
/**
* Private helper methods to load Delegation tokens from fsimage
*/
private synchronized void loadCurrentTokens(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_TOKENS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfTokens = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfTokens);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfTokens; i++) {
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
id.readFields(in);
long expiryTime = in.readLong();
addPersistedDelegationToken(id, expiryTime);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
/**
* Private helper method to load delegation keys from fsimage.
* @param in
* @throws IOException
*/
private synchronized void loadAllKeys(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.DELEGATION_KEYS);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfKeys = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfKeys);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfKeys; i++) {
DelegationKey value = new DelegationKey();
value.readFields(in);
addKey(value);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
}
} }

View File

@ -870,7 +870,7 @@ public class FSImageFormat {
//This must not happen if security is turned on. //This must not happen if security is turned on.
return; return;
} }
namesystem.loadSecretManagerState(in); namesystem.loadSecretManagerStateCompat(in);
} }
private void loadCacheManagerState(DataInput in) throws IOException { private void loadCacheManagerState(DataInput in) throws IOException {
@ -1032,7 +1032,7 @@ public class FSImageFormat {
sourceNamesystem.saveFilesUnderConstruction(out, snapshotUCMap); sourceNamesystem.saveFilesUnderConstruction(out, snapshotUCMap);
context.checkCancelled(); context.checkCancelled();
sourceNamesystem.saveSecretManagerState(out, sdPath); sourceNamesystem.saveSecretManagerStateCompat(out, sdPath);
context.checkCancelled(); context.checkCancelled();
sourceNamesystem.getCacheManager().saveState(out, sdPath); sourceNamesystem.getCacheManager().saveState(out, sdPath);
context.checkCancelled(); context.checkCancelled();

View File

@ -6250,16 +6250,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @param out save state of the secret manager * @param out save state of the secret manager
* @param sdPath String storage directory path * @param sdPath String storage directory path
*/ */
void saveSecretManagerState(DataOutputStream out, String sdPath) void saveSecretManagerStateCompat(DataOutputStream out, String sdPath)
throws IOException { throws IOException {
dtSecretManager.saveSecretManagerState(out, sdPath); dtSecretManager.saveSecretManagerStateCompat(out, sdPath);
} }
/** /**
* @param in load the state of secret manager from input stream * @param in load the state of secret manager from input stream
*/ */
void loadSecretManagerState(DataInput in) throws IOException { void loadSecretManagerStateCompat(DataInput in) throws IOException {
dtSecretManager.loadSecretManagerState(in); dtSecretManager.loadSecretManagerStateCompat(in);
} }
/** /**