From 75a326aaff8c92349701d9b3473c3070b8c2be44 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Tue, 25 Nov 2014 17:44:34 -0800 Subject: [PATCH] HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/DFSInotifyEventInputStream.java | 65 ++-- .../hadoop/hdfs/inotify/EventBatch.java | 41 +++ .../{EventsList.java => EventBatchList.java} | 26 +- .../hadoop/hdfs/protocol/ClientProtocol.java | 8 +- .../ClientNamenodeProtocolTranslatorPB.java | 4 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 343 ++++++++++-------- .../InotifyFSEditLogOpTranslator.java | 74 ++-- .../server/namenode/NameNodeRpcServer.java | 23 +- .../hadoop-hdfs/src/main/proto/inotify.proto | 10 +- .../hdfs/TestDFSInotifyEventInputStream.java | 209 ++++++----- 11 files changed, 467 insertions(+), 339 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java rename hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/{EventsList.java => EventBatchList.java} (65%) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4d2fb054912..13dda8825ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -419,6 +419,9 @@ Release 2.7.0 - UNRELEASED HDFS-7462. Consolidate implementation of mkdirs() into a single class. (wheat9) + HDFS-7446. HDFS inotify should have the ability to determine what txid it + has read up to (cmccabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java index 73c5f55a43b..83b92b95387 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs; import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatch; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.util.Time; @@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Stream for reading inotify events. 
DFSInotifyEventInputStreams should not @@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream { .class); private final ClientProtocol namenode; - private Iterator<Event> it; + private Iterator<EventBatch> it; private long lastReadTxid; /** * The most recent txid the NameNode told us it has sync'ed -- helps us @@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream or null if no new events are currently - * available. + * Returns the next batch of events in the stream or null if no new + * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. - * @throws MissingEventsException if we cannot return the next event in the - * stream because the data for the event (and possibly some subsequent events) - * has been deleted (generally because this stream is a very large number of - * events behind the current state of the NameNode). It is safe to continue - * reading from the stream after this exception is thrown -- the next - * available event will be returned. + * @throws MissingEventsException if we cannot return the next batch in the + * stream because the data for the events (and possibly some subsequent + * events) has been deleted (generally because this stream is a very large + * number of transactions behind the current state of the NameNode). It is + * safe to continue reading from the stream after this exception is thrown. + * The next available batch of events will be returned. */ - public Event poll() throws IOException, MissingEventsException { + public EventBatch poll() throws IOException, MissingEventsException { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); @@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream { return null; } if (!it.hasNext()) { - EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1); + EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); - it = el.getEvents().iterator(); + it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { @@ -131,18 +124,18 @@ } /** - * Return a estimate of how many events behind the NameNode's current state - * this stream is. Clients should periodically call this method and check if - * its result is steadily increasing, which indicates that they are falling - * behind (i.e. events are being generated faster than the client is reading - * them). If a client falls too far behind events may be deleted before the - * client can read them. + * Return an estimate of how many transaction IDs behind the NameNode's + * current state this stream is. Clients should periodically call this method + * and check if its result is steadily increasing, which indicates that they + * are falling behind (i.e.
transactions are being generated faster than the + * client is reading them). If a client falls too far behind, events may be + * deleted before the client can read them. * <p/>
* A return value of -1 indicates that an estimate could not be produced, and * should be ignored. The value returned by this method is really only useful * when compared to previous or subsequent returned values. */ - public long getEventsBehindEstimate() { + public long getTxidsBehindEstimate() { if (syncTxid == 0) { return -1; } else { @@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream, waiting up to the specified amount of - * time for a new event. Returns null if a new event is not available at the + * Returns the next event batch in the stream, waiting up to the specified + * amount of time for a new batch. Returns null if one is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. @@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream { * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ - public Event poll(long time, TimeUnit tu) throws IOException, + public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; - Event next = null; + EventBatch next = null; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { @@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream, waiting indefinitely if a new event - * is not immediately available. + * Returns the next batch of events in the stream, waiting indefinitely if + * a new batch is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ - public Event take() throws IOException, InterruptedException, + public EventBatch take() throws IOException, InterruptedException, MissingEventsException { - Event next = null; + EventBatch next = null; int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java new file mode 100644 index 00000000000..0ad1070c7e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A batch of events that all happened on the same transaction ID. + */ +@InterfaceAudience.Public +public class EventBatch { + private final long txid; + private final Event[] events; + + public EventBatch(long txid, Event[] events) { + this.txid = txid; + this.events = events; + } + + public long getTxid() { + return txid; + } + + public Event[] getEvents() { return events; } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java similarity index 65% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java index 6d02d3c2980..9c9703869b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java @@ -23,30 +23,30 @@ import org.apache.hadoop.classification.InterfaceAudience; import java.util.List; /** - * Contains a set of events, the transaction ID in the edit log up to which we - * read to produce these events, and the first txid we observed when producing - * these events (the last of which is for the purpose of determining whether we - * have missed events due to edit deletion). Also contains the most recent txid - * that the NameNode has sync'ed, so the client can determine how far behind in - * the edit log it is. + * Contains a list of event batches, the transaction ID in the edit log up to + * which we read to produce these events, and the first txid we observed when + * producing these events (the last of which is for the purpose of determining + * whether we have missed events due to edit deletion). Also contains the most + * recent txid that the NameNode has sync'ed, so the client can determine how + * far behind in the edit log it is. 
*/ @InterfaceAudience.Private -public class EventsList { - private List<Event> events; +public class EventBatchList { + private List<EventBatch> batches; private long firstTxid; private long lastTxid; private long syncTxid; - public EventsList(List<Event> events, long firstTxid, long lastTxid, - long syncTxid) { - this.events = events; + public EventBatchList(List<EventBatch> batches, long firstTxid, + long lastTxid, long syncTxid) { + this.batches = batches; this.firstTxid = firstTxid; this.lastTxid = lastTxid; this.syncTxid = syncTxid; } - public List<Event> getEvents() { - return events; + public List<EventBatch> getBatches() { + return batches; } public long getFirstTxid() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5f8bf3082e4..2301575f0e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -1408,9 +1408,9 @@ public interface ClientProtocol { public long getCurrentEditLogTxid() throws IOException; /** - * Get an ordered list of events corresponding to the edit log transactions - * from txid onwards. + * Get an ordered list of batches of events corresponding to the edit log + * transactions for txids equal to or greater than txid.
*/ @Idempotent - public EventsList getEditsFromTxid(long txid) throws IOException; + public EventBatchList getEditsFromTxid(long txid) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index ab14cd88b6a..58049204fc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -45,7 +45,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -1485,7 +1485,7 @@ public class ClientNamenodeProtocolTranslatorPB implements } @Override - public EventsList getEditsFromTxid(long txid) throws IOException { + public EventBatchList getEditsFromTxid(long txid) throws IOException { GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() .setTxid(txid).build(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 53ea6cf823d..2a5edc8dc94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -2517,173 +2518,197 @@ public class PBHelper { } } - public static EventsList convert(GetEditsFromTxidResponseProto resp) throws + public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws IOException { - List<Event> events = Lists.newArrayList(); - for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) { - switch(p.getType()) { - case EVENT_CLOSE: - InotifyProtos.CloseEventProto close = - InotifyProtos.CloseEventProto.parseFrom(p.getContents()); - events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(), - close.getTimestamp())); - break; - case EVENT_CREATE: - InotifyProtos.CreateEventProto create = - InotifyProtos.CreateEventProto.parseFrom(p.getContents()); - events.add(new Event.CreateEvent.Builder() -
.iNodeType(createTypeConvert(create.getType())) - .path(create.getPath()) - .ctime(create.getCtime()) - .ownerName(create.getOwnerName()) - .groupName(create.getGroupName()) - .perms(convert(create.getPerms())) - .replication(create.getReplication()) - .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null : - create.getSymlinkTarget()) - .overwrite(create.getOverwrite()).build()); - break; - case EVENT_METADATA: - InotifyProtos.MetadataUpdateEventProto meta = - InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents()); - events.add(new Event.MetadataUpdateEvent.Builder() - .path(meta.getPath()) - .metadataType(metadataUpdateTypeConvert(meta.getType())) - .mtime(meta.getMtime()) - .atime(meta.getAtime()) - .replication(meta.getReplication()) - .ownerName( - meta.getOwnerName().isEmpty() ? null : meta.getOwnerName()) - .groupName( - meta.getGroupName().isEmpty() ? null : meta.getGroupName()) - .perms(meta.hasPerms() ? convert(meta.getPerms()) : null) - .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry( - meta.getAclsList())) - .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs( - meta.getXAttrsList())) - .xAttrsRemoved(meta.getXAttrsRemoved()) - .build()); - break; - case EVENT_RENAME: - InotifyProtos.RenameEventProto rename = - InotifyProtos.RenameEventProto.parseFrom(p.getContents()); - events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(), - rename.getTimestamp())); - break; - case EVENT_APPEND: - InotifyProtos.AppendEventProto reopen = - InotifyProtos.AppendEventProto.parseFrom(p.getContents()); - events.add(new Event.AppendEvent(reopen.getPath())); - break; - case EVENT_UNLINK: - InotifyProtos.UnlinkEventProto unlink = - InotifyProtos.UnlinkEventProto.parseFrom(p.getContents()); - events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp())); - break; - default: - throw new RuntimeException("Unexpected inotify event type: " + - p.getType()); - } + final InotifyProtos.EventsListProto list = resp.getEventsList(); + final long firstTxid = list.getFirstTxid(); + final long lastTxid = list.getLastTxid(); + + List<EventBatch> batches = Lists.newArrayList(); + if (list.getEventsList().size() > 0) { + throw new IOException("Can't handle old inotify server response."); } - return new EventsList(events, resp.getEventsList().getFirstTxid(), + for (InotifyProtos.EventBatchProto bp : list.getBatchList()) { + long txid = bp.getTxid(); + if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) { + throw new IOException("Error converting TxidResponseProto: got a " + + "transaction id " + txid + " that was outside the range of [" + + firstTxid + ", " + lastTxid + "]."); + } + List<Event> events = Lists.newArrayList(); + for (InotifyProtos.EventProto p : bp.getEventsList()) { + switch (p.getType()) { + case EVENT_CLOSE: + InotifyProtos.CloseEventProto close = + InotifyProtos.CloseEventProto.parseFrom(p.getContents()); + events.add(new Event.CloseEvent(close.getPath(), + close.getFileSize(), close.getTimestamp())); + break; + case EVENT_CREATE: + InotifyProtos.CreateEventProto create = + InotifyProtos.CreateEventProto.parseFrom(p.getContents()); + events.add(new Event.CreateEvent.Builder() + .iNodeType(createTypeConvert(create.getType())) + .path(create.getPath()) + .ctime(create.getCtime()) + .ownerName(create.getOwnerName()) + .groupName(create.getGroupName()) + .perms(convert(create.getPerms())) + .replication(create.getReplication()) + .symlinkTarget(create.getSymlinkTarget().isEmpty() ?
null : + create.getSymlinkTarget()) + .overwrite(create.getOverwrite()).build()); + break; + case EVENT_METADATA: + InotifyProtos.MetadataUpdateEventProto meta = + InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents()); + events.add(new Event.MetadataUpdateEvent.Builder() + .path(meta.getPath()) + .metadataType(metadataUpdateTypeConvert(meta.getType())) + .mtime(meta.getMtime()) + .atime(meta.getAtime()) + .replication(meta.getReplication()) + .ownerName( + meta.getOwnerName().isEmpty() ? null : meta.getOwnerName()) + .groupName( + meta.getGroupName().isEmpty() ? null : meta.getGroupName()) + .perms(meta.hasPerms() ? convert(meta.getPerms()) : null) + .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry( + meta.getAclsList())) + .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs( + meta.getXAttrsList())) + .xAttrsRemoved(meta.getXAttrsRemoved()) + .build()); + break; + case EVENT_RENAME: + InotifyProtos.RenameEventProto rename = + InotifyProtos.RenameEventProto.parseFrom(p.getContents()); + events.add(new Event.RenameEvent(rename.getSrcPath(), + rename.getDestPath(), rename.getTimestamp())); + break; + case EVENT_APPEND: + InotifyProtos.AppendEventProto reopen = + InotifyProtos.AppendEventProto.parseFrom(p.getContents()); + events.add(new Event.AppendEvent(reopen.getPath())); + break; + case EVENT_UNLINK: + InotifyProtos.UnlinkEventProto unlink = + InotifyProtos.UnlinkEventProto.parseFrom(p.getContents()); + events.add(new Event.UnlinkEvent(unlink.getPath(), + unlink.getTimestamp())); + break; + default: + throw new RuntimeException("Unexpected inotify event type: " + + p.getType()); + } + } + batches.add(new EventBatch(txid, events.toArray(new Event[0]))); + } + return new EventBatchList(batches, resp.getEventsList().getFirstTxid(), resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid()); } - public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) { + public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) { InotifyProtos.EventsListProto.Builder builder = InotifyProtos.EventsListProto.newBuilder(); - for (Event e : el.getEvents()) { - switch(e.getEventType()) { - case CLOSE: - Event.CloseEvent ce = (Event.CloseEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_CLOSE) - .setContents( - InotifyProtos.CloseEventProto.newBuilder() - .setPath(ce.getPath()) - .setFileSize(ce.getFileSize()) - .setTimestamp(ce.getTimestamp()).build().toByteString() - ).build()); - break; - case CREATE: - Event.CreateEvent ce2 = (Event.CreateEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_CREATE) - .setContents( - InotifyProtos.CreateEventProto.newBuilder() - .setType(createTypeConvert(ce2.getiNodeType())) - .setPath(ce2.getPath()) - .setCtime(ce2.getCtime()) - .setOwnerName(ce2.getOwnerName()) - .setGroupName(ce2.getGroupName()) - .setPerms(convert(ce2.getPerms())) - .setReplication(ce2.getReplication()) - .setSymlinkTarget(ce2.getSymlinkTarget() == null ? 
- "" : ce2.getSymlinkTarget()) - .setOverwrite(ce2.getOverwrite()).build().toByteString() - ).build()); - break; - case METADATA: - Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; - InotifyProtos.MetadataUpdateEventProto.Builder metaB = - InotifyProtos.MetadataUpdateEventProto.newBuilder() - .setPath(me.getPath()) - .setType(metadataUpdateTypeConvert(me.getMetadataType())) - .setMtime(me.getMtime()) - .setAtime(me.getAtime()) - .setReplication(me.getReplication()) - .setOwnerName(me.getOwnerName() == null ? "" : - me.getOwnerName()) - .setGroupName(me.getGroupName() == null ? "" : - me.getGroupName()) - .addAllAcls(me.getAcls() == null ? - Lists.newArrayList() : - convertAclEntryProto(me.getAcls())) - .addAllXAttrs(me.getxAttrs() == null ? - Lists.newArrayList() : - convertXAttrProto(me.getxAttrs())) - .setXAttrsRemoved(me.isxAttrsRemoved()); - if (me.getPerms() != null) { - metaB.setPerms(convert(me.getPerms())); + for (EventBatch b : el.getBatches()) { + List events = Lists.newArrayList(); + for (Event e : b.getEvents()) { + switch (e.getEventType()) { + case CLOSE: + Event.CloseEvent ce = (Event.CloseEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CLOSE) + .setContents( + InotifyProtos.CloseEventProto.newBuilder() + .setPath(ce.getPath()) + .setFileSize(ce.getFileSize()) + .setTimestamp(ce.getTimestamp()).build().toByteString() + ).build()); + break; + case CREATE: + Event.CreateEvent ce2 = (Event.CreateEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CREATE) + .setContents( + InotifyProtos.CreateEventProto.newBuilder() + .setType(createTypeConvert(ce2.getiNodeType())) + .setPath(ce2.getPath()) + .setCtime(ce2.getCtime()) + .setOwnerName(ce2.getOwnerName()) + .setGroupName(ce2.getGroupName()) + .setPerms(convert(ce2.getPerms())) + .setReplication(ce2.getReplication()) + .setSymlinkTarget(ce2.getSymlinkTarget() == null ? + "" : ce2.getSymlinkTarget()) + .setOverwrite(ce2.getOverwrite()).build().toByteString() + ).build()); + break; + case METADATA: + Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; + InotifyProtos.MetadataUpdateEventProto.Builder metaB = + InotifyProtos.MetadataUpdateEventProto.newBuilder() + .setPath(me.getPath()) + .setType(metadataUpdateTypeConvert(me.getMetadataType())) + .setMtime(me.getMtime()) + .setAtime(me.getAtime()) + .setReplication(me.getReplication()) + .setOwnerName(me.getOwnerName() == null ? "" : + me.getOwnerName()) + .setGroupName(me.getGroupName() == null ? "" : + me.getGroupName()) + .addAllAcls(me.getAcls() == null ? + Lists.newArrayList() : + convertAclEntryProto(me.getAcls())) + .addAllXAttrs(me.getxAttrs() == null ? 
+ Lists.newArrayList() : + convertXAttrProto(me.getxAttrs())) + .setXAttrsRemoved(me.isxAttrsRemoved()); + if (me.getPerms() != null) { + metaB.setPerms(convert(me.getPerms())); + } + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_METADATA) + .setContents(metaB.build().toByteString()) + .build()); + break; + case RENAME: + Event.RenameEvent re = (Event.RenameEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_RENAME) + .setContents( + InotifyProtos.RenameEventProto.newBuilder() + .setSrcPath(re.getSrcPath()) + .setDestPath(re.getDstPath()) + .setTimestamp(re.getTimestamp()).build().toByteString() + ).build()); + break; + case APPEND: + Event.AppendEvent re2 = (Event.AppendEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_APPEND) + .setContents( + InotifyProtos.AppendEventProto.newBuilder() + .setPath(re2.getPath()).build().toByteString() + ).build()); + break; + case UNLINK: + Event.UnlinkEvent ue = (Event.UnlinkEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_UNLINK) + .setContents( + InotifyProtos.UnlinkEventProto.newBuilder() + .setPath(ue.getPath()) + .setTimestamp(ue.getTimestamp()).build().toByteString() + ).build()); + break; + default: + throw new RuntimeException("Unexpected inotify event: " + e); } - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_METADATA) - .setContents(metaB.build().toByteString()) - .build()); - break; - case RENAME: - Event.RenameEvent re = (Event.RenameEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_RENAME) - .setContents( - InotifyProtos.RenameEventProto.newBuilder() - .setSrcPath(re.getSrcPath()) - .setDestPath(re.getDstPath()) - .setTimestamp(re.getTimestamp()).build().toByteString() - ).build()); - break; - case APPEND: - Event.AppendEvent re2 = (Event.AppendEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_APPEND) - .setContents( - InotifyProtos.AppendEventProto.newBuilder() - .setPath(re2.getPath()).build().toByteString() - ).build()); - break; - case UNLINK: - Event.UnlinkEvent ue = (Event.UnlinkEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_UNLINK) - .setContents( - InotifyProtos.UnlinkEventProto.newBuilder() - .setPath(ue.getPath()) - .setTimestamp(ue.getTimestamp()).build().toByteString() - ).build()); - break; - default: - throw new RuntimeException("Unexpected inotify event: " + e); } + builder.addBatch(InotifyProtos.EventBatchProto.newBuilder(). + setTxid(b.getTxid()). 
+ addAllEvents(events)); } builder.setFirstTxid(el.getFirstTxid()); builder.setLastTxid(el.getLastTxid()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java index 00a5f2518cb..cd3fc23b2aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.Block; import java.util.List; @@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator { return size; } - public static Event[] translate(FSEditLogOp op) { + public static EventBatch translate(FSEditLogOp op) { switch(op.opCode) { case OP_ADD: FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op; if (addOp.blocks.length == 0) { // create - return new Event[] { new Event.CreateEvent.Builder().path(addOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(addOp.path) .ctime(addOp.atime) .replication(addOp.replication) .ownerName(addOp.permissions.getUserName()) .groupName(addOp.permissions.getGroupName()) .perms(addOp.permissions.getPermission()) .overwrite(addOp.overwrite) - .iNodeType(Event.CreateEvent.INodeType.FILE).build() }; + .iNodeType(Event.CreateEvent.INodeType.FILE).build() }); } else { - return new Event[] { new Event.AppendEvent(addOp.path) }; + return new EventBatch(op.txid, + new Event[] { new Event.AppendEvent(addOp.path) }); } case OP_CLOSE: FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op; - return new Event[] { - new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }; + return new EventBatch(op.txid, new Event[] { + new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }); case OP_SET_REPLICATION: FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION) .path(setRepOp.path) - .replication(setRepOp.replication).build() }; + .replication(setRepOp.replication).build() }); case OP_CONCAT_DELETE: FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op; List<Event> events = Lists.newArrayList(); @@ -73,73 +77,83 @@ events.add(new Event.UnlinkEvent(src, cdOp.timestamp)); } events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp)); - return events.toArray(new Event[0]); + return new EventBatch(op.txid, events.toArray(new Event[0])); case OP_RENAME_OLD: FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op; - return new Event[] { - new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) }; + return new EventBatch(op.txid, new Event[] { + new Event.RenameEvent(rnOpOld.src, + rnOpOld.dst, rnOpOld.timestamp) }); case OP_RENAME: FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op; - return new Event[] { - new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }; + return new EventBatch(op.txid, new
Event[] { + new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }); case OP_DELETE: FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op; - return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) }; + return new EventBatch(op.txid, new Event[] { + new Event.UnlinkEvent(delOp.path, delOp.timestamp) }); case OP_MKDIR: FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op; - return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) .ctime(mkOp.timestamp) .ownerName(mkOp.permissions.getUserName()) .groupName(mkOp.permissions.getGroupName()) .perms(mkOp.permissions.getPermission()) - .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }; + .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }); case OP_SET_PERMISSIONS: FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS) .path(permOp.src) - .perms(permOp.permissions).build() }; + .perms(permOp.permissions).build() }); case OP_SET_OWNER: FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER) .path(ownOp.src) - .ownerName(ownOp.username).groupName(ownOp.groupname).build() }; + .ownerName(ownOp.username).groupName(ownOp.groupname).build() }); case OP_TIMES: FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES) .path(timesOp.path) - .atime(timesOp.atime).mtime(timesOp.mtime).build() }; + .atime(timesOp.atime).mtime(timesOp.mtime).build() }); case OP_SYMLINK: FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op; - return new Event[] { new Event.CreateEvent.Builder().path(symOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(symOp.path) .ctime(symOp.atime) .ownerName(symOp.permissionStatus.getUserName()) .groupName(symOp.permissionStatus.getGroupName()) .perms(symOp.permissionStatus.getPermission()) .symlinkTarget(symOp.value) - .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }; + .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }); case OP_REMOVE_XATTR: FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .path(rxOp.src) .xAttrs(rxOp.xAttrs) - .xAttrsRemoved(true).build() }; + .xAttrsRemoved(true).build() }); case OP_SET_XATTR: FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .path(sxOp.src) .xAttrs(sxOp.xAttrs) - .xAttrsRemoved(false).build() }; + .xAttrsRemoved(false).build() }); case OP_SET_ACL: FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op; - 
return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS) .path(saOp.src) - .acls(saOp.aclEntries).build() }; + .acls(saOp.aclEntries).build() }); default: return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4995f4fec45..876afbaa2df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -69,7 +69,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatch; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -1794,7 +1795,7 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol - public EventsList getEditsFromTxid(long txid) throws IOException { + public EventBatchList getEditsFromTxid(long txid) throws IOException { namesystem.checkOperation(OperationCategory.READ); // only active namesystem.checkSuperuserPrivilege(); int maxEventsPerRPC = nn.conf.getInt( @@ -1812,13 +1813,14 @@ class NameNodeRpcServer implements NamenodeProtocols { // guaranteed to have been written by this NameNode.) 
boolean readInProgress = syncTxid > 0; - List<Event> events = Lists.newArrayList(); + List<EventBatch> batches = Lists.newArrayList(); + int totalEvents = 0; long maxSeenTxid = -1; long firstSeenTxid = -1; if (syncTxid > 0 && txid > syncTxid) { // we can't read past syncTxid, so there's no point in going any further - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } Collection<EditLogInputStream> streams = null; @@ -1830,7 +1832,7 @@ // will result LOG.info("NN is transitioning from active to standby and FSEditLog " + "is closed -- could not read edits"); - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } boolean breakOuter = false; @@ -1848,9 +1850,10 @@ break; } - Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op); - if (eventsFromOp != null) { - events.addAll(Arrays.asList(eventsFromOp)); + EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op); + if (eventBatch != null) { + batches.add(eventBatch); + totalEvents += eventBatch.getEvents().length; } if (op.getTransactionId() > maxSeenTxid) { maxSeenTxid = op.getTransactionId(); @@ -1858,7 +1861,7 @@ if (firstSeenTxid == -1) { firstSeenTxid = op.getTransactionId(); } - if (events.size() >= maxEventsPerRPC || (syncTxid > 0 && + if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 && op.getTransactionId() == syncTxid)) { // we're done breakOuter = true; @@ -1873,7 +1876,7 @@ } } - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto index a1d4d920d1d..e51c02cf771 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto @@ -48,6 +48,11 @@ message EventProto { required bytes contents = 2; } +message EventBatchProto { + required int64 txid = 1; + repeated EventProto events = 2; +} + enum INodeType { I_TYPE_FILE = 0x0; I_TYPE_DIRECTORY = 0x1; @@ -111,8 +116,9 @@ message UnlinkEventProto { } message EventsListProto { - repeated EventProto events = 1; + repeated EventProto events = 1; // deprecated required int64 firstTxid = 2; required int64 lastTxid = 3; required int64 syncTxid = 4; -} \ No newline at end of file + repeated EventBatchProto batch = 5; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java index a608ba83606..82db11074f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventBatch; import
org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; @@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream { private static final Log LOG = LogFactory.getLog( TestDFSInotifyEventInputStream.class); - private static Event waitForNextEvent(DFSInotifyEventInputStream eis) + private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis) throws IOException, MissingEventsException { - Event next = null; - while ((next = eis.poll()) == null); - return next; + EventBatch batch = null; + while ((batch = eis.poll()) == null); + return batch; + } + + private static long checkTxid(EventBatch batch, long prevTxid){ + Assert.assertTrue("Previous txid " + prevTxid + " was not less than " + + "new txid " + batch.getTxid(), prevTxid < batch.getTxid()); + return batch.getTxid(); } /** @@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream { */ @Test public void testOpcodeCount() { - Assert.assertTrue(FSEditLogOpCodes.values().length == 47); + Assert.assertEquals(47, FSEditLogOpCodes.values().length); } @@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream { "user::rwx,user:foo:rw-,group::r--,other::---", true)); client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent - Event next = null; + EventBatch batch = null; // RenameOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); - Event.RenameEvent re = (Event.RenameEvent) next; - Assert.assertTrue(re.getDstPath().equals("/file4")); - Assert.assertTrue(re.getSrcPath().equals("/file")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + long txid = batch.getTxid(); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME); + Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0]; + Assert.assertEquals("/file4", re.getDstPath()); + Assert.assertEquals("/file", re.getSrcPath()); Assert.assertTrue(re.getTimestamp() > 0); - long eventsBehind = eis.getEventsBehindEstimate(); + long eventsBehind = eis.getTxidsBehindEstimate(); // RenameOldOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); - Event.RenameEvent re2 = (Event.RenameEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME); + Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0]; Assert.assertTrue(re2.getDstPath().equals("/file2")); Assert.assertTrue(re2.getSrcPath().equals("/file4")); Assert.assertTrue(re.getTimestamp() > 0); // AddOp with overwrite - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); Assert.assertTrue(ce.getPath().equals("/file2")); Assert.assertTrue(ce.getCtime() > 0); @@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce.getOverwrite()); // CloseOp - next = waitForNextEvent(eis); - 
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Event.CloseEvent ce2 = (Event.CloseEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0]; Assert.assertTrue(ce2.getPath().equals("/file2")); Assert.assertTrue(ce2.getFileSize() > 0); Assert.assertTrue(ce2.getTimestamp() > 0); // AddOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); - Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2")); // CloseOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE); + Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2")); // TimesOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue.getPath().equals("/file2")); Assert.assertTrue(mue.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.TIMES); // SetReplicationOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue2.getPath().equals("/file2")); Assert.assertTrue(mue2.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.REPLICATION); Assert.assertTrue(mue2.getReplication() == 1); // ConcatDeleteOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); - Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); - Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(3, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2")); + Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue2 = (Event.UnlinkEvent) 
batch.getEvents()[1]; Assert.assertTrue(ue2.getPath().equals("/file3")); Assert.assertTrue(ue2.getTimestamp() > 0); - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Event.CloseEvent ce3 = (Event.CloseEvent) next; + Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2]; Assert.assertTrue(ce3.getPath().equals("/file2")); Assert.assertTrue(ce3.getTimestamp() > 0); // DeleteOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); - Event.UnlinkEvent ue = (Event.UnlinkEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0]; Assert.assertTrue(ue.getPath().equals("/file2")); Assert.assertTrue(ue.getTimestamp() > 0); // MkdirOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce4 = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce4.getiNodeType() == Event.CreateEvent.INodeType.DIRECTORY); Assert.assertTrue(ce4.getPath().equals("/dir")); @@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce4.getSymlinkTarget() == null); // SetPermissionsOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue3.getPath().equals("/dir")); Assert.assertTrue(mue3.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.PERMS); Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-")); // SetOwnerOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue4.getPath().equals("/dir")); Assert.assertTrue(mue4.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.OWNER); @@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(mue4.getGroupName().equals("groupname")); // SymlinkOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce5 = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce5 = 
(Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce5.getiNodeType() == Event.CreateEvent.INodeType.SYMLINK); Assert.assertTrue(ce5.getPath().equals("/dir2")); @@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir")); // SetXAttrOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue5.getPath().equals("/file5")); Assert.assertTrue(mue5.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.XATTRS); @@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(!mue5.isxAttrsRemoved()); // RemoveXAttrOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue6.getPath().equals("/file5")); Assert.assertTrue(mue6.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.XATTRS); @@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(mue6.isxAttrsRemoved()); // SetAclOp (1) - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue7.getPath().equals("/file5")); Assert.assertTrue(mue7.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.ACLS); @@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream { AclEntry.parseAclEntry("user::rwx", true))); // SetAclOp (2) - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue8.getPath().equals("/file5")); Assert.assertTrue(mue8.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.ACLS); @@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream { // and we should not have been behind at all when eventsBehind was set // either, since there were few enough events that they should have all // been read to the client during the first poll() call - Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind); + Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind); } finally { cluster.shutdown(); @@ -329,13 
+370,14 @@ public class TestDFSInotifyEventInputStream { } cluster.getDfsCluster().shutdownNameNode(0); cluster.getDfsCluster().transitionToActive(1); - Event next = null; + EventBatch batch = null; // we can read all of the edits logged by the old active from the new // active for (int i = 0; i < 10; i++) { - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); @@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream { // make sure that the old active can't read any further than the edits // it logged itself (it has no idea whether the in-progress edits from // the other writer have actually been committed) - Event next = null; + EventBatch batch = null; for (int i = 0; i < 10; i++) { - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); @@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream { }, 1, TimeUnit.SECONDS); // a very generous wait period -- the edit will definitely have been // processed by the time this is up - Event next = eis.poll(5, TimeUnit.SECONDS); - Assert.assertTrue(next != null); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir")); + EventBatch batch = eis.poll(5, TimeUnit.SECONDS); + Assert.assertNotNull(batch); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath()); } finally { cluster.shutdown(); } } - }
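
Illustrative usage (editor's sketch, not part of the patch): a minimal consumer of the new batch-oriented API. It assumes the pre-existing HdfsAdmin#getInotifyEventStream() entry point (added by HDFS-6634 and unchanged here) and a placeholder NameNode URI; only the EventBatch handling reflects the interface introduced by this patch.

import java.net.URI;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;

public class InotifyBatchTailer {
  public static void main(String[] args) throws Exception {
    // "hdfs://nn:8020" is a placeholder; point this at a real NameNode.
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://nn:8020"),
        new Configuration());
    DFSInotifyEventInputStream eis = admin.getInotifyEventStream();
    long lastProcessedTxid = -1;
    while (true) {
      // Wait up to one second for the next batch; null means no new edits.
      EventBatch batch = eis.poll(1, TimeUnit.SECONDS);
      if (batch == null) {
        continue;
      }
      for (Event e : batch.getEvents()) {
        System.out.println(batch.getTxid() + ": " + e.getEventType());
      }
      // All events in a batch share one transaction ID, so recording it
      // after the batch is processed gives a clean resume point.
      lastProcessedTxid = batch.getTxid();
      // A steadily increasing estimate means the reader is falling behind
      // and events may be deleted before they can be read; -1 means no
      // estimate could be produced yet.
      long behind = eis.getTxidsBehindEstimate();
      if (behind > 0) {
        System.err.println("Reader is ~" + behind + " txids behind "
            + "(last processed txid = " + lastProcessedTxid + ")");
      }
    }
  }
}

Because each batch corresponds to a single edit-log transaction, checkpointing between batches never splits a transaction's events, which is the motivation for returning EventBatch rather than individual Events.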