HDFS-7153. Add storagePolicy to NN edit log during file creation. (Arpit Agarwal)
This commit is contained in:
parent
9a53c3699b
commit
d45e7c7e85
|
@ -83,3 +83,6 @@
|
||||||
HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
|
HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
|
||||||
(Arpit Agarwal)
|
(Arpit Agarwal)
|
||||||
|
|
||||||
|
HDFS-7153. Add storagePolicy to NN edit log during file creation.
|
||||||
|
(Arpit Agarwal)
|
||||||
|
|
||||||
|
|
|
@ -279,9 +279,16 @@ public class FSDirectory implements Closeable {
|
||||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||||
long mtime, long atime, short replication, long preferredBlockSize,
|
long mtime, long atime, short replication, long preferredBlockSize,
|
||||||
boolean isLazyPersist) {
|
boolean isLazyPersist) {
|
||||||
|
return newINodeFile(id, permissions, mtime, atime, replication, preferredBlockSize,
|
||||||
|
isLazyPersist, (byte)0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||||
|
long mtime, long atime, short replication, long preferredBlockSize,
|
||||||
|
boolean isLazyPersist, byte storagePolicyId) {
|
||||||
return new INodeFile(id, null, permissions, mtime, atime,
|
return new INodeFile(id, null, permissions, mtime, atime,
|
||||||
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
||||||
isLazyPersist, (byte) 0);
|
isLazyPersist, storagePolicyId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -334,17 +341,18 @@ public class FSDirectory implements Closeable {
|
||||||
boolean isLazyPersist,
|
boolean isLazyPersist,
|
||||||
boolean underConstruction,
|
boolean underConstruction,
|
||||||
String clientName,
|
String clientName,
|
||||||
String clientMachine) {
|
String clientMachine,
|
||||||
|
byte storagePolicyId) {
|
||||||
final INodeFile newNode;
|
final INodeFile newNode;
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
if (underConstruction) {
|
if (underConstruction) {
|
||||||
newNode = newINodeFile(id, permissions, modificationTime,
|
newNode = newINodeFile(id, permissions, modificationTime,
|
||||||
modificationTime, replication, preferredBlockSize, isLazyPersist);
|
modificationTime, replication, preferredBlockSize, isLazyPersist, storagePolicyId);
|
||||||
newNode.toUnderConstruction(clientName, clientMachine);
|
newNode.toUnderConstruction(clientName, clientMachine);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
||||||
replication, preferredBlockSize, isLazyPersist);
|
replication, preferredBlockSize, isLazyPersist, storagePolicyId);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -720,7 +720,8 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
|
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
|
||||||
.setClientMachine(
|
.setClientMachine(
|
||||||
newNode.getFileUnderConstructionFeature().getClientMachine())
|
newNode.getFileUnderConstructionFeature().getClientMachine())
|
||||||
.setOverwrite(overwrite);
|
.setOverwrite(overwrite)
|
||||||
|
.setStoragePolicyId(newNode.getStoragePolicyID());
|
||||||
|
|
||||||
AclFeature f = newNode.getAclFeature();
|
AclFeature f = newNode.getAclFeature();
|
||||||
if (f != null) {
|
if (f != null) {
|
||||||
|
|
|
@ -367,7 +367,7 @@ public class FSEditLogLoader {
|
||||||
replication, addCloseOp.mtime, addCloseOp.atime,
|
replication, addCloseOp.mtime, addCloseOp.atime,
|
||||||
addCloseOp.blockSize, addCloseOp.isLazyPersist,
|
addCloseOp.blockSize, addCloseOp.isLazyPersist,
|
||||||
true, addCloseOp.clientName,
|
true, addCloseOp.clientName,
|
||||||
addCloseOp.clientMachine);
|
addCloseOp.clientMachine, addCloseOp.storagePolicyId);
|
||||||
fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
|
fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
|
||||||
|
|
||||||
// add the op into retry cache if necessary
|
// add the op into retry cache if necessary
|
||||||
|
|
|
@ -103,6 +103,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrEditLogProto;
|
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrEditLogProto;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils;
|
import org.apache.hadoop.hdfs.util.XMLUtils;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
||||||
|
@ -411,6 +412,7 @@ public abstract class FSEditLogOp {
|
||||||
String clientName;
|
String clientName;
|
||||||
String clientMachine;
|
String clientMachine;
|
||||||
boolean overwrite;
|
boolean overwrite;
|
||||||
|
byte storagePolicyId;
|
||||||
|
|
||||||
private AddCloseOp(FSEditLogOpCodes opCode) {
|
private AddCloseOp(FSEditLogOpCodes opCode) {
|
||||||
super(opCode);
|
super(opCode);
|
||||||
|
@ -501,6 +503,11 @@ public abstract class FSEditLogOp {
|
||||||
return (T)this;
|
return (T)this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<T extends AddCloseOp> T setStoragePolicyId(byte storagePolicyId) {
|
||||||
|
this.storagePolicyId = storagePolicyId;
|
||||||
|
return (T)this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeFields(DataOutputStream out) throws IOException {
|
public void writeFields(DataOutputStream out) throws IOException {
|
||||||
FSImageSerialization.writeLong(inodeId, out);
|
FSImageSerialization.writeLong(inodeId, out);
|
||||||
|
@ -521,6 +528,7 @@ public abstract class FSEditLogOp {
|
||||||
FSImageSerialization.writeString(clientName,out);
|
FSImageSerialization.writeString(clientName,out);
|
||||||
FSImageSerialization.writeString(clientMachine,out);
|
FSImageSerialization.writeString(clientMachine,out);
|
||||||
FSImageSerialization.writeBoolean(overwrite, out);
|
FSImageSerialization.writeBoolean(overwrite, out);
|
||||||
|
FSImageSerialization.writeByte(storagePolicyId, out);
|
||||||
// write clientId and callId
|
// write clientId and callId
|
||||||
writeRpcIds(rpcClientId, rpcCallId, out);
|
writeRpcIds(rpcClientId, rpcCallId, out);
|
||||||
}
|
}
|
||||||
|
@ -599,6 +607,12 @@ public abstract class FSEditLogOp {
|
||||||
} else {
|
} else {
|
||||||
this.overwrite = false;
|
this.overwrite = false;
|
||||||
}
|
}
|
||||||
|
if (NameNodeLayoutVersion.supports(
|
||||||
|
NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
|
||||||
|
this.storagePolicyId = FSImageSerialization.readByte(in);
|
||||||
|
} else {
|
||||||
|
this.storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||||
|
}
|
||||||
// read clientId and callId
|
// read clientId and callId
|
||||||
readRpcIds(in, logVersion);
|
readRpcIds(in, logVersion);
|
||||||
} else {
|
} else {
|
||||||
|
@ -661,6 +675,8 @@ public abstract class FSEditLogOp {
|
||||||
if (this.opCode == OP_ADD) {
|
if (this.opCode == OP_ADD) {
|
||||||
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
|
||||||
}
|
}
|
||||||
|
builder.append(", storagePolicyId=");
|
||||||
|
builder.append(storagePolicyId);
|
||||||
builder.append(", opCode=");
|
builder.append(", opCode=");
|
||||||
builder.append(opCode);
|
builder.append(opCode);
|
||||||
builder.append(", txid=");
|
builder.append(", txid=");
|
||||||
|
|
|
@ -385,6 +385,12 @@ public class FSImageSerialization {
|
||||||
uBoolean.write(out);
|
uBoolean.write(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** write the byte value */
|
||||||
|
static void writeByte(byte value, DataOutputStream out)
|
||||||
|
throws IOException {
|
||||||
|
out.write(value);
|
||||||
|
}
|
||||||
|
|
||||||
/** read the int value */
|
/** read the int value */
|
||||||
static int readInt(DataInput in) throws IOException {
|
static int readInt(DataInput in) throws IOException {
|
||||||
IntWritable uInt = TL_DATA.get().U_INT;
|
IntWritable uInt = TL_DATA.get().U_INT;
|
||||||
|
@ -424,6 +430,10 @@ public class FSImageSerialization {
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static byte readByte(DataInput in) throws IOException {
|
||||||
|
return in.readByte();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reading the path from the image and converting it to byte[][] directly
|
* Reading the path from the image and converting it to byte[][] directly
|
||||||
* this saves us an array copy and conversions to and from String
|
* this saves us an array copy and conversions to and from String
|
||||||
|
|
|
@ -2611,7 +2611,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
clientMachine, create, overwrite, createParent, replication,
|
clientMachine, create, overwrite, createParent, replication,
|
||||||
blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
|
blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
|
||||||
stat = dir.getFileInfo(src, false,
|
stat = dir.getFileInfo(src, false,
|
||||||
FSDirectory.isReservedRawName(srcArg), false);
|
FSDirectory.isReservedRawName(srcArg), true);
|
||||||
} catch (StandbyException se) {
|
} catch (StandbyException se) {
|
||||||
skipSync = true;
|
skipSync = true;
|
||||||
throw se;
|
throw se;
|
||||||
|
|
Loading…
Reference in New Issue