HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2014-11-25 17:44:34 -08:00
parent 185e0c7b4c
commit 75a326aaff
11 changed files with 467 additions and 339 deletions

View File

@ -419,6 +419,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7462. Consolidate implementation of mkdirs() into a single class. HDFS-7462. Consolidate implementation of mkdirs() into a single class.
(wheat9) (wheat9)
HDFS-7446. HDFS inotify should have the ability to determine what txid it
has read up to (cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -19,11 +19,10 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** /**
* Stream for reading inotify events. DFSInotifyEventInputStreams should not * Stream for reading inotify events. DFSInotifyEventInputStreams should not
@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream {
.class); .class);
private final ClientProtocol namenode; private final ClientProtocol namenode;
private Iterator<Event> it; private Iterator<EventBatch> it;
private long lastReadTxid; private long lastReadTxid;
/** /**
* The most recent txid the NameNode told us it has sync'ed -- helps us * The most recent txid the NameNode told us it has sync'ed -- helps us
@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream {
} }
/** /**
* Returns the next event in the stream or null if no new events are currently * Returns the next batch of events in the stream or null if no new
* available. * batches are currently available.
* *
* @throws IOException because of network error or edit log * @throws IOException because of network error or edit log
* corruption. Also possible if JournalNodes are unresponsive in the * corruption. Also possible if JournalNodes are unresponsive in the
* QJM setting (even one unresponsive JournalNode is enough in rare cases), * QJM setting (even one unresponsive JournalNode is enough in rare cases),
* so catching this exception and retrying at least a few times is * so catching this exception and retrying at least a few times is
* recommended. * recommended.
* @throws MissingEventsException if we cannot return the next event in the * @throws MissingEventsException if we cannot return the next batch in the
* stream because the data for the event (and possibly some subsequent events) * stream because the data for the events (and possibly some subsequent
* has been deleted (generally because this stream is a very large number of * events) has been deleted (generally because this stream is a very large
* events behind the current state of the NameNode). It is safe to continue * number of transactions behind the current state of the NameNode). It is
* reading from the stream after this exception is thrown -- the next * safe to continue reading from the stream after this exception is thrown
* available event will be returned. * The next available batch of events will be returned.
*/ */
public Event poll() throws IOException, MissingEventsException { public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid // need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) { if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream {
return null; return null;
} }
if (!it.hasNext()) { if (!it.hasNext()) {
EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1); EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) { if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some // we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being // edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really // generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits // that we are temporarily unable to read edits
syncTxid = el.getSyncTxid(); syncTxid = el.getSyncTxid();
it = el.getEvents().iterator(); it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid; long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid(); lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) { if (el.getFirstTxid() != formerLastReadTxid + 1) {
@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream {
} }
/** /**
* Return a estimate of how many events behind the NameNode's current state * Return a estimate of how many transaction IDs behind the NameNode's
* this stream is. Clients should periodically call this method and check if * current state this stream is. Clients should periodically call this method
* its result is steadily increasing, which indicates that they are falling * and check if its result is steadily increasing, which indicates that they
* behind (i.e. events are being generated faster than the client is reading * are falling behind (i.e. transaction are being generated faster than the
* them). If a client falls too far behind events may be deleted before the * client is reading them). If a client falls too far behind events may be
* client can read them. * deleted before the client can read them.
* <p/> * <p/>
* A return value of -1 indicates that an estimate could not be produced, and * A return value of -1 indicates that an estimate could not be produced, and
* should be ignored. The value returned by this method is really only useful * should be ignored. The value returned by this method is really only useful
* when compared to previous or subsequent returned values. * when compared to previous or subsequent returned values.
*/ */
public long getEventsBehindEstimate() { public long getTxidsBehindEstimate() {
if (syncTxid == 0) { if (syncTxid == 0) {
return -1; return -1;
} else { } else {
@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream {
} }
/** /**
* Returns the next event in the stream, waiting up to the specified amount of * Returns the next event batch in the stream, waiting up to the specified
* time for a new event. Returns null if a new event is not available at the * amount of time for a new batch. Returns null if one is not available at the
* end of the specified amount of time. The time before the method returns may * end of the specified amount of time. The time before the method returns may
* exceed the specified amount of time by up to the time required for an RPC * exceed the specified amount of time by up to the time required for an RPC
* to the NameNode. * to the NameNode.
@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream {
* see {@link DFSInotifyEventInputStream#poll()} * see {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted * @throws InterruptedException if the calling thread is interrupted
*/ */
public Event poll(long time, TimeUnit tu) throws IOException, public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException { InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow(); long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS; long nextWait = INITIAL_WAIT_MS;
Event next = null; EventBatch next = null;
while ((next = poll()) == null) { while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime); long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) { if (timeLeft <= 0) {
@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream {
} }
/** /**
* Returns the next event in the stream, waiting indefinitely if a new event * Returns the next batch of events in the stream, waiting indefinitely if
* is not immediately available. * a new batch is not immediately available.
* *
* @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException see * @throws MissingEventsException see
* {@link DFSInotifyEventInputStream#poll()} * {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted * @throws InterruptedException if the calling thread is interrupted
*/ */
public Event take() throws IOException, InterruptedException, public EventBatch take() throws IOException, InterruptedException,
MissingEventsException { MissingEventsException {
Event next = null; EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS; int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) { while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2 // sleep for a random period between nextWaitMin and nextWaitMin * 2

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.inotify;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A batch of events that all happened on the same transaction ID.
*/
@InterfaceAudience.Public
public class EventBatch {
private final long txid;
private final Event[] events;
public EventBatch(long txid, Event[] events) {
this.txid = txid;
this.events = events;
}
public long getTxid() {
return txid;
}
public Event[] getEvents() { return events; }
}

View File

@ -23,30 +23,30 @@ import org.apache.hadoop.classification.InterfaceAudience;
import java.util.List; import java.util.List;
/** /**
* Contains a set of events, the transaction ID in the edit log up to which we * Contains a list of event batches, the transaction ID in the edit log up to
* read to produce these events, and the first txid we observed when producing * which we read to produce these events, and the first txid we observed when
* these events (the last of which is for the purpose of determining whether we * producing these events (the last of which is for the purpose of determining
* have missed events due to edit deletion). Also contains the most recent txid * whether we have missed events due to edit deletion). Also contains the most
* that the NameNode has sync'ed, so the client can determine how far behind in * recent txid that the NameNode has sync'ed, so the client can determine how
* the edit log it is. * far behind in the edit log it is.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class EventsList { public class EventBatchList {
private List<Event> events; private List<EventBatch> batches;
private long firstTxid; private long firstTxid;
private long lastTxid; private long lastTxid;
private long syncTxid; private long syncTxid;
public EventsList(List<Event> events, long firstTxid, long lastTxid, public EventBatchList(List<EventBatch> batches, long firstTxid,
long syncTxid) { long lastTxid, long syncTxid) {
this.events = events; this.batches = batches;
this.firstTxid = firstTxid; this.firstTxid = firstTxid;
this.lastTxid = lastTxid; this.lastTxid = lastTxid;
this.syncTxid = syncTxid; this.syncTxid = syncTxid;
} }
public List<Event> getEvents() { public List<EventBatch> getBatches() {
return events; return batches;
} }
public long getFirstTxid() { public long getFirstTxid() {

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -1408,9 +1408,9 @@ public interface ClientProtocol {
public long getCurrentEditLogTxid() throws IOException; public long getCurrentEditLogTxid() throws IOException;
/** /**
* Get an ordered list of events corresponding to the edit log transactions * Get an ordered list of batches of events corresponding to the edit log
* from txid onwards. * transactions for txids equal to or greater than txid.
*/ */
@Idempotent @Idempotent
public EventsList getEditsFromTxid(long txid) throws IOException; public EventBatchList getEditsFromTxid(long txid) throws IOException;
} }

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@ -1485,7 +1485,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
} }
@Override @Override
public EventsList getEditsFromTxid(long txid) throws IOException { public EventBatchList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build(); .setTxid(txid).build();
try { try {

View File

@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -2517,173 +2518,197 @@ public class PBHelper {
} }
} }
public static EventsList convert(GetEditsFromTxidResponseProto resp) throws public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
IOException { IOException {
List<Event> events = Lists.newArrayList(); final InotifyProtos.EventsListProto list = resp.getEventsList();
for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) { final long firstTxid = list.getFirstTxid();
switch(p.getType()) { final long lastTxid = list.getLastTxid();
case EVENT_CLOSE:
InotifyProtos.CloseEventProto close = List<EventBatch> batches = Lists.newArrayList();
InotifyProtos.CloseEventProto.parseFrom(p.getContents()); if (list.getEventsList().size() > 0) {
events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(), throw new IOException("Can't handle old inotify server response.");
close.getTimestamp()));
break;
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
.ownerName(create.getOwnerName())
.groupName(create.getGroupName())
.perms(convert(create.getPerms()))
.replication(create.getReplication())
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget())
.overwrite(create.getOverwrite()).build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
events.add(new Event.MetadataUpdateEvent.Builder()
.path(meta.getPath())
.metadataType(metadataUpdateTypeConvert(meta.getType()))
.mtime(meta.getMtime())
.atime(meta.getAtime())
.replication(meta.getReplication())
.ownerName(
meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
.groupName(
meta.getGroupName().isEmpty() ? null : meta.getGroupName())
.perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
.acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
meta.getAclsList()))
.xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
meta.getXAttrsList()))
.xAttrsRemoved(meta.getXAttrsRemoved())
.build());
break;
case EVENT_RENAME:
InotifyProtos.RenameEventProto rename =
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
rename.getTimestamp()));
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent(reopen.getPath()));
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
break;
default:
throw new RuntimeException("Unexpected inotify event type: " +
p.getType());
}
} }
return new EventsList(events, resp.getEventsList().getFirstTxid(), for (InotifyProtos.EventBatchProto bp : list.getBatchList()) {
long txid = bp.getTxid();
if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) {
throw new IOException("Error converting TxidResponseProto: got a " +
"transaction id " + txid + " that was outside the range of [" +
firstTxid + ", " + lastTxid + "].");
}
List<Event> events = Lists.newArrayList();
for (InotifyProtos.EventProto p : bp.getEventsList()) {
switch (p.getType()) {
case EVENT_CLOSE:
InotifyProtos.CloseEventProto close =
InotifyProtos.CloseEventProto.parseFrom(p.getContents());
events.add(new Event.CloseEvent(close.getPath(),
close.getFileSize(), close.getTimestamp()));
break;
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
.ownerName(create.getOwnerName())
.groupName(create.getGroupName())
.perms(convert(create.getPerms()))
.replication(create.getReplication())
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget())
.overwrite(create.getOverwrite()).build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
events.add(new Event.MetadataUpdateEvent.Builder()
.path(meta.getPath())
.metadataType(metadataUpdateTypeConvert(meta.getType()))
.mtime(meta.getMtime())
.atime(meta.getAtime())
.replication(meta.getReplication())
.ownerName(
meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
.groupName(
meta.getGroupName().isEmpty() ? null : meta.getGroupName())
.perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
.acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
meta.getAclsList()))
.xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
meta.getXAttrsList()))
.xAttrsRemoved(meta.getXAttrsRemoved())
.build());
break;
case EVENT_RENAME:
InotifyProtos.RenameEventProto rename =
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
events.add(new Event.RenameEvent(rename.getSrcPath(),
rename.getDestPath(), rename.getTimestamp()));
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent(reopen.getPath()));
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
events.add(new Event.UnlinkEvent(unlink.getPath(),
unlink.getTimestamp()));
break;
default:
throw new RuntimeException("Unexpected inotify event type: " +
p.getType());
}
}
batches.add(new EventBatch(txid, events.toArray(new Event[0])));
}
return new EventBatchList(batches, resp.getEventsList().getFirstTxid(),
resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid()); resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
} }
public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) { public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) {
InotifyProtos.EventsListProto.Builder builder = InotifyProtos.EventsListProto.Builder builder =
InotifyProtos.EventsListProto.newBuilder(); InotifyProtos.EventsListProto.newBuilder();
for (Event e : el.getEvents()) { for (EventBatch b : el.getBatches()) {
switch(e.getEventType()) { List<InotifyProtos.EventProto> events = Lists.newArrayList();
case CLOSE: for (Event e : b.getEvents()) {
Event.CloseEvent ce = (Event.CloseEvent) e; switch (e.getEventType()) {
builder.addEvents(InotifyProtos.EventProto.newBuilder() case CLOSE:
.setType(InotifyProtos.EventType.EVENT_CLOSE) Event.CloseEvent ce = (Event.CloseEvent) e;
.setContents( events.add(InotifyProtos.EventProto.newBuilder()
InotifyProtos.CloseEventProto.newBuilder() .setType(InotifyProtos.EventType.EVENT_CLOSE)
.setPath(ce.getPath()) .setContents(
.setFileSize(ce.getFileSize()) InotifyProtos.CloseEventProto.newBuilder()
.setTimestamp(ce.getTimestamp()).build().toByteString() .setPath(ce.getPath())
).build()); .setFileSize(ce.getFileSize())
break; .setTimestamp(ce.getTimestamp()).build().toByteString()
case CREATE: ).build());
Event.CreateEvent ce2 = (Event.CreateEvent) e; break;
builder.addEvents(InotifyProtos.EventProto.newBuilder() case CREATE:
.setType(InotifyProtos.EventType.EVENT_CREATE) Event.CreateEvent ce2 = (Event.CreateEvent) e;
.setContents( events.add(InotifyProtos.EventProto.newBuilder()
InotifyProtos.CreateEventProto.newBuilder() .setType(InotifyProtos.EventType.EVENT_CREATE)
.setType(createTypeConvert(ce2.getiNodeType())) .setContents(
.setPath(ce2.getPath()) InotifyProtos.CreateEventProto.newBuilder()
.setCtime(ce2.getCtime()) .setType(createTypeConvert(ce2.getiNodeType()))
.setOwnerName(ce2.getOwnerName()) .setPath(ce2.getPath())
.setGroupName(ce2.getGroupName()) .setCtime(ce2.getCtime())
.setPerms(convert(ce2.getPerms())) .setOwnerName(ce2.getOwnerName())
.setReplication(ce2.getReplication()) .setGroupName(ce2.getGroupName())
.setSymlinkTarget(ce2.getSymlinkTarget() == null ? .setPerms(convert(ce2.getPerms()))
"" : ce2.getSymlinkTarget()) .setReplication(ce2.getReplication())
.setOverwrite(ce2.getOverwrite()).build().toByteString() .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
).build()); "" : ce2.getSymlinkTarget())
break; .setOverwrite(ce2.getOverwrite()).build().toByteString()
case METADATA: ).build());
Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; break;
InotifyProtos.MetadataUpdateEventProto.Builder metaB = case METADATA:
InotifyProtos.MetadataUpdateEventProto.newBuilder() Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
.setPath(me.getPath()) InotifyProtos.MetadataUpdateEventProto.Builder metaB =
.setType(metadataUpdateTypeConvert(me.getMetadataType())) InotifyProtos.MetadataUpdateEventProto.newBuilder()
.setMtime(me.getMtime()) .setPath(me.getPath())
.setAtime(me.getAtime()) .setType(metadataUpdateTypeConvert(me.getMetadataType()))
.setReplication(me.getReplication()) .setMtime(me.getMtime())
.setOwnerName(me.getOwnerName() == null ? "" : .setAtime(me.getAtime())
me.getOwnerName()) .setReplication(me.getReplication())
.setGroupName(me.getGroupName() == null ? "" : .setOwnerName(me.getOwnerName() == null ? "" :
me.getGroupName()) me.getOwnerName())
.addAllAcls(me.getAcls() == null ? .setGroupName(me.getGroupName() == null ? "" :
Lists.<AclEntryProto>newArrayList() : me.getGroupName())
convertAclEntryProto(me.getAcls())) .addAllAcls(me.getAcls() == null ?
.addAllXAttrs(me.getxAttrs() == null ? Lists.<AclEntryProto>newArrayList() :
Lists.<XAttrProto>newArrayList() : convertAclEntryProto(me.getAcls()))
convertXAttrProto(me.getxAttrs())) .addAllXAttrs(me.getxAttrs() == null ?
.setXAttrsRemoved(me.isxAttrsRemoved()); Lists.<XAttrProto>newArrayList() :
if (me.getPerms() != null) { convertXAttrProto(me.getxAttrs()))
metaB.setPerms(convert(me.getPerms())); .setXAttrsRemoved(me.isxAttrsRemoved());
if (me.getPerms() != null) {
metaB.setPerms(convert(me.getPerms()));
}
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_METADATA)
.setContents(metaB.build().toByteString())
.build());
break;
case RENAME:
Event.RenameEvent re = (Event.RenameEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_RENAME)
.setContents(
InotifyProtos.RenameEventProto.newBuilder()
.setSrcPath(re.getSrcPath())
.setDestPath(re.getDstPath())
.setTimestamp(re.getTimestamp()).build().toByteString()
).build());
break;
case APPEND:
Event.AppendEvent re2 = (Event.AppendEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_UNLINK)
.setContents(
InotifyProtos.UnlinkEventProto.newBuilder()
.setPath(ue.getPath())
.setTimestamp(ue.getTimestamp()).build().toByteString()
).build());
break;
default:
throw new RuntimeException("Unexpected inotify event: " + e);
} }
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_METADATA)
.setContents(metaB.build().toByteString())
.build());
break;
case RENAME:
Event.RenameEvent re = (Event.RenameEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_RENAME)
.setContents(
InotifyProtos.RenameEventProto.newBuilder()
.setSrcPath(re.getSrcPath())
.setDestPath(re.getDstPath())
.setTimestamp(re.getTimestamp()).build().toByteString()
).build());
break;
case APPEND:
Event.AppendEvent re2 = (Event.AppendEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_UNLINK)
.setContents(
InotifyProtos.UnlinkEventProto.newBuilder()
.setPath(ue.getPath())
.setTimestamp(ue.getTimestamp()).build().toByteString()
).build());
break;
default:
throw new RuntimeException("Unexpected inotify event: " + e);
} }
builder.addBatch(InotifyProtos.EventBatchProto.newBuilder().
setTxid(b.getTxid()).
addAllEvents(events));
} }
builder.setFirstTxid(el.getFirstTxid()); builder.setFirstTxid(el.getFirstTxid());
builder.setLastTxid(el.getLastTxid()); builder.setLastTxid(el.getLastTxid());

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import java.util.List; import java.util.List;
@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator {
return size; return size;
} }
public static Event[] translate(FSEditLogOp op) { public static EventBatch translate(FSEditLogOp op) {
switch(op.opCode) { switch(op.opCode) {
case OP_ADD: case OP_ADD:
FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op; FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
if (addOp.blocks.length == 0) { // create if (addOp.blocks.length == 0) { // create
return new Event[] { new Event.CreateEvent.Builder().path(addOp.path) return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
.ctime(addOp.atime) .ctime(addOp.atime)
.replication(addOp.replication) .replication(addOp.replication)
.ownerName(addOp.permissions.getUserName()) .ownerName(addOp.permissions.getUserName())
.groupName(addOp.permissions.getGroupName()) .groupName(addOp.permissions.getGroupName())
.perms(addOp.permissions.getPermission()) .perms(addOp.permissions.getPermission())
.overwrite(addOp.overwrite) .overwrite(addOp.overwrite)
.iNodeType(Event.CreateEvent.INodeType.FILE).build() }; .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
} else { } else {
return new Event[] { new Event.AppendEvent(addOp.path) }; return new EventBatch(op.txid,
new Event[] { new Event.AppendEvent(addOp.path) });
} }
case OP_CLOSE: case OP_CLOSE:
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op; FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new Event[] { return new EventBatch(op.txid, new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }; new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
case OP_SET_REPLICATION: case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op; FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION) .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
.path(setRepOp.path) .path(setRepOp.path)
.replication(setRepOp.replication).build() }; .replication(setRepOp.replication).build() });
case OP_CONCAT_DELETE: case OP_CONCAT_DELETE:
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op; FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
List<Event> events = Lists.newArrayList(); List<Event> events = Lists.newArrayList();
@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator {
events.add(new Event.UnlinkEvent(src, cdOp.timestamp)); events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
} }
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp)); events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
return events.toArray(new Event[0]); return new EventBatch(op.txid, events.toArray(new Event[0]));
case OP_RENAME_OLD: case OP_RENAME_OLD:
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op; FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
return new Event[] { return new EventBatch(op.txid, new Event[] {
new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) }; new Event.RenameEvent(rnOpOld.src,
rnOpOld.dst, rnOpOld.timestamp) });
case OP_RENAME: case OP_RENAME:
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op; FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
return new Event[] { return new EventBatch(op.txid, new Event[] {
new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }; new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
case OP_DELETE: case OP_DELETE:
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op; FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) }; return new EventBatch(op.txid, new Event[] {
new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
case OP_MKDIR: case OP_MKDIR:
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op; FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
.ctime(mkOp.timestamp) .ctime(mkOp.timestamp)
.ownerName(mkOp.permissions.getUserName()) .ownerName(mkOp.permissions.getUserName())
.groupName(mkOp.permissions.getGroupName()) .groupName(mkOp.permissions.getGroupName())
.perms(mkOp.permissions.getPermission()) .perms(mkOp.permissions.getPermission())
.iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }; .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() });
case OP_SET_PERMISSIONS: case OP_SET_PERMISSIONS:
FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op; FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS) .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
.path(permOp.src) .path(permOp.src)
.perms(permOp.permissions).build() }; .perms(permOp.permissions).build() });
case OP_SET_OWNER: case OP_SET_OWNER:
FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op; FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER) .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
.path(ownOp.src) .path(ownOp.src)
.ownerName(ownOp.username).groupName(ownOp.groupname).build() }; .ownerName(ownOp.username).groupName(ownOp.groupname).build() });
case OP_TIMES: case OP_TIMES:
FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op; FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES) .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
.path(timesOp.path) .path(timesOp.path)
.atime(timesOp.atime).mtime(timesOp.mtime).build() }; .atime(timesOp.atime).mtime(timesOp.mtime).build() });
case OP_SYMLINK: case OP_SYMLINK:
FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op; FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(symOp.path) return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
.ctime(symOp.atime) .ctime(symOp.atime)
.ownerName(symOp.permissionStatus.getUserName()) .ownerName(symOp.permissionStatus.getUserName())
.groupName(symOp.permissionStatus.getGroupName()) .groupName(symOp.permissionStatus.getGroupName())
.perms(symOp.permissionStatus.getPermission()) .perms(symOp.permissionStatus.getPermission())
.symlinkTarget(symOp.value) .symlinkTarget(symOp.value)
.iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }; .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() });
case OP_REMOVE_XATTR: case OP_REMOVE_XATTR:
FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op; FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(rxOp.src) .path(rxOp.src)
.xAttrs(rxOp.xAttrs) .xAttrs(rxOp.xAttrs)
.xAttrsRemoved(true).build() }; .xAttrsRemoved(true).build() });
case OP_SET_XATTR: case OP_SET_XATTR:
FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op; FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(sxOp.src) .path(sxOp.src)
.xAttrs(sxOp.xAttrs) .xAttrs(sxOp.xAttrs)
.xAttrsRemoved(false).build() }; .xAttrsRemoved(false).build() });
case OP_SET_ACL: case OP_SET_ACL:
FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op; FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder() return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS) .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.path(saOp.src) .path(saOp.src)
.acls(saOp.aclEntries).build() }; .acls(saOp.aclEntries).build() });
default: default:
return null; return null;
} }

View File

@ -69,7 +69,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -1794,7 +1795,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override // ClientProtocol @Override // ClientProtocol
public EventsList getEditsFromTxid(long txid) throws IOException { public EventBatchList getEditsFromTxid(long txid) throws IOException {
namesystem.checkOperation(OperationCategory.READ); // only active namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege(); namesystem.checkSuperuserPrivilege();
int maxEventsPerRPC = nn.conf.getInt( int maxEventsPerRPC = nn.conf.getInt(
@ -1812,13 +1813,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// guaranteed to have been written by this NameNode.) // guaranteed to have been written by this NameNode.)
boolean readInProgress = syncTxid > 0; boolean readInProgress = syncTxid > 0;
List<Event> events = Lists.newArrayList(); List<EventBatch> batches = Lists.newArrayList();
int totalEvents = 0;
long maxSeenTxid = -1; long maxSeenTxid = -1;
long firstSeenTxid = -1; long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) { if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further // we can't read past syncTxid, so there's no point in going any further
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
} }
Collection<EditLogInputStream> streams = null; Collection<EditLogInputStream> streams = null;
@ -1830,7 +1832,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
// will result // will result
LOG.info("NN is transitioning from active to standby and FSEditLog " + LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits"); "is closed -- could not read edits");
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
} }
boolean breakOuter = false; boolean breakOuter = false;
@ -1848,9 +1850,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
break; break;
} }
Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op); EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
if (eventsFromOp != null) { if (eventBatch != null) {
events.addAll(Arrays.asList(eventsFromOp)); batches.add(eventBatch);
totalEvents += eventBatch.getEvents().length;
} }
if (op.getTransactionId() > maxSeenTxid) { if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId(); maxSeenTxid = op.getTransactionId();
@ -1858,7 +1861,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (firstSeenTxid == -1) { if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId(); firstSeenTxid = op.getTransactionId();
} }
if (events.size() >= maxEventsPerRPC || (syncTxid > 0 && if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) { op.getTransactionId() == syncTxid)) {
// we're done // we're done
breakOuter = true; breakOuter = true;
@ -1873,7 +1876,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
} }
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
} }
@Override @Override

View File

@ -48,6 +48,11 @@ message EventProto {
required bytes contents = 2; required bytes contents = 2;
} }
message EventBatchProto {
required int64 txid = 1;
repeated EventProto events = 2;
}
enum INodeType { enum INodeType {
I_TYPE_FILE = 0x0; I_TYPE_FILE = 0x0;
I_TYPE_DIRECTORY = 0x1; I_TYPE_DIRECTORY = 0x1;
@ -111,8 +116,9 @@ message UnlinkEventProto {
} }
message EventsListProto { message EventsListProto {
repeated EventProto events = 1; repeated EventProto events = 1; // deprecated
required int64 firstTxid = 2; required int64 firstTxid = 2;
required int64 lastTxid = 3; required int64 lastTxid = 3;
required int64 syncTxid = 4; required int64 syncTxid = 4;
} repeated EventBatchProto batch = 5;
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class); TestDFSInotifyEventInputStream.class);
private static Event waitForNextEvent(DFSInotifyEventInputStream eis) private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException { throws IOException, MissingEventsException {
Event next = null; EventBatch batch = null;
while ((next = eis.poll()) == null); while ((batch = eis.poll()) == null);
return next; return batch;
}
private static long checkTxid(EventBatch batch, long prevTxid){
Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
"new txid " + batch.getTxid(), prevTxid < batch.getTxid());
return batch.getTxid();
} }
/** /**
@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
*/ */
@Test @Test
public void testOpcodeCount() { public void testOpcodeCount() {
Assert.assertTrue(FSEditLogOpCodes.values().length == 47); Assert.assertEquals(47, FSEditLogOpCodes.values().length);
} }
@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream {
"user::rwx,user:foo:rw-,group::r--,other::---", true)); "user::rwx,user:foo:rw-,group::r--,other::---", true));
client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
Event next = null; EventBatch batch = null;
// RenameOp // RenameOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); Assert.assertEquals(1, batch.getEvents().length);
Event.RenameEvent re = (Event.RenameEvent) next; long txid = batch.getTxid();
Assert.assertTrue(re.getDstPath().equals("/file4")); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
Assert.assertTrue(re.getSrcPath().equals("/file")); Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertEquals("/file4", re.getDstPath());
Assert.assertEquals("/file", re.getSrcPath());
Assert.assertTrue(re.getTimestamp() > 0); Assert.assertTrue(re.getTimestamp() > 0);
long eventsBehind = eis.getEventsBehindEstimate(); long eventsBehind = eis.getTxidsBehindEstimate();
// RenameOldOp // RenameOldOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); Assert.assertEquals(1, batch.getEvents().length);
Event.RenameEvent re2 = (Event.RenameEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertTrue(re2.getDstPath().equals("/file2")); Assert.assertTrue(re2.getDstPath().equals("/file2"));
Assert.assertTrue(re2.getSrcPath().equals("/file4")); Assert.assertTrue(re2.getSrcPath().equals("/file4"));
Assert.assertTrue(re.getTimestamp() > 0); Assert.assertTrue(re.getTimestamp() > 0);
// AddOp with overwrite // AddOp with overwrite
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Event.CreateEvent ce = (Event.CreateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file2")); Assert.assertTrue(ce.getPath().equals("/file2"));
Assert.assertTrue(ce.getCtime() > 0); Assert.assertTrue(ce.getCtime() > 0);
@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce.getOverwrite()); Assert.assertTrue(ce.getOverwrite());
// CloseOp // CloseOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); Assert.assertEquals(1, batch.getEvents().length);
Event.CloseEvent ce2 = (Event.CloseEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0];
Assert.assertTrue(ce2.getPath().equals("/file2")); Assert.assertTrue(ce2.getPath().equals("/file2"));
Assert.assertTrue(ce2.getFileSize() > 0); Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0); Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp // AddOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// CloseOp // CloseOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2")); txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// TimesOp // TimesOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue.getPath().equals("/file2")); Assert.assertTrue(mue.getPath().equals("/file2"));
Assert.assertTrue(mue.getMetadataType() == Assert.assertTrue(mue.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.TIMES); Event.MetadataUpdateEvent.MetadataType.TIMES);
// SetReplicationOp // SetReplicationOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue2.getPath().equals("/file2")); Assert.assertTrue(mue2.getPath().equals("/file2"));
Assert.assertTrue(mue2.getMetadataType() == Assert.assertTrue(mue2.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.REPLICATION); Event.MetadataUpdateEvent.MetadataType.REPLICATION);
Assert.assertTrue(mue2.getReplication() == 1); Assert.assertTrue(mue2.getReplication() == 1);
// ConcatDeleteOp // ConcatDeleteOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); Assert.assertEquals(3, batch.getEvents().length);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); txid = checkTxid(batch, txid);
next = waitForNextEvent(eis); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next; Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1];
Assert.assertTrue(ue2.getPath().equals("/file3")); Assert.assertTrue(ue2.getPath().equals("/file3"));
Assert.assertTrue(ue2.getTimestamp() > 0); Assert.assertTrue(ue2.getTimestamp() > 0);
next = waitForNextEvent(eis); Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2];
Event.CloseEvent ce3 = (Event.CloseEvent) next;
Assert.assertTrue(ce3.getPath().equals("/file2")); Assert.assertTrue(ce3.getPath().equals("/file2"));
Assert.assertTrue(ce3.getTimestamp() > 0); Assert.assertTrue(ce3.getTimestamp() > 0);
// DeleteOp // DeleteOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); Assert.assertEquals(1, batch.getEvents().length);
Event.UnlinkEvent ue = (Event.UnlinkEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0];
Assert.assertTrue(ue.getPath().equals("/file2")); Assert.assertTrue(ue.getPath().equals("/file2"));
Assert.assertTrue(ue.getTimestamp() > 0); Assert.assertTrue(ue.getTimestamp() > 0);
// MkdirOp // MkdirOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Event.CreateEvent ce4 = (Event.CreateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce4.getiNodeType() == Assert.assertTrue(ce4.getiNodeType() ==
Event.CreateEvent.INodeType.DIRECTORY); Event.CreateEvent.INodeType.DIRECTORY);
Assert.assertTrue(ce4.getPath().equals("/dir")); Assert.assertTrue(ce4.getPath().equals("/dir"));
@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce4.getSymlinkTarget() == null); Assert.assertTrue(ce4.getSymlinkTarget() == null);
// SetPermissionsOp // SetPermissionsOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue3.getPath().equals("/dir")); Assert.assertTrue(mue3.getPath().equals("/dir"));
Assert.assertTrue(mue3.getMetadataType() == Assert.assertTrue(mue3.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.PERMS); Event.MetadataUpdateEvent.MetadataType.PERMS);
Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-")); Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
// SetOwnerOp // SetOwnerOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue4.getPath().equals("/dir")); Assert.assertTrue(mue4.getPath().equals("/dir"));
Assert.assertTrue(mue4.getMetadataType() == Assert.assertTrue(mue4.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.OWNER); Event.MetadataUpdateEvent.MetadataType.OWNER);
@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue4.getGroupName().equals("groupname")); Assert.assertTrue(mue4.getGroupName().equals("groupname"));
// SymlinkOp // SymlinkOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Event.CreateEvent ce5 = (Event.CreateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce5.getiNodeType() == Assert.assertTrue(ce5.getiNodeType() ==
Event.CreateEvent.INodeType.SYMLINK); Event.CreateEvent.INodeType.SYMLINK);
Assert.assertTrue(ce5.getPath().equals("/dir2")); Assert.assertTrue(ce5.getPath().equals("/dir2"));
@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir")); Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
// SetXAttrOp // SetXAttrOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue5.getPath().equals("/file5")); Assert.assertTrue(mue5.getPath().equals("/file5"));
Assert.assertTrue(mue5.getMetadataType() == Assert.assertTrue(mue5.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS); Event.MetadataUpdateEvent.MetadataType.XATTRS);
@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(!mue5.isxAttrsRemoved()); Assert.assertTrue(!mue5.isxAttrsRemoved());
// RemoveXAttrOp // RemoveXAttrOp
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue6.getPath().equals("/file5")); Assert.assertTrue(mue6.getPath().equals("/file5"));
Assert.assertTrue(mue6.getMetadataType() == Assert.assertTrue(mue6.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS); Event.MetadataUpdateEvent.MetadataType.XATTRS);
@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue6.isxAttrsRemoved()); Assert.assertTrue(mue6.isxAttrsRemoved());
// SetAclOp (1) // SetAclOp (1)
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue7.getPath().equals("/file5")); Assert.assertTrue(mue7.getPath().equals("/file5"));
Assert.assertTrue(mue7.getMetadataType() == Assert.assertTrue(mue7.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS); Event.MetadataUpdateEvent.MetadataType.ACLS);
@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream {
AclEntry.parseAclEntry("user::rwx", true))); AclEntry.parseAclEntry("user::rwx", true)));
// SetAclOp (2) // SetAclOp (2)
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); Assert.assertEquals(1, batch.getEvents().length);
Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next; txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue8.getPath().equals("/file5")); Assert.assertTrue(mue8.getPath().equals("/file5"));
Assert.assertTrue(mue8.getMetadataType() == Assert.assertTrue(mue8.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS); Event.MetadataUpdateEvent.MetadataType.ACLS);
@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream {
// and we should not have been behind at all when eventsBehind was set // and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all // either, since there were few enough events that they should have all
// been read to the client during the first poll() call // been read to the client during the first poll() call
Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind); Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream {
} }
cluster.getDfsCluster().shutdownNameNode(0); cluster.getDfsCluster().shutdownNameNode(0);
cluster.getDfsCluster().transitionToActive(1); cluster.getDfsCluster().transitionToActive(1);
Event next = null; EventBatch batch = null;
// we can read all of the edits logged by the old active from the new // we can read all of the edits logged by the old active from the new
// active // active
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i)); i));
} }
Assert.assertTrue(eis.poll() == null); Assert.assertTrue(eis.poll() == null);
@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream {
// make sure that the old active can't read any further than the edits // make sure that the old active can't read any further than the edits
// it logged itself (it has no idea whether the in-progress edits from // it logged itself (it has no idea whether the in-progress edits from
// the other writer have actually been committed) // the other writer have actually been committed)
Event next = null; EventBatch batch = null;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis); batch = waitForNextEvents(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i)); i));
} }
Assert.assertTrue(eis.poll() == null); Assert.assertTrue(eis.poll() == null);
@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream {
}, 1, TimeUnit.SECONDS); }, 1, TimeUnit.SECONDS);
// a very generous wait period -- the edit will definitely have been // a very generous wait period -- the edit will definitely have been
// processed by the time this is up // processed by the time this is up
Event next = eis.poll(5, TimeUnit.SECONDS); EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
Assert.assertTrue(next != null); Assert.assertNotNull(batch);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir")); Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
} }
} }