HDFS-7513. HDFS inotify: add defaultBlockSize to CreateEvent (cmccabe)
This commit is contained in:
parent
fae3e8614f
commit
6e13fc62e1
|
@ -457,6 +457,8 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7426. Change nntop JMX format to be a JSON blob. (wang)
|
HDFS-7426. Change nntop JMX format to be a JSON blob. (wang)
|
||||||
|
|
||||||
|
HDFS-7513. HDFS inotify: add defaultBlockSize to CreateEvent (cmccabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
|
|
@ -101,6 +101,7 @@ public abstract class Event {
|
||||||
private FsPermission perms;
|
private FsPermission perms;
|
||||||
private String symlinkTarget;
|
private String symlinkTarget;
|
||||||
private boolean overwrite;
|
private boolean overwrite;
|
||||||
|
private long defaultBlockSize;
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
private INodeType iNodeType;
|
private INodeType iNodeType;
|
||||||
|
@ -112,6 +113,7 @@ public abstract class Event {
|
||||||
private FsPermission perms;
|
private FsPermission perms;
|
||||||
private String symlinkTarget;
|
private String symlinkTarget;
|
||||||
private boolean overwrite;
|
private boolean overwrite;
|
||||||
|
private long defaultBlockSize = 0;
|
||||||
|
|
||||||
public Builder iNodeType(INodeType type) {
|
public Builder iNodeType(INodeType type) {
|
||||||
this.iNodeType = type;
|
this.iNodeType = type;
|
||||||
|
@ -158,6 +160,11 @@ public abstract class Event {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder defaultBlockSize(long defaultBlockSize) {
|
||||||
|
this.defaultBlockSize = defaultBlockSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public CreateEvent build() {
|
public CreateEvent build() {
|
||||||
return new CreateEvent(this);
|
return new CreateEvent(this);
|
||||||
}
|
}
|
||||||
|
@ -174,6 +181,7 @@ public abstract class Event {
|
||||||
this.perms = b.perms;
|
this.perms = b.perms;
|
||||||
this.symlinkTarget = b.symlinkTarget;
|
this.symlinkTarget = b.symlinkTarget;
|
||||||
this.overwrite = b.overwrite;
|
this.overwrite = b.overwrite;
|
||||||
|
this.defaultBlockSize = b.defaultBlockSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public INodeType getiNodeType() {
|
public INodeType getiNodeType() {
|
||||||
|
@ -220,6 +228,10 @@ public abstract class Event {
|
||||||
public boolean getOverwrite() {
|
public boolean getOverwrite() {
|
||||||
return overwrite;
|
return overwrite;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getDefaultBlockSize() {
|
||||||
|
return defaultBlockSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -398,11 +410,36 @@ public abstract class Event {
|
||||||
private String dstPath;
|
private String dstPath;
|
||||||
private long timestamp;
|
private long timestamp;
|
||||||
|
|
||||||
public RenameEvent(String srcPath, String dstPath, long timestamp) {
|
public static class Builder {
|
||||||
|
private String srcPath;
|
||||||
|
private String dstPath;
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
|
public Builder srcPath(String srcPath) {
|
||||||
|
this.srcPath = srcPath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder dstPath(String dstPath) {
|
||||||
|
this.dstPath = dstPath;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder timestamp(long timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public RenameEvent build() {
|
||||||
|
return new RenameEvent(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private RenameEvent(Builder builder) {
|
||||||
super(EventType.RENAME);
|
super(EventType.RENAME);
|
||||||
this.srcPath = srcPath;
|
this.srcPath = builder.srcPath;
|
||||||
this.dstPath = dstPath;
|
this.dstPath = builder.dstPath;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = builder.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getSrcPath() {
|
public String getSrcPath() {
|
||||||
|
@ -427,9 +464,22 @@ public abstract class Event {
|
||||||
public static class AppendEvent extends Event {
|
public static class AppendEvent extends Event {
|
||||||
private String path;
|
private String path;
|
||||||
|
|
||||||
public AppendEvent(String path) {
|
public static class Builder {
|
||||||
|
private String path;
|
||||||
|
|
||||||
|
public Builder path(String path) {
|
||||||
|
this.path = path;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppendEvent build() {
|
||||||
|
return new AppendEvent(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private AppendEvent(Builder b) {
|
||||||
super(EventType.APPEND);
|
super(EventType.APPEND);
|
||||||
this.path = path;
|
this.path = b.path;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPath() {
|
public String getPath() {
|
||||||
|
@ -444,10 +494,29 @@ public abstract class Event {
|
||||||
private String path;
|
private String path;
|
||||||
private long timestamp;
|
private long timestamp;
|
||||||
|
|
||||||
public UnlinkEvent(String path, long timestamp) {
|
public static class Builder {
|
||||||
|
private String path;
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
|
public Builder path(String path) {
|
||||||
|
this.path = path;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder timestamp(long timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public UnlinkEvent build() {
|
||||||
|
return new UnlinkEvent(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private UnlinkEvent(Builder builder) {
|
||||||
super(EventType.UNLINK);
|
super(EventType.UNLINK);
|
||||||
this.path = path;
|
this.path = builder.path;
|
||||||
this.timestamp = timestamp;
|
this.timestamp = builder.timestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getPath() {
|
public String getPath() {
|
||||||
|
|
|
@ -2566,6 +2566,7 @@ public class PBHelper {
|
||||||
.replication(create.getReplication())
|
.replication(create.getReplication())
|
||||||
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
|
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
|
||||||
create.getSymlinkTarget())
|
create.getSymlinkTarget())
|
||||||
|
.defaultBlockSize(create.getDefaultBlockSize())
|
||||||
.overwrite(create.getOverwrite()).build());
|
.overwrite(create.getOverwrite()).build());
|
||||||
break;
|
break;
|
||||||
case EVENT_METADATA:
|
case EVENT_METADATA:
|
||||||
|
@ -2592,19 +2593,26 @@ public class PBHelper {
|
||||||
case EVENT_RENAME:
|
case EVENT_RENAME:
|
||||||
InotifyProtos.RenameEventProto rename =
|
InotifyProtos.RenameEventProto rename =
|
||||||
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
|
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
|
||||||
events.add(new Event.RenameEvent(rename.getSrcPath(),
|
events.add(new Event.RenameEvent.Builder()
|
||||||
rename.getDestPath(), rename.getTimestamp()));
|
.srcPath(rename.getSrcPath())
|
||||||
|
.dstPath(rename.getDestPath())
|
||||||
|
.timestamp(rename.getTimestamp())
|
||||||
|
.build());
|
||||||
break;
|
break;
|
||||||
case EVENT_APPEND:
|
case EVENT_APPEND:
|
||||||
InotifyProtos.AppendEventProto reopen =
|
InotifyProtos.AppendEventProto reopen =
|
||||||
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
|
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
|
||||||
events.add(new Event.AppendEvent(reopen.getPath()));
|
events.add(new Event.AppendEvent.Builder()
|
||||||
|
.path(reopen.getPath())
|
||||||
|
.build());
|
||||||
break;
|
break;
|
||||||
case EVENT_UNLINK:
|
case EVENT_UNLINK:
|
||||||
InotifyProtos.UnlinkEventProto unlink =
|
InotifyProtos.UnlinkEventProto unlink =
|
||||||
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
|
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
|
||||||
events.add(new Event.UnlinkEvent(unlink.getPath(),
|
events.add(new Event.UnlinkEvent.Builder()
|
||||||
unlink.getTimestamp()));
|
.path(unlink.getPath())
|
||||||
|
.timestamp(unlink.getTimestamp())
|
||||||
|
.build());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Unexpected inotify event type: " +
|
throw new RuntimeException("Unexpected inotify event type: " +
|
||||||
|
@ -2650,6 +2658,7 @@ public class PBHelper {
|
||||||
.setReplication(ce2.getReplication())
|
.setReplication(ce2.getReplication())
|
||||||
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
|
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
|
||||||
"" : ce2.getSymlinkTarget())
|
"" : ce2.getSymlinkTarget())
|
||||||
|
.setDefaultBlockSize(ce2.getDefaultBlockSize())
|
||||||
.setOverwrite(ce2.getOverwrite()).build().toByteString()
|
.setOverwrite(ce2.getOverwrite()).build().toByteString()
|
||||||
).build());
|
).build());
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -53,10 +53,13 @@ public class InotifyFSEditLogOpTranslator {
|
||||||
.groupName(addOp.permissions.getGroupName())
|
.groupName(addOp.permissions.getGroupName())
|
||||||
.perms(addOp.permissions.getPermission())
|
.perms(addOp.permissions.getPermission())
|
||||||
.overwrite(addOp.overwrite)
|
.overwrite(addOp.overwrite)
|
||||||
|
.defaultBlockSize(addOp.blockSize)
|
||||||
.iNodeType(Event.CreateEvent.INodeType.FILE).build() });
|
.iNodeType(Event.CreateEvent.INodeType.FILE).build() });
|
||||||
} else {
|
} else { // append
|
||||||
return new EventBatch(op.txid,
|
return new EventBatch(op.txid,
|
||||||
new Event[] { new Event.AppendEvent(addOp.path) });
|
new Event[]{new Event.AppendEvent.Builder()
|
||||||
|
.path(addOp.path)
|
||||||
|
.build()});
|
||||||
}
|
}
|
||||||
case OP_CLOSE:
|
case OP_CLOSE:
|
||||||
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
|
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
|
||||||
|
@ -72,25 +75,40 @@ public class InotifyFSEditLogOpTranslator {
|
||||||
case OP_CONCAT_DELETE:
|
case OP_CONCAT_DELETE:
|
||||||
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
|
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
|
||||||
List<Event> events = Lists.newArrayList();
|
List<Event> events = Lists.newArrayList();
|
||||||
events.add(new Event.AppendEvent(cdOp.trg));
|
events.add(new Event.AppendEvent.Builder()
|
||||||
|
.path(cdOp.trg)
|
||||||
|
.build());
|
||||||
for (String src : cdOp.srcs) {
|
for (String src : cdOp.srcs) {
|
||||||
events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
|
events.add(new Event.UnlinkEvent.Builder()
|
||||||
|
.path(src)
|
||||||
|
.timestamp(cdOp.timestamp)
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
|
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
|
||||||
return new EventBatch(op.txid, events.toArray(new Event[0]));
|
return new EventBatch(op.txid, events.toArray(new Event[0]));
|
||||||
case OP_RENAME_OLD:
|
case OP_RENAME_OLD:
|
||||||
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
|
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
|
||||||
return new EventBatch(op.txid, new Event[] {
|
return new EventBatch(op.txid, new Event[] {
|
||||||
new Event.RenameEvent(rnOpOld.src,
|
new Event.RenameEvent.Builder()
|
||||||
rnOpOld.dst, rnOpOld.timestamp) });
|
.srcPath(rnOpOld.src)
|
||||||
|
.dstPath(rnOpOld.dst)
|
||||||
|
.timestamp(rnOpOld.timestamp)
|
||||||
|
.build() });
|
||||||
case OP_RENAME:
|
case OP_RENAME:
|
||||||
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
|
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
|
||||||
return new EventBatch(op.txid, new Event[] {
|
return new EventBatch(op.txid, new Event[] {
|
||||||
new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
|
new Event.RenameEvent.Builder()
|
||||||
|
.srcPath(rnOp.src)
|
||||||
|
.dstPath(rnOp.dst)
|
||||||
|
.timestamp(rnOp.timestamp)
|
||||||
|
.build() });
|
||||||
case OP_DELETE:
|
case OP_DELETE:
|
||||||
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
|
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
|
||||||
return new EventBatch(op.txid, new Event[] {
|
return new EventBatch(op.txid, new Event[] {
|
||||||
new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
|
new Event.UnlinkEvent.Builder()
|
||||||
|
.path(delOp.path)
|
||||||
|
.timestamp(delOp.timestamp)
|
||||||
|
.build() });
|
||||||
case OP_MKDIR:
|
case OP_MKDIR:
|
||||||
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
|
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
|
||||||
return new EventBatch(op.txid,
|
return new EventBatch(op.txid,
|
||||||
|
|
|
@ -78,6 +78,7 @@ message CreateEventProto {
|
||||||
optional int32 replication = 7;
|
optional int32 replication = 7;
|
||||||
optional string symlinkTarget = 8;
|
optional string symlinkTarget = 8;
|
||||||
optional bool overwrite = 9;
|
optional bool overwrite = 9;
|
||||||
|
optional int64 defaultBlockSize = 10 [default=0];
|
||||||
}
|
}
|
||||||
|
|
||||||
message CloseEventProto {
|
message CloseEventProto {
|
||||||
|
|
|
@ -170,6 +170,7 @@ public class TestDFSInotifyEventInputStream {
|
||||||
Assert.assertTrue(ce.getReplication() > 0);
|
Assert.assertTrue(ce.getReplication() > 0);
|
||||||
Assert.assertTrue(ce.getSymlinkTarget() == null);
|
Assert.assertTrue(ce.getSymlinkTarget() == null);
|
||||||
Assert.assertTrue(ce.getOverwrite());
|
Assert.assertTrue(ce.getOverwrite());
|
||||||
|
Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
|
||||||
|
|
||||||
// CloseOp
|
// CloseOp
|
||||||
batch = waitForNextEvents(eis);
|
batch = waitForNextEvents(eis);
|
||||||
|
@ -186,7 +187,8 @@ public class TestDFSInotifyEventInputStream {
|
||||||
Assert.assertEquals(1, batch.getEvents().length);
|
Assert.assertEquals(1, batch.getEvents().length);
|
||||||
txid = checkTxid(batch, txid);
|
txid = checkTxid(batch, txid);
|
||||||
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
|
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
|
||||||
Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
|
Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0];
|
||||||
|
Assert.assertEquals("/file2", append2.getPath());
|
||||||
|
|
||||||
// CloseOp
|
// CloseOp
|
||||||
batch = waitForNextEvents(eis);
|
batch = waitForNextEvents(eis);
|
||||||
|
|
Loading…
Reference in New Issue