diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java index 78a85b0546a..2ae8f0a1006 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import java.util.List; +import java.util.Optional; /** * Events sent by the inotify system. Note that no events are necessarily sent @@ -112,6 +113,7 @@ public enum INodeType { private String symlinkTarget; private boolean overwrite; private long defaultBlockSize; + private Optional<Boolean> erasureCoded; public static class Builder { private INodeType iNodeType; @@ -124,6 +126,7 @@ public static class Builder { private String symlinkTarget; private boolean overwrite; private long defaultBlockSize = 0; + private Optional<Boolean> erasureCoded = Optional.empty(); public Builder iNodeType(INodeType type) { this.iNodeType = type; @@ -175,6 +178,11 @@ public Builder defaultBlockSize(long defaultBlockSize) { return this; } + public Builder erasureCoded(boolean ecCoded) { + this.erasureCoded = Optional.of(ecCoded); + return this; + } + public CreateEvent build() { return new CreateEvent(this); } @@ -192,6 +200,7 @@ private CreateEvent(Builder b) { this.symlinkTarget = b.symlinkTarget; this.overwrite = b.overwrite; this.defaultBlockSize = b.defaultBlockSize; + this.erasureCoded = b.erasureCoded; } public INodeType getiNodeType() { @@ -243,6 +252,10 @@ public long getDefaultBlockSize() { return defaultBlockSize; } + public Optional<Boolean> isErasureCoded() { + return erasureCoded; + } + @Override @InterfaceStability.Unstable public String toString() { @@ -261,6 +274,7 @@ public String toString() { content.append("overwrite=").append(overwrite) .append(", 
defaultBlockSize=").append(defaultBlockSize) + .append(", erasureCoded=").append(erasureCoded) .append("]"); return content.toString(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 4a5a493bf50..4b966bb5297 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -908,18 +908,22 @@ public static EventBatchList convert(GetEditsFromTxidResponseProto resp) case EVENT_CREATE: InotifyProtos.CreateEventProto create = InotifyProtos.CreateEventProto.parseFrom(p.getContents()); - events.add(new Event.CreateEvent.Builder() - .iNodeType(createTypeConvert(create.getType())) - .path(create.getPath()) - .ctime(create.getCtime()) - .ownerName(create.getOwnerName()) - .groupName(create.getGroupName()) - .perms(convert(create.getPerms())) - .replication(create.getReplication()) - .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null : - create.getSymlinkTarget()) - .defaultBlockSize(create.getDefaultBlockSize()) - .overwrite(create.getOverwrite()).build()); + Event.CreateEvent.Builder builder = new Event.CreateEvent.Builder() + .iNodeType(createTypeConvert(create.getType())) + .path(create.getPath()) + .ctime(create.getCtime()) + .ownerName(create.getOwnerName()) + .groupName(create.getGroupName()) + .perms(convert(create.getPerms())) + .replication(create.getReplication()) + .symlinkTarget(create.getSymlinkTarget().isEmpty() ? 
null : + create.getSymlinkTarget()) + .defaultBlockSize(create.getDefaultBlockSize()) + .overwrite(create.getOverwrite()); + if (create.hasErasureCoded()) { + builder.erasureCoded(create.getErasureCoded()); + } + events.add(builder.build()); break; case EVENT_METADATA: InotifyProtos.MetadataUpdateEventProto meta = @@ -2909,22 +2913,26 @@ public static GetEditsFromTxidResponseProto convertEditsResponse( break; case CREATE: Event.CreateEvent ce2 = (Event.CreateEvent) e; + InotifyProtos.CreateEventProto.Builder pB = + (InotifyProtos.CreateEventProto.newBuilder()); + pB.setType(createTypeConvert(ce2.getiNodeType())) + .setPath(ce2.getPath()) + .setCtime(ce2.getCtime()) + .setOwnerName(ce2.getOwnerName()) + .setGroupName(ce2.getGroupName()) + .setPerms(convert(ce2.getPerms())) + .setReplication(ce2.getReplication()) + .setSymlinkTarget(ce2.getSymlinkTarget() == null ? + "" : ce2.getSymlinkTarget()) + .setDefaultBlockSize(ce2.getDefaultBlockSize()) + .setOverwrite(ce2.getOverwrite()); + if (ce2.isErasureCoded().isPresent()) { + pB.setErasureCoded(ce2.isErasureCoded().get()); + } events.add(InotifyProtos.EventProto.newBuilder() .setType(InotifyProtos.EventType.EVENT_CREATE) - .setContents( - InotifyProtos.CreateEventProto.newBuilder() - .setType(createTypeConvert(ce2.getiNodeType())) - .setPath(ce2.getPath()) - .setCtime(ce2.getCtime()) - .setOwnerName(ce2.getOwnerName()) - .setGroupName(ce2.getGroupName()) - .setPerms(convert(ce2.getPerms())) - .setReplication(ce2.getReplication()) - .setSymlinkTarget(ce2.getSymlinkTarget() == null ? 
- "" : ce2.getSymlinkTarget()) - .setDefaultBlockSize(ce2.getDefaultBlockSize()) - .setOverwrite(ce2.getOverwrite()).build().toByteString() - ).build()); + .setContents(pB.build().toByteString()) + .build()); break; case METADATA: Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto index 53399029582..f1934082589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/inotify.proto @@ -80,6 +80,7 @@ message CreateEventProto { optional string symlinkTarget = 8; optional bool overwrite = 9; optional int64 defaultBlockSize = 10 [default=0]; + optional bool erasureCoded = 11; } message CloseEventProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java index 09181074643..8a54c8a7759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; import java.util.List; @@ -54,6 +55,8 @@ public static EventBatch translate(FSEditLogOp op) { .perms(addOp.permissions.getPermission()) .overwrite(addOp.overwrite) .defaultBlockSize(addOp.blockSize) + .erasureCoded(addOp.erasureCodingPolicyId + != ErasureCodeConstants.REPLICATION_POLICY_ID) .iNodeType(Event.CreateEvent.INodeType.FILE).build() }); } else { // append return new 
EventBatch(op.txid, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java index b0b85e75af4..05d3c63e52e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -140,6 +141,7 @@ public void testBasic() throws IOException, URISyntaxException, client.rename("/file5", "/dir"); // RenameOldOp -> RenameEvent //TruncateOp -> TruncateEvent client.truncate("/truncate_file", BLOCK_SIZE); + client.create("/file_ec_test1", false); EventBatch batch = null; // RenameOp @@ -180,6 +182,8 @@ public void testBasic() throws IOException, URISyntaxException, Assert.assertTrue(ce.getSymlinkTarget() == null); Assert.assertTrue(ce.getOverwrite()); Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize()); + Assert.assertTrue(ce.isErasureCoded().isPresent()); + Assert.assertFalse(ce.isErasureCoded().get()); LOG.info(ce.toString()); Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType=")); @@ -395,6 +399,25 @@ public void testBasic() throws IOException, URISyntaxException, LOG.info(et.toString()); Assert.assertTrue(et.toString().startsWith("TruncateEvent [path=")); + // CreateEvent without overwrite + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() + == Event.EventType.CREATE); + ce = (Event.CreateEvent) batch.getEvents()[0]; + Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); + 
Assert.assertTrue(ce.getPath().equals("/file_ec_test1")); + Assert.assertTrue(ce.getCtime() > 0); + Assert.assertTrue(ce.getReplication() > 0); + Assert.assertTrue(ce.getSymlinkTarget() == null); + Assert.assertFalse(ce.getOverwrite()); + Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize()); + Assert.assertTrue(ce.isErasureCoded().isPresent()); + Assert.assertFalse(ce.isErasureCoded().get()); + LOG.info(ce.toString()); + Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType=")); + // Returns null when there are no further events Assert.assertTrue(eis.poll() == null); @@ -410,6 +433,83 @@ public void testBasic() throws IOException, URISyntaxException, } } + @Test(timeout = 120000) + public void testErasureCodedFiles() throws Exception { + ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); + final int dataUnits = ecPolicy.getNumDataUnits(); + final int parityUnits = ecPolicy.getNumParityUnits(); + + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, ecPolicy.getCellSize()); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // so that we can get an atime change + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1); + + MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf); + builder.getDfsBuilder().numDataNodes(dataUnits + parityUnits); + MiniQJMHACluster cluster = builder.build(); + + try { + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0) + .getNameNodeAddress(), conf); + DistributedFileSystem fs = + (DistributedFileSystem)cluster.getDfsCluster().getFileSystem(0); + + Path ecDir = new Path("/ecdir"); + fs.mkdirs(ecDir); + fs.setErasureCodingPolicy(ecDir, ecPolicy.getName()); + + DFSInotifyEventInputStream eis = client.getInotifyEventStream(); + + int sz = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize(); + byte[] 
contents = new byte[sz]; + DFSTestUtil.writeFile(fs, new Path("/ecdir/file_ec_test2"), contents); + + EventBatch batch = null; + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + long txid = batch.getTxid(); + long eventsBehind = eis.getTxidsBehindEstimate(); + Assert.assertTrue(batch.getEvents()[0].getEventType() + == Event.EventType.CREATE); + Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0]; + Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); + Assert.assertTrue(ce.getPath().equals("/ecdir/file_ec_test2")); + Assert.assertTrue(ce.getCtime() > 0); + Assert.assertEquals(1, ce.getReplication()); + Assert.assertTrue(ce.getSymlinkTarget() == null); + Assert.assertTrue(ce.getOverwrite()); + Assert.assertEquals(ecPolicy.getCellSize(), ce.getDefaultBlockSize()); + Assert.assertTrue(ce.isErasureCoded().isPresent()); + Assert.assertTrue(ce.isErasureCoded().get()); + LOG.info(ce.toString()); + Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType=")); + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() + == Event.EventType.CLOSE); + Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath() + .equals("/ecdir/file_ec_test2")); + + // Returns null when there are no further events + Assert.assertTrue(eis.poll() == null); + + // make sure the estimate hasn't changed since the above assertion + // tells us that we are fully caught up to the current namesystem state + // and we should not have been behind at all when eventsBehind was set + // either, since there were few enough events that they should have all + // been read to the client during the first poll() call + Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind); + } finally { + cluster.shutdown(); + } + } + @Test(timeout = 120000) public void testNNFailover() throws IOException, 
URISyntaxException, MissingEventsException {