HDFS-13956. iNotify should include information to identify a file as either replicated or erasure coded. Contributed by Hrishikesh Gadre.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Hrishikesh Gadre 2018-10-10 10:23:07 -07:00 committed by Wei-Chiu Chuang
parent 3ead525c71
commit bf3d591f0c
5 changed files with 152 additions and 26 deletions
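A minimal usage sketch (not part of this commit) of how a downstream consumer might read the new flag: CreateEvent.isErasureCoded() returns an Optional<Boolean>, which is empty when the serialized event did not carry the new protobuf field, e.g. when it came from a NameNode that predates this change. The NameNode address and class name below are placeholders.

import java.net.InetSocketAddress;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class EcAwareInotifyTail {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode RPC address; same constructor the test below uses.
    DFSClient client = new DFSClient(new InetSocketAddress("nn-host", 8020), conf);
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    while (true) {
      EventBatch batch = eis.take(); // blocks until the next batch of events
      for (Event e : batch.getEvents()) {
        if (e.getEventType() == Event.EventType.CREATE) {
          Event.CreateEvent ce = (Event.CreateEvent) e;
          // Empty if the event was serialized without the erasureCoded field.
          Optional<Boolean> ec = ce.isErasureCoded();
          System.out.println(ce.getPath() + " erasureCoded="
              + ec.map(Object::toString).orElse("unknown"));
        }
      }
    }
  }
}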

Event.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import java.util.List;
import java.util.Optional;
/**
* Events sent by the inotify system. Note that no events are necessarily sent
@@ -112,6 +113,7 @@ public abstract class Event {
private String symlinkTarget;
private boolean overwrite;
private long defaultBlockSize;
private Optional<Boolean> erasureCoded;
public static class Builder {
private INodeType iNodeType;
@@ -124,6 +126,7 @@ public abstract class Event {
private String symlinkTarget;
private boolean overwrite;
private long defaultBlockSize = 0;
private Optional<Boolean> erasureCoded = Optional.empty();
public Builder iNodeType(INodeType type) {
this.iNodeType = type;
@@ -175,6 +178,11 @@ public abstract class Event {
return this;
}
public Builder erasureCoded(boolean ecCoded) {
this.erasureCoded = Optional.of(ecCoded);
return this;
}
public CreateEvent build() {
return new CreateEvent(this);
}
@@ -192,6 +200,7 @@ public abstract class Event {
this.symlinkTarget = b.symlinkTarget;
this.overwrite = b.overwrite;
this.defaultBlockSize = b.defaultBlockSize;
this.erasureCoded = b.erasureCoded;
}
public INodeType getiNodeType() {
@@ -243,6 +252,10 @@ public abstract class Event {
return defaultBlockSize;
}
public Optional<Boolean> isErasureCoded() {
return erasureCoded;
}
@Override
@InterfaceStability.Unstable
public String toString() {
@@ -261,6 +274,7 @@ public abstract class Event {
content.append("overwrite=").append(overwrite)
.append(", defaultBlockSize=").append(defaultBlockSize)
.append(", erasureCoded=").append(erasureCoded)
.append("]");
return content.toString();
}

PBHelperClient.java

@@ -908,7 +908,7 @@ public class PBHelperClient {
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
Event.CreateEvent.Builder builder = new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
@@ -919,7 +919,11 @@ public class PBHelperClient {
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget())
.defaultBlockSize(create.getDefaultBlockSize())
.overwrite(create.getOverwrite()).build());
.overwrite(create.getOverwrite());
if (create.hasErasureCoded()) {
builder.erasureCoded(create.getErasureCoded());
}
events.add(builder.build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
@@ -2909,11 +2913,9 @@ public class PBHelperClient {
break;
case CREATE:
Event.CreateEvent ce2 = (Event.CreateEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CREATE)
.setContents(
InotifyProtos.CreateEventProto.newBuilder()
.setType(createTypeConvert(ce2.getiNodeType()))
InotifyProtos.CreateEventProto.Builder pB =
(InotifyProtos.CreateEventProto.newBuilder());
pB.setType(createTypeConvert(ce2.getiNodeType()))
.setPath(ce2.getPath())
.setCtime(ce2.getCtime())
.setOwnerName(ce2.getOwnerName())
@@ -2923,8 +2925,14 @@ public class PBHelperClient {
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
"" : ce2.getSymlinkTarget())
.setDefaultBlockSize(ce2.getDefaultBlockSize())
.setOverwrite(ce2.getOverwrite()).build().toByteString()
).build());
.setOverwrite(ce2.getOverwrite());
if (ce2.isErasureCoded().isPresent()) {
pB.setErasureCoded(ce2.isErasureCoded().get());
}
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CREATE)
.setContents(pB.build().toByteString())
.build());
break;
case METADATA:
Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;

inotify.proto

@@ -80,6 +80,7 @@ message CreateEventProto {
optional string symlinkTarget = 8;
optional bool overwrite = 9;
optional int64 defaultBlockSize = 10 [default=0];
optional bool erasureCoded = 11;
}
message CloseEventProto {

InotifyFSEditLogOpTranslator.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.erasurecode.ErasureCodeConstants;
import java.util.List;
@@ -54,6 +55,8 @@ public class InotifyFSEditLogOpTranslator {
.perms(addOp.permissions.getPermission())
.overwrite(addOp.overwrite)
.defaultBlockSize(addOp.blockSize)
.erasureCoded(addOp.erasureCodingPolicyId
!= ErasureCodeConstants.REPLICATION_POLICY_ID)
.iNodeType(Event.CreateEvent.INodeType.FILE).build() });
} else { // append
return new EventBatch(op.txid,

TestDFSInotifyEventInputStream.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -140,6 +141,7 @@ public class TestDFSInotifyEventInputStream {
client.rename("/file5", "/dir"); // RenameOldOp -> RenameEvent
//TruncateOp -> TruncateEvent
client.truncate("/truncate_file", BLOCK_SIZE);
client.create("/file_ec_test1", false);
EventBatch batch = null;
// RenameOp
@@ -180,6 +182,8 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce.getSymlinkTarget() == null);
Assert.assertTrue(ce.getOverwrite());
Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
Assert.assertTrue(ce.isErasureCoded().isPresent());
Assert.assertFalse(ce.isErasureCoded().get());
LOG.info(ce.toString());
Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
@@ -395,6 +399,25 @@ public class TestDFSInotifyEventInputStream {
LOG.info(et.toString());
Assert.assertTrue(et.toString().startsWith("TruncateEvent [path="));
// CreateEvent without overwrite
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType()
== Event.EventType.CREATE);
ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file_ec_test1"));
Assert.assertTrue(ce.getCtime() > 0);
Assert.assertTrue(ce.getReplication() > 0);
Assert.assertTrue(ce.getSymlinkTarget() == null);
Assert.assertFalse(ce.getOverwrite());
Assert.assertEquals(BLOCK_SIZE, ce.getDefaultBlockSize());
Assert.assertTrue(ce.isErasureCoded().isPresent());
Assert.assertFalse(ce.isErasureCoded().get());
LOG.info(ce.toString());
Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
// Returns null when there are no further events
Assert.assertTrue(eis.poll() == null);
@@ -410,6 +433,83 @@ public class TestDFSInotifyEventInputStream {
}
}
@Test(timeout = 120000)
public void testErasureCodedFiles() throws Exception {
ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
final int dataUnits = ecPolicy.getNumDataUnits();
final int parityUnits = ecPolicy.getNumParityUnits();
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, ecPolicy.getCellSize());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
// so that we can get an atime change
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
builder.getDfsBuilder().numDataNodes(dataUnits + parityUnits);
MiniQJMHACluster cluster = builder.build();
try {
cluster.getDfsCluster().waitActive();
cluster.getDfsCluster().transitionToActive(0);
DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
.getNameNodeAddress(), conf);
DistributedFileSystem fs =
(DistributedFileSystem)cluster.getDfsCluster().getFileSystem(0);
Path ecDir = new Path("/ecdir");
fs.mkdirs(ecDir);
fs.setErasureCodingPolicy(ecDir, ecPolicy.getName());
DFSInotifyEventInputStream eis = client.getInotifyEventStream();
int sz = ecPolicy.getNumDataUnits() * ecPolicy.getCellSize();
byte[] contents = new byte[sz];
DFSTestUtil.writeFile(fs, new Path("/ecdir/file_ec_test2"), contents);
EventBatch batch = null;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
long txid = batch.getTxid();
long eventsBehind = eis.getTxidsBehindEstimate();
Assert.assertTrue(batch.getEvents()[0].getEventType()
== Event.EventType.CREATE);
Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/ecdir/file_ec_test2"));
Assert.assertTrue(ce.getCtime() > 0);
Assert.assertEquals(1, ce.getReplication());
Assert.assertTrue(ce.getSymlinkTarget() == null);
Assert.assertTrue(ce.getOverwrite());
Assert.assertEquals(ecPolicy.getCellSize(), ce.getDefaultBlockSize());
Assert.assertTrue(ce.isErasureCoded().isPresent());
Assert.assertTrue(ce.isErasureCoded().get());
LOG.info(ce.toString());
Assert.assertTrue(ce.toString().startsWith("CreateEvent [INodeType="));
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType()
== Event.EventType.CLOSE);
Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath()
.equals("/ecdir/file_ec_test2"));
// Returns null when there are no further events
Assert.assertTrue(eis.poll() == null);
// make sure the estimate hasn't changed since the above assertion
// tells us that we are fully caught up to the current namesystem state
// and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all
// been read to the client during the first poll() call
Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 120000)
public void testNNFailover() throws IOException, URISyntaxException,
MissingEventsException {